読者です 読者をやめる 読者になる 読者になる

WHATWG Streams APIについてのメモ

tech

こんにちは丸山@h13i32maruです。

WHATWG Streams APIを少し触ってみたので、そのメモです。

ドキュメント

概念

JavaScriptのプログラム中で処理する(大きめな)データの読み書きを統一的に扱うためのインターフェースをStreams APIとしてWHATWGが策定している。例えばXHRで大きなファイルを読み込んだ時にはすべてのデータを読み込んだあとにコールバックが呼ばれる*1。これだと読み込み中の進捗を知ることができないし、読み込み途中のデータを使って随時画面を更新していくというのも難しい。

そこでStreams APIの出番。もしXHRの代わりにStreams APIを使ったHTTPクライアントがあるとする。そうするとStreams APIを通してサーバからデータが届くたびに随時データを受け取ることができる。つまりデータの読み込みが全部終わるのをまたなくても良くなる。

nodeではFileやHTTPクライアントはStream APIに対応している。と言ってもこれはWHATWG Streams APIとは別物だと思う。nodeのStreams APIをもとにWebで使えるように標準化するのがWHATWG Streams APIなのかな?ここらへんはあまりよく理解していない。

余談だけど、C#にはObservableという物がある。これはPromiseをより強力にしたようなものと考えられる。Promiseが時間軸に対するイベントを一度だけ処理するのに対して、Observableは連続するイベントを何度でも処理できるようになっている。このObservableとStreamsは連続するイベント/データを処理するという点が非常に似ている。しかし前者は各イベントが独立しているように扱うのに対し、後者は各イベントは一つの大きなデータの塊の一部であるように扱う。そのためStreamsにはキューイング/バッファの処理がベースになっている。でもC#のObservableよく知らないので間違ってるかも。

モデル

Readable Streams

読み込み用のStreams APIがReadable Streamsである。そのReadable Streamsにデータを供給するものがunderlying sourceと呼ばれている。たとえばTCPReadableStreamというクラスが存在した場合、underlying sourceTCPということになる。そしてReadable Streamsからデータを受け取るものがconsumerと呼ばれている。

データのフローとしてはこんな感じ。

underlying source => Readable Streams => consumer

underlying sourceとしてpush sourcespull sourcesがある。これはunderlying sourcesがどのようにデータを提供するのかの違いである。

以下のようにunderlying sourceから一方的にデータが送られてくるタイプがpush sourcesである。

source.ondata = (chunk) =>{
  console.log(chunk);
}

以下のようにunderling sourceにデータを取得しに行くタイプがpull sourcessである。

source.read((chunk)=>{
  console.log(chunk);
});

Readable Streamsはこのpush sourcespull sourcesの両方をラップして統一したインターフェースを提供できるように設計されている。

Writable Streams

書き込み用のStreams APIがWritable Streamsである。そのWritable Streamsがデータを書き込む先がunderlying sinkと呼ばれている。例えばFileWritableStramというクラスが存在した場合、underlying sinkFileということになる。そしてWritable Streamsにデータを供給するものがproducerと呼ばれている。

データのフロートしてはこんな感じ。

producer => Writable Streams => underlying shank

Transform Streams

Redable StreamsWritable Streamsの両方を保持しデータを変換するStreamsをTransform Streamsと呼ぶ。これは他のReadable Streamsからデータを貰い、変換処理を行ったデータを提供するものとして使われる。例えばGZipTransformというクラスがあったとすると、FileReadableStreamからデータを受け取りGZip圧縮されたデータを提供するように使える。

データの流れはこんな感じ。

underlying source => Readable Streams => Transform Streams => consumer

ただしこれはまだspecとして言語化されていなくてテスト実装によって設計を確かめている段階。

Pipe Chains & Backpressure

Readable StreamsWritable Streamsをつないで、underlying sourceunderlying sinkに書き込む処理をpipeと呼ぶ。Linuxの|と同じ。例えばTCPReadableStreamFileWritableStreamがあれば、このpipe機能をつかってTCPから読み出したデータをFileに書き出すというのが簡単にできる。

