余白

https://blog.lacolaco.net/ に移転しました

RxJSによるWeb Workerの抽象化 2つのアプローチ

この記事では、RxJS を使ったWeb Workerの抽象化を試みます。 なお、記事中で Web WorkerあるいはWorkerと言ったときに指すのは new Worker() で作成する Dedicated Workerのみで、Shared WorkerやService Workerなどは対象外です。

なぜWeb Worker?

Web Workerを使うのに2つの目的があります。ひとつはoff-the-main-threadとよく言われる、UIメインスレッドとは別のWorkerスレッドで並行処理をおこなうことによるパフォーマンス改善です。 そしてもうひとつは、仕様がドラフト段階にある ES ModulesのWorker対応 を利用した Module Worker によるコード分割です。

https://html.spec.whatwg.org/multipage/workers.html#module-worker-example

ES ModulesのWorker対応は、現在Chromiumではフラグ付きでサポートされています。

www.chromestatus.com

Module Workerでは次のようなコードで type: 'module' を指定すると、コンストラクタに指定したパスをES Moduleとして読み込めます。 さらにWorkerスクリプト内でもES Moduleのコンテキストで他のモジュールをimport/export文を使えるようになります。

const worker = new Worker('./worker.mjs', { type: 'module' });

もちろんChromeですらまだ普通には使えない機能なので、今Module Workerを使うためには小細工が必要です。 webpackを使っている場合は、GoogleChromeチームが開発している WorkerPlugin を使うのが便利です。

github.com

WorkerPluginは type: 'module' オプションでModule Workerを作成しているコードを発見すると、 呼び出されているファイルをwebpackのCode Splitting機能で別バンドルに分割しながら、type: 'module' オプションを除去してくれます。

webpack.js.org

つまり、このプラグインさえ入れておけば、ES Moduleベースで書かれたファイルをModule Workerとして呼び出し、webpackのビルド後にはWorkerごとにバンドルが自動で分割されている、という状態になります。 多くの場合、Workerで実行したい処理というのはページの初期化時に必要なものではないでしょう。 たいていはユーザーインタラクションや何かのイベントを受けて実行される非同期的なジョブです。 そのような処理は遅延読み込みとWorkerの両方と相性がよいので、Module Workerはページの初期読み込みに必要なバンドルサイズを少なくしながらメインスレッド の負荷も下げられるまさに一石二鳥です。

RxJSによる抽象化

WorkerはpostMessage/ommessageによって他のスレッドとコミュニケーションします。 このイベント駆動の仕組みは、RxJSのSubjectモデルとよく似ています。 Workerそのものでは拡張性に乏しいですが、Subjectで抽象化することでRxJSのオペレーターを使ったデータ加工や、RxJSと連携できる他のJavaScriptライブラリなどとのコミュニケーションも容易になります。 そしてRxJSは元来が非同期処理を扱うためのものですから、そのイベントの由来が同じスレッドかWorkerスレッドかは気にせず同じ非同期の枠で考えられます。 このことからも、Workerによる別スレッドでの処理とそのイベント購読はRxJSでうまく抽象化できるのではないかと考えています。

f:id:lacolaco:20190326141608p:plain

アプローチ 1. Worker as a Subject

まずひとつめのアプローチとして、WorkerそのものがSubjectのインターフェースを備えるというアプローチを試みます。 これはWorkerスレッドから送られてくるイベントをメインスレッドで購読する形です。

f:id:lacolaco:20190326142344p:plain

次のコードは、Workerを隠蔽する WorkerSubject の実装例です。 WorkerSubjectnext メソッドで渡されたデータをWorkerにpostMessageし、 Workerの message / error イベントを内部の子Subjectで購読します。 WorkerSubject を購読する Subscriberは 内部の子Subjectを間接的に購読することになります。 これは実装の一例であって、もっと効率的な実装はあると思います。

