こんにちは丸山@h13i32maruです。
WHATWG Streams APIを少し触ってみたので、そのメモです。
ドキュメント
- WHATWG Streams
- このメモはLast Updated 21 January 2015を元に書いた
- whatwg/streams
- サンプルやリファレンス実装などがある
- このメモは
commit 879902
を元に書いた
- Stream APIがブラウザにやってくる
- @jxckさんがnodeのStream APIと絡めてWHATWGのStreams APIを紹介している
概念
JavaScriptのプログラム中で処理する(大きめな)データの読み書きを統一的に扱うためのインターフェースをStreams APIとしてWHATWGが策定している。例えばXHRで大きなファイルを読み込んだ時にはすべてのデータを読み込んだあとにコールバックが呼ばれる*1。これだと読み込み中の進捗を知ることができないし、読み込み途中のデータを使って随時画面を更新していくというのも難しい。
そこでStreams APIの出番。もしXHRの代わりにStreams APIを使ったHTTPクライアントがあるとする。そうするとStreams APIを通してサーバからデータが届くたびに随時データを受け取ることができる。つまりデータの読み込みが全部終わるのをまたなくても良くなる。
nodeではFileやHTTPクライアントはStream APIに対応している。と言ってもこれはWHATWG Streams APIとは別物だと思う。nodeのStreams APIをもとにWebで使えるように標準化するのがWHATWG Streams APIなのかな?ここらへんはあまりよく理解していない。
余談だけど、C#にはObservableという物がある。これはPromiseをより強力にしたようなものと考えられる。Promiseが時間軸に対するイベントを一度だけ処理するのに対して、Observableは連続するイベントを何度でも処理できるようになっている。このObservableとStreamsは連続するイベント/データを処理するという点が非常に似ている。しかし前者は各イベントが独立しているように扱うのに対し、後者は各イベントは一つの大きなデータの塊の一部であるように扱う。そのためStreamsにはキューイング/バッファの処理がベースになっている。でもC#のObservableよく知らないので間違ってるかも。
モデル
Readable Streams
読み込み用のStreams APIがReadable Streams
である。そのReadable Streams
にデータを供給するものがunderlying source
と呼ばれている。たとえばTCPReadableStream
というクラスが存在した場合、underlying source
はTCP
ということになる。そしてReadable Streams
からデータを受け取るものがconsumer
と呼ばれている。
データのフローとしてはこんな感じ。
underlying source => Readable Streams => consumer
underlying source
としてpush sources
とpull sources
がある。これはunderlying sources
がどのようにデータを提供するのかの違いである。
以下のようにunderlying source
から一方的にデータが送られてくるタイプがpush sources
である。
source.ondata = (chunk) =>{ console.log(chunk); }
以下のようにunderling source
にデータを取得しに行くタイプがpull sourcess
である。
source.read((chunk)=>{ console.log(chunk); });
Readable Streams
はこのpush sources
とpull sources
の両方をラップして統一したインターフェースを提供できるように設計されている。
Writable Streams
書き込み用のStreams APIがWritable Streams
である。そのWritable Streams
がデータを書き込む先がunderlying sink
と呼ばれている。例えばFileWritableStram
というクラスが存在した場合、underlying sink
はFile
ということになる。そしてWritable Streams
にデータを供給するものがproducer
と呼ばれている。
データのフロートしてはこんな感じ。
producer => Writable Streams => underlying shank
Transform Streams
Redable Streams
とWritable Streams
の両方を保持しデータを変換するStreamsをTransform Streams
と呼ぶ。これは他のReadable Streams
からデータを貰い、変換処理を行ったデータを提供するものとして使われる。例えばGZipTransform
というクラスがあったとすると、FileReadableStream
からデータを受け取りGZip圧縮されたデータを提供するように使える。
データの流れはこんな感じ。
underlying source => Readable Streams => Transform Streams => consumer
ただしこれはまだspecとして言語化されていなくてテスト実装によって設計を確かめている段階。
Pipe Chains & Backpressure
Readable Streams
とWritable Streams
をつないで、underlying source
をunderlying sink
に書き込む処理をpipe
と呼ぶ。Linuxの|
と同じ。例えばTCPReadableStream
とFileWritableStream
があれば、このpipe
機能をつかってTCPから読み出したデータをFileに書き出すというのが簡単にできる。
このpipe
機能を使った時に問題になることがある。それはReadable Streams
とWritable Streams
の間でデータ処理速度に違いがあると上手くデータが流れず詰まってしまう。例えばファイルから読みだしたデータをネットワークに書き込む場合、ネットワークへの書き込み速度のほうがファイルからの読み取り速度より基本的には遅い。なので、ネットワークへの書き込みの部分でデータが詰まってしまう。これを回避してデータの流れを正常にする処理のことをbackpressure
と呼ぶ。
Internal Queues & Queuing Strategies
Readable Streams
とWritable Streams
はそれぞれ内部にchunk
をバッファリングするためのinternal queues
を持っている。
Readable Streams
ではunderlying source
から読みだしたデータをinternal queues
に書き出す。そしてconsumer
はReadable Streams
のAPIを通してこのinternal queues
からデータを読み出す。
Writable Streams
ではproducer
がWritable Streams
のAPIを通してinternal queues
にデータを書き出す。そしてinternal queues
からデータを読みだしてunderlying sink
に書き出す。
どちらの場合もinternal queues
は内部的に扱われるだけで、外部から直接扱われることはない。
このinternal queues
にデータが溜まり過ぎるとバッファがオーバーして処理ができなくなってしまう。これを防ぐためにbackpressure
という仕組みがあると前述した。このbackpressure
を実現するためにqueueing strategies
というものがある。これは「chunkを受け取り、そのchunkのサイズを返すメソッド」と「internal queues
にたまっているchunkの合計サイズを受け取り、backpressure
をすべきかどうかを返すメソッド」の2つで構成されている。
例えば「internal queues
にたまってるchunk
の合計バイト数が一定以上を超えるとbackpressure
を開始する」や「internal queues
にたまっているchunk
の合計個数が一定以上を超えるとbackpressure
を開始する」などのqueuing strategies
が考えられる。
Locking
Readable Streams
は複数のconsumer
からデータを読み取られる可能性がある。そうするとデータが分散してしまって正常に処理することができない。そこで他のconsumer
から読み取られないように排他制御をすることができる。この排他制御はExclusive Stream Reader
で対象のReadable Streams
をカプセル化してlock
するということを行う。
このExclusive Stream Reader
は基本的にはReadable Streams
と同じように扱うことができる。lock
されているReadable Streams
から直接データを読み取ることができない(例外がでる)ようになってる。それとReadable Streams
は同時にひとつのExclusive Stream Reader
によってしかlock
されることはない。
Readable Streamsのサンプルコード
まずはunderlying sourceとなるものを実装する。今回は一定間隔でデータがpushされるものを用意する。
class Source { constructor(max = 10, interval = 200) { if (max <= 0 || interval <= 0) { Promise.resolve().then(()=>{ this.onerror && this.onerror(new Error('max and interval must be greater than 0')); }); return; } var count = 0; var id = setInterval(()=>{ if (this._stopped) { return; } if (this._finished) { clearInterval(id); return; } this.ondata && this.ondata({count: count++}); if(count === max) { clearInterval(id); this.oncomplete && this.oncomplete(); } }, interval); } stop(){ this._stopped = true; } start() { this._stopped = false; } close(){ this._finished = true; } ondata(){} oncomplete(){} onerror(){} }
次にこれをReadable Streamsで扱えるようにする。
class SampleReadableStream extends ReadableStream { constructor() { var source = new Source(); function start(enqueue, close, error) { source.ondata = (chunk) => { if (!enqueue(chunk)) { source.stop(); } }; source.oncomplete = close; source.onerror = error; } function pull(enqueue, close, error) { source.start(); } function cancel(reason) { source.close(); } super({start, pull, cancel}); } }
継承を使わずに使うこともできるが、いろいろ柔軟にできるため今回は継承を使った。
ReadableStream
のコンストラクタに渡している{start, pull, cancel}
が重要になってくる。基本的にはこの部分を色々実装していくことになる。
start(enqueue, close, error)
まずstart(enqueue, close, error)
がSampleRedableStream
がインスタンス化された後に一度だけ実行される。このonStart
の中でunderlying sourceからchunkの読み込みを開始し、internal queuesにchunkを保存していくことになる。
enqueue
はその名の通りでenqueue(chunk)
のようにしてinternal queuesにchunkを保存する。このenqueue(chunk)
からfalse
が返ってきた場合、chunkの流れが正常ではないということを表す。つまりback pressureを開始しなければならない。Readable Streamsでbackpressureを開始するのにはunerlying sourceからのchunkの読み込みを停止して、internal queuesにchunkを保存しないようにする。このサンプルではsource.stop()
を実行しているところ。停止方法はunderlying sourceによって様々あるはず。
ではunerlying sourceからの読み込みをいつ再開すればよいのか?というのは次のpull
で説明する。close()
を実行すると自身の状態がclosed
へ遷移する。error(e)
を実行するとerrored
へ遷移する。これらの状態になっている場合、enqueue(chunk)
を実行すると例外が発生する。start(enqueue, close, error)
の戻り値としてPromise
を返すと、それがresolveされるまでReadable Streamsの開始状態への遷移を遅延してくれる。
pull(enqueue, close, error)
pull(enqueue, close, error)
はinternal queuesからchunkがない状態で、consumerからデータの読み込み要求が合った場合に実行される。このpull
の中でunerlying sourceからデータ読み込みを再開する。このサンプルではsource.start()
を実行しているところ。pull
の中でenqueue
を使ってinternal queuesにchunkを書き込んでも良い。unerlying sourcesがpull sourcesの場合はstart
でenqueue(chunk)
を実行するのではなく、pull
の中でデータの取得をしてenqueue(chunk)
を実行するのが良い。
cancel(reason)
cancel(reason)
はconsumerがRedable Streamsからのデータの読み込みを中止した場合に実行される。このcancel(reason)
ではunerlying sourceからのデータ読み出しを終了する処理を実装する。このサンプルではsource.close()
の部分。例えばFileをunderlying sourceとして扱っている場合はFileを閉じる処理を行うことになる。reason
はconsumerから渡された任意のオブジェクトになる。cancel(reason)
の戻り値としてPromiseを返すと、後処理を非同期で行うことができる。
次にReadable Streamsからデータを読み取るconsumer側の実装。
var rs = new SampleReadableStream(); pump(); function pump(){ while(rs.state === 'readable') { console.log(rs.read()); } if(rs.state === 'waiting') { rs.ready.then(pump); } } rs.closed.then(()=> { console.log('closed!') }).catch((e)=> { console.log('errored!', e) });
RedableStream#state
Readable Streamsにはstateがあり、そのstateに応じデータを読み取るかどうかを判断する必要がある。stateにはwaiting
, readable
, closed
, errored
がある。
waiting
- internal queuesにデータが無いのでデータが溜まるのを待っている状態
- internal queuesにデータが貯まると
readable
の状態へ遷移する
readable
- internal queuesにデータが存在しているのでデータを読み取る事が出来る状態
- internal queuesにデータがなくなると
waiting
の状態へ遷移する - この状態以外ではReadable Streamsからデータを読み取ることができない(例外が出る)
closed
- Readable Streamsにはもう読み取るべきデータが存在しない状態
- この状態から他の状態へ遷移することはない
errored
- Readable Streamsの内部でエラーが発生した状態
- この状態から他の状態へ遷移することはない
詳細な状態遷移は3.2. The Readable Stream State Diagramを見ると良い。
ReadableStream#ready, #read
基本的な実装はwaiting
からreadable
への遷移をReadableStream#ready.then()
でPromiseを使ってキャッチする。そしてreadable
の状態になったらReadable StreamsからデータをReadableStream#read()
で読み取る。データを読み終わってwaiting
の状態に遷移したらまたreadable
に遷移するまで待機する。
ReadableStream#closed
ここで注意しないといけないのはReadableStream#ready.then()
ではwaiting
からreadable
, closed
, errored
の遷移をすべてキャッチできる。しかしclosed
とerrored
への遷移はReadableStream#closed.then().catch()
でキャッチしたほうが良い。なぜならReadableStream#closed
というズバリなAPIが用意されていること、ReadableStream#ready
ではerrored
への遷移でもfulfilledでPromiseが解決されてしまうこと、エラーオブジェクトを受け取ることができないことという理由から。
Writable Streamsのサンプルコード
まずunderling sinkとなるものを実装する。今回は一定の遅延でコンソールにデータを表示するものを実装する。
class Sink { constructor(delay = 200) { if (delay <= 0) { Promise.resolve().then(()=>{ this.onerror && this.onerror(new Error('interval must be greater than 0')); }); return; } this._delay = delay; } open() { return new Promise((resolve)=>{ setTimeout(resolve, this._delay); }) } write(data) { return new Promise((resolve)=>{ setTimeout(()=>{ console.log('write: ', data); resolve(); }, this._delay); }) } close() { return new Promise(resolve=>{ setTimeout(resolve, this._delay); }) } abort() { return new Promise(resolve=>{ setTimeout(resolve, this._delay); }) } onerror(e){} }
このunderlying sinkをWritable Streamsで扱えるようにする。
class SampleWritableStream extends WritableStream { constructor() { var sink = new Sink(); function start(error) { return sink.open().catch(error); } function write(chunk) { return sink.write(chunk); } function close() { return sink.close(); } function abort(reason) { return sink.abort(); } super({start, write, close, abort}); } }
Writable Streamsのコンストラクタに渡している{start, write, close, abort}
が重要になってくる。Readable Streamsと同じく基本的にはこの部分を実装していくことになる。
start(error)
まずstart(error)
がインスタンス生成時に一度だけ実行される。この関数の中でunderlying sinkを扱うための準備を行う。多くの場合、underlying sinkを書き込み状態で開くことになると思う。それらの処理に失敗した場合はerror(e)
を実行してerrored
な状態に遷移する。start(error)
の戻り値としてPromiseを返すとPromiseが解決されるまで、writable
への遷移が遅延される。
write(chunk)
write(chunk)
はproducerからデータが書き込まれ、internal queuesにデータが貯まると実行される。この関数の中ではunderling sinkに対してデータの書き込みを実行する。ほとんどのunderlying sinkは非同期での書き込みになると思うので、やはりこのwrite(chunk)
もPromiseを返すことでそのPromiseが解決されるまで次のwrite(chunk)
の呼び出しを遅延してくれる。
close()
close()
はproducerがWritable Streamsへの書き込みを終了するためにWritableStream#close()
を実行した時に呼び出される。この関数のなかではunderlying sinkのクローズ処理を行うことになる。WritableStreams#close()
が実行されるとclosing
という状態へ遷移する。この状態ではproducerからWritable Streamsにデータを書き込むことはできなくなる(書き込んでも無視される)。この時もしinternal queuesにデータが残っている場合はwrite(chunk)
はinternal queuesからデータが無くなるまで呼び出されることになる。internal queuesにデータがなくなるとclosed
な状態へ遷移する。このclose()
もPromiseを返すことそのPromiseが解決されるまでclosing
状態が続く。
abort(reason)
abort(reason)
はproducerがWritable Streamsへの書き込みを異常終了するためにWritableStreams#abort(reason)
を実行した時に呼び出される。基本的にはclose()
で行った処理と同じことを行う実装をする。もしabort(reason)
を実装しなかった場合、close()
が代わりに実行されることになる。
次にWritable Streamsへデータを書き込むproducer側の実装をする
var ws = new SampleWritableStream(); var data = [1, 2, 3, 4, 5]; pour(); function pour(e) { while(ws.state === 'writable') { if (data.length === 0) { ws.close(); return; } ws.write(data.shift()); } if (ws.state === 'waiting') { ws.ready.then(pour); } } ws.closed.then(()=>{ console.log('closed!'); }).catch((e)=>{ console.log('errored!', e); });
WritableStream#state
Readable Streamsと同様に状態を見ながらデータを書き込んで行くことになる。Writable Streamsの状態は以下のものがある。
waiting
- internal queuesがいっぱいなので、書き込みを行えない状態
- internal queuesに空きができると
writable
へ遷移する
writable
- internal queuesに空きがあり、データを書き込める状態
- internal queuesがいっぱいになると、
waiting
へ遷移する
closing
- 終了処理を行っており、もう書き込みを行えない状態
- 終了処理が完了すると
closed
へ遷移する
closed
- 終了処理が完了した状態
- この状態から他の状態へ遷移することはない
errored
- 内部でエラーが発生し、もう書き込みを行えない状態
- この状態から他の状態へ遷移することはない
WritableStream#ready, #write(chunk)
基本的な実装はReadable Streamsと同じでwaiting
からwritable
への遷移をWritableStream#ready
から取得したPromiseを使ってキャッチする。そしてwritable
な状態になったらWritableStream#write(chunk)
でデータを書き込む。waiting
の状態へ遷移したらまたwritable
になるまで待機する。WritableStream#write(chunk)
の戻り値はPromiseとなっており、そのchunk
の書き込みが完了したら解決されるようになっている。
WritableStream#closed
WritableStream#ready
ではwaiting
からwritable
, closing
, closed
, erroed
への遷移をすべてキャッチできる。しかしここで注意しないといけないのはclosed
とerroed
への遷移はWritableStream#closed
でキャッチすべきだということ。理由はReadable Streamsと同じなので割愛。closing
への遷移はキャッチすべき理由がほとんどないので無視して良さそう。
Pipe & TransformStreamsのサンプルコード
ReadableStream#pipeTo(WritableStream, options)
TCPReadableStream
とFileWritableStream
というクラスが用意されていた場合、TCPから流れてくるデータをFileに書き込むにはこのように実装する。
var rs = new TCPReadableStream(); var ws = new FileWritableStream(); var options = {preventClose: false, preventAbort: false, preventCancel: false}; rs.pipeTo(ws, options);
ReadableStream#pipeTo(WritableStream, options)
でReadableStream
からのデータをWritableStream
に書き込んでいる。options
は{preventClose, preventAbort, preventCancel}
を渡すことができそれぞれ以下の挙動を設定する。
preventClose
ReadableStream
から読み込むべきデータが無くなり(closed
)、処理が終わるときにWritableStream#close()
するかどうか
preventAbort
ReadableStream
がエラーになった時(errored
)、WritableStream#abort()
するかどうか
preventCancel
WritableStream
がエラーに成った時(errored
)、ReadableStream#cancel()
するかどうか
ReadableStream#pipeThrough(TransformStream, options)
次にZIPTransformStream
というクラスが用意されていた場合、TCPから流れてくるデータをZIP圧縮してFileに書き込むにはこのように実装する。
var rs = new TCPReadableStream(); var ws = new FileWritableStream(); var options = {preventClose: false, preventAbort: false, preventCancel: false}; var ts = new ZIPTransformStram(); rs.pipeThrough(ts, options).pipeTo(ws);
このReadableStream#pipeThrough(TransformStream, options)
に渡せるTransformStream
とは実際は{writable, readable}
をもっとものならなんでも良い。とはいえTransformStream
内部の実装としては以下の様な流れになる。
TransformStream
内部のWritableStream
にデータが書き込まれる(例ではrs
から書き込まれる)- 書き込まれたデータに何かしらの変換処理をする(e.g. ZIP圧縮など)
TransformStream
内部のReadableStream
のinternal queuesに変換後のデータを書き込む
引数のoptions
はReadableStream#pipeTo
に渡すものと同じものになる。
Backpressure
このpipe機能で注意が必要なのは各Streamはbackpressureを正しく実装する必要がある。
Readable Streamではinternal queuesにデータが溜まってきたら、underlying sourceからの読み込みを一時停止する。そしてconsumerからデータの取得要求があればunderlying sourceからの読み込みを再開する。
Writable Streamではunderlying sinkに対してデータを書き込んだら、次のデータはその書き込みが完了するまでinternal queuesに溜め込むようにする。そしてinternal queuesにデータがたまりすぎたら、状態をwaiting
に変更し、producerに対してbackpuressure実行するように要求する。
このようにbackpressureを実装するのは大変そうだが、実際はReadable Stream、Writable Streamの内部でほとんどよしなにやってくれる。実際にユーザ空間で行うのは以下の三つ。
- underlying sourceからのデータ読み込みの一時停止/再開(Readable Stream)
- underlying sinkへのデータ書き込み/再開(Writable Stream)
- internal queuesにどれだけデータを溜めてよいかの指定(Both)
1.と2.については使用するunderlying source、underlying sinkによって様々なのでユーザ空間で行うしか無い。3.については後述するQueuing Strategiesを指定する形になる。
Queuing Strategiesのサンプルコード
Queuing Strategiesは「internal queuesに溜まっているデータからbackpressure開始すべきかどうかを指定するもの」。データが文字列である場合に、その文字数からbackpressureを開始するかどうかを指定するものを実装するとこんな感じ。以下の実装では1024文字より多くの文字数がinternal queuesにたまったらbackpressureを開始する。
class StringLengthQueuingStrategy { size(chunk) { return chunk.length; } shouldApplyBackpressure(queueSize) { return queueSize > 1024; } } var strategy = new StringLengthQueuingStrategy();
QueuingStrategy#size(chunk)
QueuingStrategy#size(chunk)
はそのchunkがどれだけのサイズを持っているかを返すメソッド。例えばchunkの文字数やバイト数など。どんなchunkであってもサイズを1とすることもできる。chunkの中身をみてサイズを独自に計算して返すのでもよい。
QueuingStrategy#shouldApplyBackpressure(queueSize)
QueuingStrategy#shouldApplyBackpressure(queueSize)
は現在のinternal queuesに溜まっているchunkの総サイズqueueSize
が渡されてくるので、その情報をもとにbackpressureを開始するべきかどうかをbooleanで返すメソッド。
Default Queuing Strategies
WHATWG Streamsには標準で2種類のQueuing Strategiesが提供されている。
- ByteLengthQueuingStrategy
- chunkのバイトサイズをもとに処理する
new ByteLengthQueuingStrategy({ highWaterMark })
highWaterMark
にはinernal queuesに保存できる最大のバイト数を指定する- chunkには
.byteLength
が実装されている必要がある - CountQueuingStrategy
- chunkの数を元に処理する
new CountQueuingStrategy({ highWaterMark })
highWaterMark
にはinernal queuesに保存できるchunkの数を指定する- Streams生成時にQueuing Strategiesを指定しなかった場合は
highWarterMark:1
のCountQueuingStrategyが使われる
Exclusive Streamsのサンプルコード
Readable Streamsは複数のconsumerからデータを読み取られる可能性がある。そこで単一のconsumerからデータを読み取れるように排他制御するためにExclusive Streamsを使う。
var rs = new TCPReadableStream(); var reader = rs.getReader(); // あとはReadableStreamを扱うのと同じように扱えば良い.
ReadableStream#getReader()
からExclusiveStream
を取得する。その後はExclusiveStream
を通してのみデータを読み込むことができる。ExclusiveStream
取得後にReadableStream#getReader()
したり、ReadableStream#read()
で読み取ると例外が起きる。ExclusiveStream
はReadableStream
と同じAPIを持っているので基本は同じように使えば良い。
追加されているAPIはExclusiveStream#isActive
とExclusiveStream#releaseLock()
の2つ。isActive
はこのExclusive Readerがまだ生きており、Readable Streamとつながっているかどうか。これがfalseの場合はもう繋がっていないのでデータを読むことはできない。releaseLock()
を実行するとReadable Streamとのつながりが終わり、排他制御を開放する。
動かし方
git clone git@github.com:whatwg/streams.git cd streams/reference-implementation npm install vi sample.js ./node_modules/.bin/traceur-runner sample.js
おわり
- npmのwhatwg-streamsってなんだろう?使い方分からない
- PromiseのようにECMAScriptに入るのではなく、あくまでブラウザAPIの仕様としてきまるの?
- consumerやproducerとして各Streamを扱うのは簡単だけど、自分でFooStreamを作るのは難しそう
- WHATWGのWritable Streamsのexampleではbackpressureを無視して書き込みしてるけどいいの?
- まとめていて力尽きたので、最後の方は尻すぼみ感
間違っているところがあれば教えてください。
*1:onprogressとかあった気もするけど