このpipe機能を使った時に問題になることがある。それはReadable StreamsWritable Streamsの間でデータ処理速度に違いがあると上手くデータが流れず詰まってしまう。例えばファイルから読みだしたデータをネットワークに書き込む場合、ネットワークへの書き込み速度のほうがファイルからの読み取り速度より基本的には遅い。なので、ネットワークへの書き込みの部分でデータが詰まってしまう。これを回避してデータの流れを正常にする処理のことをbackpressureと呼ぶ。

Internal Queues & Queuing Strategies

Readable StreamsWritable Streamsはそれぞれ内部にchunkをバッファリングするためのinternal queuesを持っている。

Readable Streamsではunderlying sourceから読みだしたデータをinternal queuesに書き出す。そしてconsumerReadable StreamsのAPIを通してこのinternal queuesからデータを読み出す。

Writable StreamsではproducerWritable StreamsのAPIを通してinternal queuesにデータを書き出す。そしてinternal queuesからデータを読みだしてunderlying sinkに書き出す。

どちらの場合もinternal queuesは内部的に扱われるだけで、外部から直接扱われることはない。

このinternal queuesにデータが溜まり過ぎるとバッファがオーバーして処理ができなくなってしまう。これを防ぐためにbackpressureという仕組みがあると前述した。このbackpressureを実現するためにqueueing strategiesというものがある。これは「chunkを受け取り、そのchunkのサイズを返すメソッド」と「internal queuesにたまっているchunkの合計サイズを受け取り、backpressureをすべきかどうかを返すメソッド」の2つで構成されている。

例えば「internal queuesにたまってるchunkの合計バイト数が一定以上を超えるとbackpressureを開始する」や「internal queuesにたまっているchunkの合計個数が一定以上を超えるとbackpressureを開始する」などのqueuing strategiesが考えられる。

Locking

Readable Streamsは複数のconsumerからデータを読み取られる可能性がある。そうするとデータが分散してしまって正常に処理することができない。そこで他のconsumerから読み取られないように排他制御をすることができる。この排他制御はExclusive Stream Readerで対象のReadable Streamsをカプセル化してlockするということを行う。

このExclusive Stream Readerは基本的にはReadable Streamsと同じように扱うことができる。lockされているReadable Streamsから直接データを読み取ることができない(例外がでる)ようになってる。それとReadable Streamsは同時にひとつのExclusive Stream Readerによってしかlockされることはない。

Readable Streamsのサンプルコード

まずはunderlying sourceとなるものを実装する。今回は一定間隔でデータがpushされるものを用意する。

class Source {
  constructor(max = 10, interval = 200) {
    if (max <= 0 || interval <= 0) {
      Promise.resolve().then(()=>{
        this.onerror && this.onerror(new Error('max and interval must be greater than 0'));
      });
      return;
    }

    var count = 0;
    var id = setInterval(()=>{
      if (this._stopped) {
        return;
      }
      if (this._finished) {
        clearInterval(id);
        return;
      }
      this.ondata && this.ondata({count: count++});
      if(count === max) {
        clearInterval(id);
        this.oncomplete && this.oncomplete();
      }
    }, interval);
  }

  stop(){
    this._stopped = true;
  }

  start() {
    this._stopped = false;
  }

  close(){
    this._finished = true;
  }

  ondata(){}
  oncomplete(){}
  onerror(){}
}

次にこれをReadable Streamsで扱えるようにする。

class SampleReadableStream extends ReadableStream {
  constructor() {
    var source = new Source();

    function start(enqueue, close, error) {
      source.ondata = (chunk) => {
        if (!enqueue(chunk)) {
          source.stop();
        }
      };

      source.oncomplete = close;
      source.onerror = error;
    }

    function pull(enqueue, close, error) {
      source.start();
    }

    function cancel(reason) {
      source.close();
    }

    super({start, pull, cancel});
  }
}

継承を使わずに使うこともできるが、いろいろ柔軟にできるため今回は継承を使った。 ReadableStreamのコンストラクタに渡している{start, pull, cancel}が重要になってくる。基本的にはこの部分を色々実装していくことになる。

start(enqueue, close, error)

まずstart(enqueue, close, error)SampleRedableStreamがインスタンス化された後に一度だけ実行される。このonStartの中でunderlying sourceからchunkの読み込みを開始し、internal queuesにchunkを保存していくことになる。