export class WorkerSubject<T> extends Subject<T> {
  private inner = new ReplaySubject();
  private sub = new Subscription();

  constructor(public worker: Worker) {
    super();
    this.sub.add(
      fromEvent<MessageEvent>(worker, 'message').subscribe(ev =>
        this.inner.next(ev.data),
      ),
    );
    this.sub.add(
      fromEvent<ErrorEvent>(worker, 'error').subscribe(ev =>
        this.inner.error(ev.error),
      ),
    );
    this._subscribe = this.inner._subscribe.bind(this.inner);
  }

  next(value: T) {
    this.worker.postMessage(value);
  }

  complete() {
    this.sub.unsubscribe();
    this.inner.complete();
    super.complete();
  }
}

具体的な例として、Markdown文字列をHTML文字列に変換する処理をWorkerスレッドで実行してみます。 まずは次のように ./compile-markdown.ts を作成します。

Subjectに隠蔽するためには、入力に対して出力を返すping-pong型のWorkerであると好都合です(必ずしもそうでなくてもよいですが)。 onmessageで受け取った文字列を変換し、 postMessage でレスポンスのイベントを発火しています。

import * as marked from 'marked';

function compileMarkdown(markdownString: string) {
  return new Promise<string>((resolve, reject) => {
    marked(markdownString, {}, (err, result) => {
      if (err) {
        reject(err);
        return;
      }
      return resolve(result);
    });
  });
}

// [tsconfig] lib: "dom" and "webworker" are exclutive.
const _self: Worker = self as any;

_self.onmessage = ev => {
  compileMarkdown(ev.data)
    .then(result => {
      _self.postMessage(result);
    })
    .catch(err => {
      throw err;
    });
};

const _self: Worker = self as any; はTypeScriptのためのハックです。同じtsconfigで domwebworker の両方をターゲットとすることができない問題があるため、手動で self の型をグローバルの Window 型ではなく Worker 型に補正しています。

あとはModule Workerを作って、 WorkerSubject でラップすると使えるようになります。 Angularのコンポーネントで使うと、次のようなコードになります。 結果としてこのコンポーネントのテンプレートには ## foo<h2>foo</h2> に変換されたHTML文字列が表示されます。

@Component({
  selector: 'app-root',
  template: `
    <div>{{ compiled$ | async }}</div>
  `,
})
export class AppComponent implements OnInit {
  compiled$: Subject<string>;

  constructor() {
    // Module Workerの作成とWorkerSubjectでのラップ
    this.compiled$ = new WorkerSubject(
      new Worker('./compile-markdown', { type: 'module' }),
    );
  }

  ngOnInit() {
    // WorkerSubjectに新しいデータを送る
    this.compiled$.next('## foo');
  }
}

このアプローチのメリットは次のものが考えられます。

  • Workerの実装に制約がなく、既存のWorkerはほとんど適用可能である
  • Module Workerがコード分割する境界としてわかりやすく、ES ModuleをそのままWorker化できるのが簡単
  • もともとnext/subscribeでWriteとReadが非同期的であることから、その内部がWorkerを経由していても利用側に影響しない

一方で、Worker側の実装ではpostMessage/onmessageを隠蔽できていないという課題もまだあります。

アプローチ 2. Worker as an Operator

もうひとつのアプローチは、Observableに適用するオペレーターの処理をWorkerスレッドに委譲するものです。 Observableの実体や購読者はメインスレッドにあるまま、データ処理の一部分だけの並行性を高められます。

f:id:lacolaco:20190326150102p:plain

このアプローチの実装はWorkerを関数のように扱うため、Module WorkerよりもgreenletによるインラインWorker化のほうが向いています。 インラインWorkerとは、 Data URIを使って作成されるWorkerのことを指しています。 greenletは、Promiseを返す非同期関数を実行時にインラインWorkerに変換してWorkerスレッドで実行するライブラリです。

github.com

