rxjsのObservable、状態管理に非常に便利ですよね。しかし、その実装がどうなっているか意識する機会はあまりありません。
今回は、asyncとgeneratorを組み合わせてobserverのようなclassを実装してみましょう。
インターフェースを定義する
使用例を考える
こう書いてこう動いたら嬉しいな、と考えてみます。
const wait = async (ms)=>{
return new Promise(resolve => setTimeout(resolve, ms));
};
Observable.create(async function*(){
for(let i = 0; i < 10; i++) {
yield i;
await wait(1000);
}
}).subscribe(value=>console.log(value), err=>console.log(err));
このプログラムは、タイマーをイメージしたプログラムで、1秒毎にconsoleに経過秒数が出力されることが期待値です。rxjsのObserverと違うところは、next()
を使わずに、yieldで表現しているところです。
クラス設計する
きっとこんな感じになります。
class Observable{ // factoryみたいなやつ
static create(generatorFn){} // きっとObserverを作成して返してくれる
}
class Observer{
constructor(generatorFn){
this.generatorFn = generatorFn;
}
subscribe(stepCallback, errCallback){} // subscribeを登録し、処理をスタートする。
async start(){} // 処理をスタートするため関数
}
YAGNI原則に従って、最初のユースケースを満たすために必要無いものは定義しません。また、そこまで厳密に定義する必要はありません。実装を進めていく上で足りなければ足すし、冗長なら削除します。
実装
シンプルなのでざっと書いてみます。for-await構文は比較的新しい書き方ですね。
class Observable{
static create(generatorFn){
return new Observer(generatorFn);
}
}
class Observer{
constructor(generatorFn){
this.generatorFn = generatorFn;
this.subscribers = null;
}
subscribe(stepCallback, errCallback){
this.subscribers = { stepCallback, errCallback };
this.start();
}
async start(){
try{
for await(let value of this.generatorFn()){
this.subscribers.stepCallback(value);
}
}catch(e){
this.subscribers.errCallback(e);
}
}
}
nullチェックや値のチェックは行っていません。実際に実装するときは、例外処理も含め、適切なところでErrorを投げたり、引数が妥当かどうかのチェックをしてあげると親切です。
テスト
最初のユースケースを繋げて実行してみて、実際にちゃんと0〜9の値が1秒ごとに表示され、10以降の出力が行われないことを確かめます。Chromeをお使いなら、タブを開いて開発者ツールのコンソールにそのままコピペすると実際に動かすことができます。
// Implemented code.
class Observable{
static create(generatorFn){
return new Observer(generatorFn);
}
}
class Observer{
constructor(generatorFn){
this.generatorFn = generatorFn;
this.subscribers = null;
}
subscribe(stepCallback, errCallback){
this.subscribers = { stepCallback, errCallback };
this.start();
}
async start(){
try{
for await(let value of this.generatorFn()){
this.subscribers.stepCallback(value);
}
}catch(e){
this.subscribers.errCallback(e);
}
}
}
// Test code.
const wait = async (ms)=>{
return new Promise(resolve => setTimeout(resolve, ms));
};
Observable.create(async function*(){
for(let i = 0; i < 10; i++) {
yield i;
await wait(1000);
}
}).subscribe(value=>console.log(value), err=>console.log(err));
結果
0
1
2
3
4
5
6
7
8
9
想定通りですね。
まとめ
generatorとasync/awaitを組み合わせてobserverを実装してみました。シンプルな実装ですが、generatorと非同期処理の良い使用例になったかと思います。
まだobservableほどの柔軟性はありませんが、そこそこ似たようなことは出来るはずです…!