enqueueはその名の通りでenqueue(chunk)のようにしてinternal queuesにchunkを保存する。このenqueue(chunk)からfalseが返ってきた場合、chunkの流れが正常ではないということを表す。つまりback pressureを開始しなければならない。Readable Streamsでbackpressureを開始するのにはunerlying sourceからのchunkの読み込みを停止して、internal queuesにchunkを保存しないようにする。このサンプルではsource.stop()を実行しているところ。停止方法はunderlying sourceによって様々あるはず。

ではunerlying sourceからの読み込みをいつ再開すればよいのか?というのは次のpullで説明する。close()を実行すると自身の状態がclosedへ遷移する。error(e)を実行するとerroredへ遷移する。これらの状態になっている場合、enqueue(chunk)を実行すると例外が発生する。start(enqueue, close, error)の戻り値としてPromiseを返すと、それがresolveされるまでReadable Streamsの開始状態への遷移を遅延してくれる。

pull(enqueue, close, error)

pull(enqueue, close, error)はinternal queuesからchunkがない状態で、consumerからデータの読み込み要求が合った場合に実行される。このpullの中でunerlying sourceからデータ読み込みを再開する。このサンプルではsource.start()を実行しているところ。pullの中でenqueueを使ってinternal queuesにchunkを書き込んでも良い。unerlying sourcesがpull sourcesの場合はstartenqueue(chunk)を実行するのではなく、pullの中でデータの取得をしてenqueue(chunk)を実行するのが良い。

cancel(reason)

cancel(reason)はconsumerがRedable Streamsからのデータの読み込みを中止した場合に実行される。このcancel(reason)ではunerlying sourceからのデータ読み出しを終了する処理を実装する。このサンプルではsource.close()の部分。例えばFileをunderlying sourceとして扱っている場合はFileを閉じる処理を行うことになる。reasonはconsumerから渡された任意のオブジェクトになる。cancel(reason)の戻り値としてPromiseを返すと、後処理を非同期で行うことができる。

次にReadable Streamsからデータを読み取るconsumer側の実装。

var rs = new SampleReadableStream();

pump();

function pump(){
  while(rs.state === 'readable') {
    console.log(rs.read());
  }

  if(rs.state === 'waiting') {
    rs.ready.then(pump);
  }
}

rs.closed.then(()=> {
  console.log('closed!')
}).catch((e)=> {
  console.log('errored!', e)
});

RedableStream#state

Readable Streamsにはstateがあり、そのstateに応じデータを読み取るかどうかを判断する必要がある。stateにはwaiting, readable, closed, erroredがある。

  • waiting
    • internal queuesにデータが無いのでデータが溜まるのを待っている状態
    • internal queuesにデータが貯まるとreadableの状態へ遷移する
  • readable
    • internal queuesにデータが存在しているのでデータを読み取る事が出来る状態
    • internal queuesにデータがなくなるとwaitingの状態へ遷移する
    • この状態以外ではReadable Streamsからデータを読み取ることができない(例外が出る)
  • closed
    • Readable Streamsにはもう読み取るべきデータが存在しない状態
    • この状態から他の状態へ遷移することはない
  • errored
    • Readable Streamsの内部でエラーが発生した状態
    • この状態から他の状態へ遷移することはない

詳細な状態遷移は3.2. The Readable Stream State Diagramを見ると良い。

ReadableStream#ready, #read

基本的な実装はwaitingからreadableへの遷移をReadableStream#ready.then()でPromiseを使ってキャッチする。そしてreadableの状態になったらReadable StreamsからデータをReadableStream#read()で読み取る。データを読み終わってwaitingの状態に遷移したらまたreadableに遷移するまで待機する。

ReadableStream#closed

ここで注意しないといけないのはReadableStream#ready.then()ではwaitingからreadable, closed, erroredの遷移をすべてキャッチできる。しかしclosederroredへの遷移はReadableStream#closed.then().catch()でキャッチしたほうが良い。なぜならReadableStream#closedというズバリなAPIが用意されていること、ReadableStream#readyではerroredへの遷移でもfulfilledでPromiseが解決されてしまうこと、エラーオブジェクトを受け取ることができないことという理由から。