RxJSのオペレーターで、関数を渡して処理をおこなう代表的なものは map 系のものでしょう。 どのオペレーターにも適用できますが、ここでは map オペレーターをWorker化した mapOnWorker オペレーターを実装してみます。

RxJSのオペレーターの実体はObservableを受け取ってObservableを返す関数です。 mapOnWorker は次のように簡単に実装できます。

import gleenlet from 'greenlet';
import { from, Observable } from 'rxjs';
import { concatMap } from 'rxjs/operators';

export function mapOnWorker<T, U>(fn: (arg: T) => Promise<U>) {
  // 関数をインラインWorker化する
  const workerized = gleenlet(fn);
  return (source: Observable<T>): Observable<U> => {
    // 1. `workerized`関数を呼び出す
    // 2. 戻り値のPromiseを `from` 関数でObservableに変換する
    // 3. `concatMap` オペレーターで元のObservableと結合する
    return source.pipe(concatMap(v => from(workerized(v))));
  };
}

map オペレーターと同じように順序を守るために concatMap を使いましたが、mergeMapswitchMap のようなオペレーターを使うものも簡単に作れます。

export const mapOnWorker = concatMapOnWorker;

export function concatMapOnWorker<T, U>(fn: (arg: T) => Promise<U>) {
  const workerized = gleenlet(fn);
  return (source: Observable<T>): Observable<U> => {
    return source.pipe(concatMap(v => from(workerized(v))));
  };
}

export function switchMapOnWorker<T, U>(fn: (arg: T) => Promise<U>) {
  const workerized = gleenlet(fn);
  return (source: Observable<T>): Observable<U> => {
    return source.pipe(switchMap(v => from(workerized(v))));
  };
}

export function exhaustMapOnWorker<T, U>(fn: (arg: T) => Promise<U>) {
  const workerized = gleenlet(fn);
  return (source: Observable<T>): Observable<U> => {
    return source.pipe(exhaustMap(v => from(workerized(v))));
  };
}

Workerへの関心はオペレーターの内部に完全に閉じているので、オペレーターの利用側は他のオペレーターと同じようにただ pipe メソッドに渡すだけです。

import { interval, Observable } from 'rxjs';
import { mapOnWorker } from '../lib/mapOnWorker';

@Component({
  selector: 'app-root',
  template: `
    <div>{{ calculated$ | async }}</div>
  `,
})
export class AppComponent implements OnInit {
  calculated$: Observable<any>;

  constructor() {
    // 1msごとに発火するObservable
    this.calculated$ = interval(1).pipe(
      // Workerで計算処理を実行する
      mapOnWorker(async i => Math.sqrt(i)),
    );
  }
}

このアプローチのメリットは、オペレーター利用側にまったく関心を漏らさずにCPU負荷の大きいオペレーター処理をWorkerスレッドに逃がせるところです。 上記の例では非同期化するまでもない処理ですが、文字列の全文検索だったりパターンマッチだったり、メインスレッドをブロックしうる計算処理がObservableのオペレーターにあるときには有効です。

デメリットはオペレーターの呼び出しのたびにかかるインラインWorkerとのコミュニケーションのコストです。 Workerスレッドで実行する処理があまり時間のかからないものであれば、オーバーヘッドが相対的に高く付くこともあるかもしれません。

まとめ

この記事ではWeb Workerを意識せずにWeb Workerの恩恵を受けられるようにRxJSを使って抽象化するアプローチを紹介しました。 Promiseを使ってクラスや関数をWorker化するアプローチは Google Chromeチームの Comlink や Cloonyがとてもクールです。 しかし複数回発行するイベントを扱うにはどうしてもObservableのようなモデルが必要だと思います。

github.com

github.com

サンプルコードはGitHub上で公開しています。 コード例はどれも完璧である保証はなく、もっと効率的な実装があるかもしれませんので、ご利用は自由ですが自己責任でよろしくおねがいします。

github.com