この記事では、RxJS を使ったWeb Workerの抽象化を試みます。
なお、記事中で Web WorkerあるいはWorkerと言ったときに指すのは new Worker()
で作成する Dedicated Workerのみで、Shared WorkerやService Workerなどは対象外です。
なぜWeb Worker?
Web Workerを使うのに2つの目的があります。ひとつはoff-the-main-threadとよく言われる、UIメインスレッドとは別のWorkerスレッドで並行処理をおこなうことによるパフォーマンス改善です。 そしてもうひとつは、仕様がドラフト段階にある ES ModulesのWorker対応 を利用した Module Worker によるコード分割です。
https://html.spec.whatwg.org/multipage/workers.html#module-worker-example
ES ModulesのWorker対応は、現在Chromiumではフラグ付きでサポートされています。
Module Workerでは次のようなコードで type: 'module'
を指定すると、コンストラクタに指定したパスをES Moduleとして読み込めます。
さらにWorkerスクリプト内でもES Moduleのコンテキストで他のモジュールをimport/export文を使えるようになります。
const worker = new Worker('./worker.mjs', { type: 'module' });
もちろんChromeですらまだ普通には使えない機能なので、今Module Workerを使うためには小細工が必要です。 webpackを使っている場合は、GoogleのChromeチームが開発している WorkerPlugin を使うのが便利です。
WorkerPluginは type: 'module'
オプションでModule Workerを作成しているコードを発見すると、
呼び出されているファイルをwebpackのCode Splitting機能で別バンドルに分割しながら、type: 'module'
オプションを除去してくれます。
つまり、このプラグインさえ入れておけば、ES Moduleベースで書かれたファイルをModule Workerとして呼び出し、webpackのビルド後にはWorkerごとにバンドルが自動で分割されている、という状態になります。 多くの場合、Workerで実行したい処理というのはページの初期化時に必要なものではないでしょう。 たいていはユーザーインタラクションや何かのイベントを受けて実行される非同期的なジョブです。 そのような処理は遅延読み込みとWorkerの両方と相性がよいので、Module Workerはページの初期読み込みに必要なバンドルサイズを少なくしながらメインスレッド の負荷も下げられるまさに一石二鳥です。
RxJSによる抽象化
WorkerはpostMessage/ommessageによって他のスレッドとコミュニケーションします。 このイベント駆動の仕組みは、RxJSのSubjectモデルとよく似ています。 Workerそのものでは拡張性に乏しいですが、Subjectで抽象化することでRxJSのオペレーターを使ったデータ加工や、RxJSと連携できる他のJavaScriptライブラリなどとのコミュニケーションも容易になります。 そしてRxJSは元来が非同期処理を扱うためのものですから、そのイベントの由来が同じスレッドかWorkerスレッドかは気にせず同じ非同期の枠で考えられます。 このことからも、Workerによる別スレッドでの処理とそのイベント購読はRxJSでうまく抽象化できるのではないかと考えています。
アプローチ 1. Worker as a Subject
まずひとつめのアプローチとして、WorkerそのものがSubjectのインターフェースを備えるというアプローチを試みます。 これはWorkerスレッドから送られてくるイベントをメインスレッドで購読する形です。
次のコードは、Workerを隠蔽する WorkerSubject
の実装例です。
WorkerSubject
はnext
メソッドで渡されたデータをWorkerにpostMessageし、
Workerの message
/ error
イベントを内部の子Subjectで購読します。
WorkerSubject
を購読する Subscriberは 内部の子Subjectを間接的に購読することになります。
これは実装の一例であって、もっと効率的な実装はあると思います。
export class WorkerSubject<T> extends Subject<T> { private inner = new ReplaySubject(); private sub = new Subscription(); constructor(public worker: Worker) { super(); this.sub.add( fromEvent<MessageEvent>(worker, 'message').subscribe(ev => this.inner.next(ev.data), ), ); this.sub.add( fromEvent<ErrorEvent>(worker, 'error').subscribe(ev => this.inner.error(ev.error), ), ); this._subscribe = this.inner._subscribe.bind(this.inner); } next(value: T) { this.worker.postMessage(value); } complete() { this.sub.unsubscribe(); this.inner.complete(); super.complete(); } }
具体的な例として、Markdown文字列をHTML文字列に変換する処理をWorkerスレッドで実行してみます。
まずは次のように ./compile-markdown.ts
を作成します。
Subjectに隠蔽するためには、入力に対して出力を返すping-pong型のWorkerであると好都合です(必ずしもそうでなくてもよいですが)。
onmessage
で受け取った文字列を変換し、 postMessage
でレスポンスのイベントを発火しています。
import * as marked from 'marked'; function compileMarkdown(markdownString: string) { return new Promise<string>((resolve, reject) => { marked(markdownString, {}, (err, result) => { if (err) { reject(err); return; } return resolve(result); }); }); } // [tsconfig] lib: "dom" and "webworker" are exclutive. const _self: Worker = self as any; _self.onmessage = ev => { compileMarkdown(ev.data) .then(result => { _self.postMessage(result); }) .catch(err => { throw err; }); };
const _self: Worker = self as any;
はTypeScriptのためのハックです。同じtsconfigで dom
と webworker
の両方をターゲットとすることができない問題があるため、手動で self
の型をグローバルの Window
型ではなく Worker
型に補正しています。
あとはModule Workerを作って、 WorkerSubject
でラップすると使えるようになります。
Angularのコンポーネントで使うと、次のようなコードになります。
結果としてこのコンポーネントのテンプレートには ## foo
が <h2>foo</h2>
に変換されたHTML文字列が表示されます。
@Component({ selector: 'app-root', template: ` <div>{{ compiled$ | async }}</div> `, }) export class AppComponent implements OnInit { compiled$: Subject<string>; constructor() { // Module Workerの作成とWorkerSubjectでのラップ this.compiled$ = new WorkerSubject( new Worker('./compile-markdown', { type: 'module' }), ); } ngOnInit() { // WorkerSubjectに新しいデータを送る this.compiled$.next('## foo'); } }
このアプローチのメリットは次のものが考えられます。
- Workerの実装に制約がなく、既存のWorkerはほとんど適用可能である
- Module Workerがコード分割する境界としてわかりやすく、ES ModuleをそのままWorker化できるのが簡単
- もともとnext/subscribeでWriteとReadが非同期的であることから、その内部がWorkerを経由していても利用側に影響しない
一方で、Worker側の実装ではpostMessage/onmessageを隠蔽できていないという課題もまだあります。
アプローチ 2. Worker as an Operator
もうひとつのアプローチは、Observableに適用するオペレーターの処理をWorkerスレッドに委譲するものです。 Observableの実体や購読者はメインスレッドにあるまま、データ処理の一部分だけの並行性を高められます。
このアプローチの実装はWorkerを関数のように扱うため、Module WorkerよりもgreenletによるインラインWorker化のほうが向いています。 インラインWorkerとは、 Data URIを使って作成されるWorkerのことを指しています。 greenletは、Promiseを返す非同期関数を実行時にインラインWorkerに変換してWorkerスレッドで実行するライブラリです。
RxJSのオペレーターで、関数を渡して処理をおこなう代表的なものは map
系のものでしょう。
どのオペレーターにも適用できますが、ここでは map
オペレーターをWorker化した mapOnWorker
オペレーターを実装してみます。
RxJSのオペレーターの実体はObservableを受け取ってObservableを返す関数です。
mapOnWorker
は次のように簡単に実装できます。
import gleenlet from 'greenlet'; import { from, Observable } from 'rxjs'; import { concatMap } from 'rxjs/operators'; export function mapOnWorker<T, U>(fn: (arg: T) => Promise<U>) { // 関数をインラインWorker化する const workerized = gleenlet(fn); return (source: Observable<T>): Observable<U> => { // 1. `workerized`関数を呼び出す // 2. 戻り値のPromiseを `from` 関数でObservableに変換する // 3. `concatMap` オペレーターで元のObservableと結合する return source.pipe(concatMap(v => from(workerized(v)))); }; }
map
オペレーターと同じように順序を守るために concatMap
を使いましたが、mergeMap
や switchMap
のようなオペレーターを使うものも簡単に作れます。
export const mapOnWorker = concatMapOnWorker; export function concatMapOnWorker<T, U>(fn: (arg: T) => Promise<U>) { const workerized = gleenlet(fn); return (source: Observable<T>): Observable<U> => { return source.pipe(concatMap(v => from(workerized(v)))); }; } export function switchMapOnWorker<T, U>(fn: (arg: T) => Promise<U>) { const workerized = gleenlet(fn); return (source: Observable<T>): Observable<U> => { return source.pipe(switchMap(v => from(workerized(v)))); }; } export function exhaustMapOnWorker<T, U>(fn: (arg: T) => Promise<U>) { const workerized = gleenlet(fn); return (source: Observable<T>): Observable<U> => { return source.pipe(exhaustMap(v => from(workerized(v)))); }; }
Workerへの関心はオペレーターの内部に完全に閉じているので、オペレーターの利用側は他のオペレーターと同じようにただ pipe
メソッドに渡すだけです。
import { interval, Observable } from 'rxjs'; import { mapOnWorker } from '../lib/mapOnWorker'; @Component({ selector: 'app-root', template: ` <div>{{ calculated$ | async }}</div> `, }) export class AppComponent implements OnInit { calculated$: Observable<any>; constructor() { // 1msごとに発火するObservable this.calculated$ = interval(1).pipe( // Workerで計算処理を実行する mapOnWorker(async i => Math.sqrt(i)), ); } }
このアプローチのメリットは、オペレーター利用側にまったく関心を漏らさずにCPU負荷の大きいオペレーター処理をWorkerスレッドに逃がせるところです。 上記の例では非同期化するまでもない処理ですが、文字列の全文検索だったりパターンマッチだったり、メインスレッドをブロックしうる計算処理がObservableのオペレーターにあるときには有効です。
デメリットはオペレーターの呼び出しのたびにかかるインラインWorkerとのコミュニケーションのコストです。 Workerスレッドで実行する処理があまり時間のかからないものであれば、オーバーヘッドが相対的に高く付くこともあるかもしれません。
まとめ
この記事ではWeb Workerを意識せずにWeb Workerの恩恵を受けられるようにRxJSを使って抽象化するアプローチを紹介しました。 Promiseを使ってクラスや関数をWorker化するアプローチは Google Chromeチームの Comlink や Cloonyがとてもクールです。 しかし複数回発行するイベントを扱うにはどうしてもObservableのようなモデルが必要だと思います。
サンプルコードはGitHub上で公開しています。 コード例はどれも完璧である保証はなく、もっと効率的な実装があるかもしれませんので、ご利用は自由ですが自己責任でよろしくおねがいします。