Writable Streamsのサンプルコード

まずunderling sinkとなるものを実装する。今回は一定の遅延でコンソールにデータを表示するものを実装する。

class Sink {
  constructor(delay = 200) {
    if (delay <= 0) {
      Promise.resolve().then(()=>{
        this.onerror && this.onerror(new Error('interval must be greater than 0'));
      });
      return;
    }

    this._delay = delay;
  }

  open() {
    return new Promise((resolve)=>{
      setTimeout(resolve, this._delay);
    })
  }

  write(data) {
    return new Promise((resolve)=>{
      setTimeout(()=>{
        console.log('write: ', data);
        resolve();
      }, this._delay);
    })
  }

  close() {
    return new Promise(resolve=>{
      setTimeout(resolve, this._delay);
    })
  }

  abort() {
    return new Promise(resolve=>{
      setTimeout(resolve, this._delay);
    })
  }

  onerror(e){}
}

このunderlying sinkをWritable Streamsで扱えるようにする。

class SampleWritableStream extends WritableStream {
  constructor() {
    var sink = new Sink();

    function start(error) {
      return sink.open().catch(error);
    }

    function write(chunk) {
      return sink.write(chunk);
    }

    function close() {
      return sink.close();
    }

    function abort(reason) {
      return sink.abort();
    }

    super({start, write, close, abort});
  }
}

Writable Streamsのコンストラクタに渡している{start, write, close, abort}が重要になってくる。Readable Streamsと同じく基本的にはこの部分を実装していくことになる。

start(error)

まずstart(error)がインスタンス生成時に一度だけ実行される。この関数の中でunderlying sinkを扱うための準備を行う。多くの場合、underlying sinkを書き込み状態で開くことになると思う。それらの処理に失敗した場合はerror(e)を実行してerroredな状態に遷移する。start(error)の戻り値としてPromiseを返すとPromiseが解決されるまで、writableへの遷移が遅延される。

write(chunk)

write(chunk)はproducerからデータが書き込まれ、internal queuesにデータが貯まると実行される。この関数の中ではunderling sinkに対してデータの書き込みを実行する。ほとんどのunderlying sinkは非同期での書き込みになると思うので、やはりこのwrite(chunk)もPromiseを返すことでそのPromiseが解決されるまで次のwrite(chunk)の呼び出しを遅延してくれる。

close()

close()はproducerがWritable Streamsへの書き込みを終了するためにWritableStream#close()を実行した時に呼び出される。この関数のなかではunderlying sinkのクローズ処理を行うことになる。WritableStreams#close()が実行されるとclosingという状態へ遷移する。この状態ではproducerからWritable Streamsにデータを書き込むことはできなくなる(書き込んでも無視される)。この時もしinternal queuesにデータが残っている場合はwrite(chunk)はinternal queuesからデータが無くなるまで呼び出されることになる。internal queuesにデータがなくなるとclosedな状態へ遷移する。このclose()もPromiseを返すことそのPromiseが解決されるまでclosing状態が続く。

abort(reason)

abort(reason)はproducerがWritable Streamsへの書き込みを異常終了するためにWritableStreams#abort(reason)を実行した時に呼び出される。基本的にはclose()で行った処理と同じことを行う実装をする。もしabort(reason)を実装しなかった場合、close()が代わりに実行されることになる。

次にWritable Streamsへデータを書き込むproducer側の実装をする

var ws = new SampleWritableStream();
var data = [1, 2, 3, 4, 5];

pour();

function pour(e) {
  while(ws.state === 'writable') {
    if (data.length === 0) {
      ws.close();
      return;
    }

    ws.write(data.shift());
  }

  if (ws.state === 'waiting') {
    ws.ready.then(pour);
  }
}

ws.closed.then(()=>{
  console.log('closed!');
}).catch((e)=>{
  console.log('errored!', e);
});

WritableStream#state

