redux-sagaの動きを調べた。
redux-sagaは redux-sagaで非同期処理と戦うで説明されているように、非同期処理などを直列プログラムのような形式(直接形式; direct style) で書くためのライブラリである。 そのためにタスクを導入し、その切り替えを制御している。
複数のタスクを協調制御するという点で、コルーチンや軽量スレッド、fiberなどに類似していると感じた。
🔎対象
redux-saga v0.15.3を対象とする。ただし一部コードは説明のためにエラー処理や終了処理を省略する。
また counter-vanilla を元にした以下のプログラムの動きを追う。
// counter.js ////////////////////////////////////////////////////////////////////////// // Reducerの定義 // INCREMENTが来たら +1 する reducer function counter(state, action) { if (typeof state === 'undefined') { return 0 } switch (action.type) { case 'INCREMENT': return state + 1 default: return state } } ////////////////////////////////////////////////////////////////////////// // Sagaの定義 const effects = ReduxSaga.effects const delay = ms => new Promise(resolve => setTimeout(resolve, ms)) // INCREMENT_ASYNC の1秒後にINCREMENTを発生させる。 function* counterSaga() { while(1) { yield effects.take('INCREMENT_ASYNC') yield effects.call(delay, 1000) yield effects.put({type: 'INCREMENT'}) } } ////////////////////////////////////////////////////////////////////////// // redux-sagaの初期化 const createSagaMiddleware = ReduxSaga.default const sagaMiddleware = createSagaMiddleware() var store = Redux.createStore( counter, Redux.applyMiddleware(sagaMiddleware)) ////////////////////////////////////////////////////////////////////////// // タスクの初期化・実行 sagaMiddleware.run(counterSaga) ////////////////////////////////////////////////////////////////////////// // イベントハンドラ document .getElementById('incrementAsync') .addEventListener('click', function () { store.dispatch({ type: 'INCREMENT_ASYNC' }) })
これは Increment async
ボタンを押すと、1秒後にカウンタがインクリメントされるプログラムである。
🚀redux-sagaの初期化
reducer等を定義したのち、以下のようにredux-sagaの初期化を行なう。
// counter.js const sagaMiddleware = createSagaMiddleware() Redux.createStore( ..., Redux.applyMiddleware(sagaMiddleware))
createSagaMiddleware
createSagaMiddleware
は以下のように定義されている。
// src/internal/middleware.js export default function sagaMiddlewareFactory({ context = {}, ...options } = {}) { // 渡されたオプションが妥当であることを確認する if(logger && !is.func(logger)) { throw new Error('`options.logger` passed to the Saga middleware is not a function!') } // sagaMiddlewareを定義する function sagaMiddleware({ getState, dispatch }) { /* snip */ } // 定義した関数を返す return sagaMiddleware }
引数が妥当であることを確認をした上で、内部で定義した sagaMiddleware
を返す。
sagaMiddleware.runの初期化
sagaMiddleware
は applyMiddleware
の内部で呼び出される。 この関数は以下のような定義されている。
// src/internal/middleware.js function sagaMiddleware({ getState, dispatch }) { // sagaMiddleware.run を初期化する sagaMiddleware.run = runSaga.bind(null, { context, dispatch, /* snip */ }) return next => action => { /* snip */ } }
sagaMiddleware.run
に runSaga
を代入し、sagaの実行をできるようにする。 この際、 Function.prototype.bind
を使って dispatch
などのReduxとやりとりするために必要な関数が runSaga
に渡されるようにしている。
Reduxのミドルウェアとして動く next => action => ....
については、イベントハンドラの動きを追う際に見る。
🏃タスクの作成・実行
// counter.js
sagaMiddleware.run(counterSaga)
sagaMiddleware.run
によって counterSaga
の実行が開始される。
タスクの生成
sagaMiddleware.run
には runSaga
が代入されている。 これは以下のような定義となっている。
// src/internal/runSaga.js export function runSaga( storeInterface, saga, ...args ) { // ジェネレータを呼び出す let iterator = saga(...args) // タスクを作成する const task = proc( iterator, /* snip */ ) return task }
saga(...args)
で counterSaga
を呼び出している。 これはジェネレータなので、ここではイテレータが返るだけで関数本体は実行されない。
ここで作ったイテレータを proc
に渡し、タスクを生成する。 proc
は以下のようなコードになっている。
// src/internal/proc.js export default function proc(iterator, /* snip */) { // タスクを作る const task = newTask(parentEffectId, name, iterator, cont) // タスクを実行するnext を呼ぶ next() // タスクを返す return task // タスクを実行する関数 function next(arg, isErr) { /* snip*/ } }
newTask
によって、タスクを管理するオブジェクトを生成している。 その後、タスクを実行する next
を呼び出したのち、タスクを返している。
タスクの実行
next
は以下のようなコードで定義される。
// src/internal/proc.js function next(arg, isErr) { // イテレータを進め、次のyieldまでを実行する let result = iterator.next(arg) // 返ってきた値に応じて処理をする runEffect(result.value, parentEffectId, '', next) }
iterator.next(arg)
でイテレータを進め、その返り値をrunEffect
に渡している。
runEffect
での処理が完了したのち、タスクの実行を再開できるようにするため、 runEffect
には自分自身である next
を渡している。
📤アクションを待つ
counterSaga
は以下のように定義されているので、イテレータが進めたれた際に yield effects.take('INCREMENT_ASYNC')
まで実行される。
// counter.js function* counterSaga() { while(1) { yield effects.take('INCREMENT_ASYNC') // <- ここまで実行される yield effects.call(delay, 1000) yield effects.put({type: 'INCREMENT'}) } }
effects.take('INCREMENT_ASYNC')
の返り値は以下のようなオブジェクトになっており、これがそのままiterator.next()
の返り値になる。
{ "@@redux-saga/IO": true, "TAKE": { "pattern": "INCREMENT_ASYNC" } }
このオブジェクトが runEffect
に渡されると以下のような分岐を経て、 runTakeEffect
に渡される。
// src/internal/io.js // どの種別のエフェクトなのかを判定するための関数群を定義する const TAKE = 'TAKE' const createAsEffectType = type => effect => effect && effect[IO] && effect[type] export const asEffect = { take : createAsEffectType(TAKE) } // src/internal/proc.js function runEffect(effect, parentEffectId, label = '', cb) { let data // effectの種類に応じて、専用の関数を呼ぶ return ( // Non declarative effect is.promise(effect) ? resolvePromise(effect, cb) : is.helper(effect) ? runForkEffect(wrapHelper(effect), effectId, cb) : is.iterator(effect) ? resolveIterator(effect, effectId, name, cb) // declarative effects : is.array(effect) ? runParallelEffect(effect, effectId, cb) : (data = asEffect.take(effect)) ? runTakeEffect(data, cb) : (data = asEffect.put(effect)) ? runPutEffect(data, cb) : (data = asEffect.all(effect)) ? runAllEffect(data, effectId, cb) : (data = asEffect.race(effect)) ? runRaceEffect(data, effectId, cb) : (data = asEffect.call(effect)) ? runCallEffect(data, effectId, cb) : (data = asEffect.cps(effect)) ? runCPSEffect(data, cb) : (data = asEffect.fork(effect)) ? runForkEffect(data, effectId, cb) : (data = asEffect.join(effect)) ? runJoinEffect(data, cb) : (data = asEffect.cancel(effect)) ? runCancelEffect(data, cb) : (data = asEffect.select(effect)) ? runSelectEffect(data, cb) : (data = asEffect.actionChannel(effect)) ? runChannelEffect(data, cb) : (data = asEffect.flush(effect)) ? runFlushEffect(data, cb) : (data = asEffect.cancelled(effect)) ? runCancelledEffect(data, cb) : (data = asEffect.getContext(effect)) ? runGetContextEffect(data, cb) : (data = asEffect.setContext(effect)) ? runSetContextEffect(data, cb) : /* anything else returned as is */ cb(effect) ) }
チャンネルへの登録
runTakeEffect
は以下のような定義となっている。
// src/internal/proc.js function runTakeEffect({channel, pattern, maybe}, cb) { channel = channel || stdChannel channel.take(cb, matcher(pattern)) } // src/internal/channel.js function take(cb, matcher) { cb[MATCH] = matcher takers.push(cb) }
runTakeEffect
では、タスク間の通信に使われるチャンネルに対して take
を呼び、 チャンネルの takers
配列に cb
を追加している。 この cb
は next
であるため、あとでこれを呼び出せばcounterSaga
の実行が再開できる。
ここまでで sagaMiddleware.run
の実行は完了し、redux-sagaの初期化が完了する。
👀イベントハンドラ
イベントハンドラを見ていく。
// counter.js document .getElementById('incrementAsync') .addEventListener('click', function () { store.dispatch({ type: 'INCREMENT_ASYNC' }) })
Increment async
ボタンがクリックされると、Reduxのディスパッチャに INCREMENT_ASYNC
アクションが渡される。
アクションの配信
先程は省略した sagaMiddleware
は以下のように定義されている。
// src/internal/middleware.js function sagaMiddleware({ getState, dispatch }) { const sagaEmitter = emitter() // .... return next => action => { // 次のミドルウェアにアクションを転送する const result = next(action) // アクションを配信する sagaEmitter.emit(action) return result } }
次のミドルウェアにそのままアクションを転送することで、reducerを起動する。 その後、アクションを sagaEmitter.emit
に渡す。
emitter
emitter
は以下のように定義されており、 emit
されると対応する subscribers
が起動する。
// src/internal/channel.js export function emitter() { const subscribers = [] function subscribe(sub) { subscribers.push(sub) return () => remove(subscribers, sub) } function emit(item) { const arr = subscribers.slice() for (var i = 0, len = arr.length; i < len; i++) { arr[i](item) } } return { subscribe, emit } }
チャンネル
チャンネルの take
で利用していた stdChannel
は以下のように定義されている。
// src/internal/proc.js export default function proc( iterator, subscribe = () => noop, /* snip */ ) { // procの引数として渡されたsubscribeを用いてチャンネルを作成する const stdChannel = _stdChannel(subscribe) .... } // src/internal/channel.js export function _stdChannel(subscribe) { // eventChannel を用いてチャンネルを作成する const chan = eventChannel(cb => /* snip */) return { ...chan, take(cb, matcher) { /* snip */ } } } export function eventChannel(subscribe, buffer = buffers.none(), matcher) { const chan = channel(buffer) // 何かがemitされた場合は、それをチャンネルにputする subscribe(input => { chan.put(input) }) return { take: chan.take, flush: chan.flush } }
eventChannel
で入力をそのままチャンネルに put
する関数を登録している。 そのため、ディスパッチャに渡されたアクションが、チャンネルへと put
される。
チャンネルへのput
チャンネルの put
は以下のように定義されている。
// src/internal/channel.js function put(input) { // takers配列が空の場合はバッファに追加する if (!takers.length) { return buffer.put(input) } // takers配列に関数が登録されている場合は、それに入力を渡す for (var i = 0; i < takers.length; i++) { const cb = takers[i] if(!cb[MATCH] || cb[MATCH](input)) { takers.splice(i, 1) return cb(input) } } }
takers
配列に格納されている関数に入力を渡している。 今回は next
が登録されているため、counterSaga
の実行が再開される。
つまり、redux-sagaのミドルウェアからの put
(emit
)と、counterSaga
の take
がチャンネルを挟んで対になって動作する。
🤝 プロミスの実行
counterSaga
は 以下のように定義されているので、実行が再開されると effects.call(delay, 1000)
まで実行される。
// counter.js function* counterSaga() { while(1) { yield effects.take('INCREMENT_ASYNC') // <- さっきはここまで実行した yield effects.call(delay, 1000) // <- ここまで実行される yield effects.put({type: 'INCREMENT'}) } }
take
の場合と同様に、この返り値は runEffect
内の分岐を経て、 runCallEffect
に渡される。
// src/internal/proc.js (再掲) function next(arg, isErr) { // イテレータを進め、次のyieldまでを実行する result = iterator.next(arg) // 返ってきた値に応じて処理をする runEffect(result.value, parentEffectId, '', next) } // src/internal/proc.js (再掲) function runEffect(effect, parentEffectId, label = '', cb) { let data // effectの種類に応じて、専用の関数を呼ぶ return ( // Non declarative effect is.promise(effect) ? resolvePromise(effect, cb) : is.helper(effect) ? runForkEffect(wrapHelper(effect), effectId, cb) : is.iterator(effect) ? resolveIterator(effect, effectId, name, cb) // declarative effects : is.array(effect) ? runParallelEffect(effect, effectId, cb) : (data = asEffect.take(effect)) ? runTakeEffect(data, cb) : (data = asEffect.put(effect)) ? runPutEffect(data, cb) : (data = asEffect.all(effect)) ? runAllEffect(data, effectId, cb) : (data = asEffect.race(effect)) ? runRaceEffect(data, effectId, cb) : (data = asEffect.call(effect)) ? runCallEffect(data, effectId, cb) : (data = asEffect.cps(effect)) ? runCPSEffect(data, cb) : (data = asEffect.fork(effect)) ? runForkEffect(data, effectId, cb) : (data = asEffect.join(effect)) ? runJoinEffect(data, cb) : (data = asEffect.cancel(effect)) ? runCancelEffect(data, cb) : (data = asEffect.select(effect)) ? runSelectEffect(data, cb) : (data = asEffect.actionChannel(effect)) ? runChannelEffect(data, cb) : (data = asEffect.flush(effect)) ? runFlushEffect(data, cb) : (data = asEffect.cancelled(effect)) ? runCancelledEffect(data, cb) : (data = asEffect.getContext(effect)) ? runGetContextEffect(data, cb) : (data = asEffect.setContext(effect)) ? runSetContextEffect(data, cb) : /* anything else returned as is */ cb(effect) ) }
Promise.prototype.thenへの登録
runCallEffect
は以下のように定義されている。
// src/internal/proc.js function runCallEffect({context, fn, args}, effectId, cb) { // callの引数に渡された関数を起動する。 let result = fn.apply(context, args) // 返り値としてプロミスが返ってくるので、resolvePromiseに渡す return resolvePromise(result, cb) } function resolvePromise(promise, cb) { // Promise.prototype.then にコールバック関数を登録する promise.then( cb, error => cb(error, true) ) }
call
エフェクトに渡された delay
が runCallEffect
内で呼び出される。 その返り値となるプロミスは、resolvePromise
に渡される。 resolvePromise
内では、Promise.prototype.then
に cb
を登録する。
この cb
は counterSaga
の next
であるので、プロミスの実行が完了したのち counterSaga
の実行が再開される。
🔈アクションのディスパッチ
プロミスの実行が完了したのち、next
によって effects.put({type: 'INCREMENT'})
まで実行が進む。
// counter.js function* counterSaga() { while(1) { yield effects.take('INCREMENT_ASYNC') yield effects.call(delay, 1000) // <- さっきはここまで実行した yield effects.put({type: 'INCREMENT'}) // <- ここまで実行される } }
take
エフェクトや put
エフェクトの場合と同様に、 runEffect
を通じて runPutEffect
が呼び出される。
Reduxへのディスパッチ
runPutEffect
は以下のようになっている。
// src/internal/proc.js function runPutEffect({action, resolve}, cb) { // Reduxのdispatchに引数を渡す let result = dispatch(action); // コールバック関数にその結果を渡す return cb(result) }
引数に渡された {type: 'INCREMENT'}
をそのままReduxのディスパッチャに渡す。 これにより counter
reducer が動き、カウンタの値がインクリメントされる。
その後、cb
に代入された next
を呼び、counterSaga
の実行を継続する。 counterSaga
は以下のように定義され、effects.take('INCREMENT_ASYNC')
までループし、これまでと同様の処理が続いていく。
// counter.js function* counterSaga() { while(1) { yield effects.take('INCREMENT_ASYNC') // <- ここまで戻る yield effects.call(delay, 1000) yield effects.put({type: 'INCREMENT'}) // <- さっきはここまで実行した } }
✅まとめ
簡単なシーケンス図にまとめると以下のようになる。 直列的に実行されるように書かれている counterSaga
の処理が何度も中断され、条件が満たされるたびに実行が再開されている。
このようにredux-sagaではタスクの切り替えを制御することで、特定のアクションが来るのを待ったり、プロミスの完了を待つなどの処理を、直接形式で書けるようにしている。