Readable Streamsと同様に状態を見ながらデータを書き込んで行くことになる。Writable Streamsの状態は以下のものがある。

  • waiting
    • internal queuesがいっぱいなので、書き込みを行えない状態
    • internal queuesに空きができるとwritableへ遷移する
  • writable
    • internal queuesに空きがあり、データを書き込める状態
    • internal queuesがいっぱいになると、waitingへ遷移する
  • closing
    • 終了処理を行っており、もう書き込みを行えない状態
    • 終了処理が完了するとclosedへ遷移する
  • closed
    • 終了処理が完了した状態
    • この状態から他の状態へ遷移することはない
  • errored
    • 内部でエラーが発生し、もう書き込みを行えない状態
    • この状態から他の状態へ遷移することはない

WritableStream#ready, #write(chunk)

基本的な実装はReadable Streamsと同じでwaitingからwritableへの遷移をWritableStream#readyから取得したPromiseを使ってキャッチする。そしてwritableな状態になったらWritableStream#write(chunk)でデータを書き込む。waitingの状態へ遷移したらまたwritableになるまで待機する。WritableStream#write(chunk)の戻り値はPromiseとなっており、そのchunkの書き込みが完了したら解決されるようになっている。

WritableStream#closed

WritableStream#readyではwaitingからwritable, closing, closed, erroedへの遷移をすべてキャッチできる。しかしここで注意しないといけないのはclosederroedへの遷移はWritableStream#closedでキャッチすべきだということ。理由はReadable Streamsと同じなので割愛。closingへの遷移はキャッチすべき理由がほとんどないので無視して良さそう。

Pipe & TransformStreamsのサンプルコード

ReadableStream#pipeTo(WritableStream, options)

TCPReadableStreamFileWritableStreamというクラスが用意されていた場合、TCPから流れてくるデータをFileに書き込むにはこのように実装する。

var rs = new TCPReadableStream();
var ws = new FileWritableStream();
var options = {preventClose: false, preventAbort: false, preventCancel: false};
rs.pipeTo(ws, options);

ReadableStream#pipeTo(WritableStream, options)ReadableStreamからのデータをWritableStreamに書き込んでいる。options{preventClose, preventAbort, preventCancel}を渡すことができそれぞれ以下の挙動を設定する。

  • preventClose
    • ReadableStreamから読み込むべきデータが無くなり(closed)、処理が終わるときにWritableStream#close()するかどうか
  • preventAbort
    • ReadableStreamがエラーになった時(errored)、WritableStream#abort()するかどうか
  • preventCancel
    • WritableStreamがエラーに成った時(errored)、ReadableStream#cancel()するかどうか

ReadableStream#pipeThrough(TransformStream, options)

次にZIPTransformStreamというクラスが用意されていた場合、TCPから流れてくるデータをZIP圧縮してFileに書き込むにはこのように実装する。

var rs = new TCPReadableStream();
var ws = new FileWritableStream();
var options = {preventClose: false, preventAbort: false, preventCancel: false};
var ts = new ZIPTransformStram();
rs.pipeThrough(ts, options).pipeTo(ws);

このReadableStream#pipeThrough(TransformStream, options)に渡せるTransformStreamとは実際は{writable, readable}をもっとものならなんでも良い。とはいえTransformStream内部の実装としては以下の様な流れになる。

  1. TransformStream内部のWritableStreamにデータが書き込まれる(例ではrsから書き込まれる)
  2. 書き込まれたデータに何かしらの変換処理をする(e.g. ZIP圧縮など)
  3. TransformStream内部のReadableStreamのinternal queuesに変換後のデータを書き込む

引数のoptionsReadableStream#pipeToに渡すものと同じものになる。

Backpressure

このpipe機能で注意が必要なのは各Streamはbackpressureを正しく実装する必要がある。

Readable Streamではinternal queuesにデータが溜まってきたら、underlying sourceからの読み込みを一時停止する。そしてconsumerからデータの取得要求があればunderlying sourceからの読み込みを再開する。

Writable Streamではunderlying sinkに対してデータを書き込んだら、次のデータはその書き込みが完了するまでinternal queuesに溜め込むようにする。そしてinternal queuesにデータがたまりすぎたら、状態をwaitingに変更し、producerに対してbackpuressure実行するように要求する。

このようにbackpressureを実装するのは大変そうだが、実際はReadable Stream、Writable Streamの内部でほとんどよしなにやってくれる。実際にユーザ空間で行うのは以下の三つ。

  1. underlying sourceからのデータ読み込みの一時停止/再開(Readable Stream)
  2. underlying sinkへのデータ書き込み/再開(Writable Stream)
  3. internal queuesにどれだけデータを溜めてよいかの指定(Both)

1.と2.については使用するunderlying source、underlying sinkによって様々なのでユーザ空間で行うしか無い。3.については後述するQueuing Strategiesを指定する形になる。

Queuing Strategiesのサンプルコード

Queuing Strategiesは「internal queuesに溜まっているデータからbackpressure開始すべきかどうかを指定するもの」。データが文字列である場合に、その文字数からbackpressureを開始するかどうかを指定するものを実装するとこんな感じ。以下の実装では1024文字より多くの文字数がinternal queuesにたまったらbackpressureを開始する。

class StringLengthQueuingStrategy {
  size(chunk) {
    return chunk.length;
  }

  shouldApplyBackpressure(queueSize) {
    return queueSize > 1024;
  }
}

var strategy = new StringLengthQueuingStrategy();

QueuingStrategy#size(chunk)

QueuingStrategy#size(chunk)はそのchunkがどれだけのサイズを持っているかを返すメソッド。例えばchunkの文字数やバイト数など。どんなchunkであってもサイズを1とすることもできる。chunkの中身をみてサイズを独自に計算して返すのでもよい。

QueuingStrategy#shouldApplyBackpressure(queueSize)

QueuingStrategy#shouldApplyBackpressure(queueSize)は現在のinternal queuesに溜まっているchunkの総サイズqueueSizeが渡されてくるので、その情報をもとにbackpressureを開始するべきかどうかをbooleanで返すメソッド。

Default Queuing Strategies

WHATWG Streamsには標準で2種類のQueuing Strategiesが提供されている。

  1. ByteLengthQueuingStrategy
  2. chunkのバイトサイズをもとに処理する
  3. new ByteLengthQueuingStrategy({ highWaterMark })
  4. highWaterMarkにはinernal queuesに保存できる最大のバイト数を指定する
  5. chunkには.byteLengthが実装されている必要がある
  6. CountQueuingStrategy
  7. chunkの数を元に処理する
  8. new CountQueuingStrategy({ highWaterMark })
  9. highWaterMarkにはinernal queuesに保存できるchunkの数を指定する
  10. Streams生成時にQueuing Strategiesを指定しなかった場合はhighWarterMark:1のCountQueuingStrategyが使われる

Exclusive Streamsのサンプルコード

Readable Streamsは複数のconsumerからデータを読み取られる可能性がある。そこで単一のconsumerからデータを読み取れるように排他制御するためにExclusive Streamsを使う。

var rs = new TCPReadableStream();
var reader = rs.getReader();

// あとはReadableStreamを扱うのと同じように扱えば良い.

ReadableStream#getReader()からExclusiveStreamを取得する。その後はExclusiveStreamを通してのみデータを読み込むことができる。ExclusiveStream取得後にReadableStream#getReader()したり、ReadableStream#read()で読み取ると例外が起きる。ExclusiveStreamReadableStreamと同じAPIを持っているので基本は同じように使えば良い。

追加されているAPIはExclusiveStream#isActiveExclusiveStream#releaseLock()の2つ。isActiveはこのExclusive Readerがまだ生きており、Readable Streamとつながっているかどうか。これがfalseの場合はもう繋がっていないのでデータを読むことはできない。releaseLock()を実行するとReadable Streamとのつながりが終わり、排他制御を開放する。

動かし方

git clone git@github.com:whatwg/streams.git
cd streams/reference-implementation
npm install
vi sample.js
./node_modules/.bin/traceur-runner sample.js

おわり

  • npmのwhatwg-streamsってなんだろう?使い方分からない
  • PromiseのようにECMAScriptに入るのではなく、あくまでブラウザAPIの仕様としてきまるの?
  • consumerやproducerとして各Streamを扱うのは簡単だけど、自分でFooStreamを作るのは難しそう
  • WHATWGのWritable Streamsのexampleではbackpressureを無視して書き込みしてるけどいいの?
  • まとめていて力尽きたので、最後の方は尻すぼみ感

間違っているところがあれば教えてください。

*1:onprogressとかあった気もするけど