Contents
対象読者と要約
このガイドは AsyncIterator と for-await-of を実務で安全に使うための設計・運用指針を示します。入門〜中級エンジニアを主対象とし、仕様上の注意点、キャンセル・クリーンアップ、相互運用、テスト/運用方法までを短く整理します。まず想定読者と要点を示します。
想定読者
想定読者は次の通りです。
モダン JavaScript(async generator / for-await-of)を使ってストリーミング処理を実装または運用するエンジニア。Observable や Streams との変換を扱う中級者も含みます。
要点のまとめ
ここで押さえるべき要点を列挙します。
- async generator は try...finally で必ずリソース解放を書くこと。
- for-await-of の中断時には規格上 iterator.return が呼ばれる(ただし存在する場合のみ)。
- generator.throw は同期/非同期で挙動が異なるためターゲット環境で確認すること。
- Observable などへ変換する際は observer.next が例外を投げた場合の後処理を必ず行うこと。
- テストは AbortSignal や retry の待機をモックして自動化すること。
AsyncIterator と for-await-of の振る舞い(プロトコルと仕様)
非同期イテレーションのプロトコルは ECMAScript で定義されています。ここでは next/return/throw の基本的な意味と、for-await-of が中断やエラーにどう対処するかを仕様に沿って整理します。
プロトコル要点
基本を短く整理します。
- AsyncIterator は next(), return(), throw() を持ちます。next() は Promise<{ value, done }〉を返すことが仕様上期待されます。
- return() は早期終了を伝えるためのメソッドです。存在すれば for-await-of の中断時に呼び出され、producer の finally を走らせる契機になります。
- throw() は consumer から例外を注入する手段です。async generator と sync generator で呼び出し元への伝搬のタイミングが異なります。
参考: ECMAScript 仕様(AsyncIteratorClose 等)および MDN を参照してください(例: https://tc39.es/ecma262/)。
for-await-of による中断時の return 呼び出し(ECMAScript 規格に基づく振る舞い)
ここでは中断時に return が呼ばれる条件を示します。
規格上、for-await-of がループを中断する(break/return/例外)ときは AsyncIteratorClose が走ります。AsyncIteratorClose は iterator.return が関数であればそれを呼び出します。return が Promise を返して拒否されれば、その拒否が最終的に伝搬します。逆に iterator に return がない場合はクリーンアップ呼び出しは行われません。
注意点として、next() の Promise が拒否された場合でも可能であれば return が呼ばれますが、古いランタイムやトランスパイル済みコードでは実装差があるため対象環境での確認が必須です。参照: ECMAScript の AsyncIteratorClose に関する節(https://tc39.es/ecma262/)。
generator.throw の同期/非同期差と実装差
generator.throw の振る舞いは同期ジェネレータと非同期ジェネレータで異なります。
- 同期ジェネレータ(function*)では iterator.throw を呼ぶと、投げ込まれた例外がジェネレータ内の現在の yield 式で処理されます。ハンドルされなければ呼び出し元に同期的に投げ返されます。呼び出すタイミング(最初の next の前など)によって挙動が変わります。
- 非同期ジェネレータ(async function*)では iterator.throw は Promise を返します。ジェネレータが例外を処理して次の yield を返せば Promise は解決し、処理されなければ Promise は拒否されます。
さらに、Babel や古いポリフィルではスケジューリングや return/throw の扱いが微妙に異なることがあります。必ずターゲット環境で iterator.throw の挙動(同期的に例外が出るか/Promise が拒否されるか、return の呼び出しタイミング)を確認してください。
コンシューマ側パターン:継続/停止/部分失敗の扱い
コンシューマはストリームを止める判断と要素単位の失敗方針を決める責務があります。ここでは代表的な設計パターンと並列化の注意点を示します。
全体停止と要素単位のハンドリング
まず典型的な二つのパターンを示します。
以下の例は、全体を止める(fail-fast)パターンと、個々の要素の失敗を局所化して続行するパターンです。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// fail-fast try { for await (const item of stream) { await process(item); } } catch (e) { // 全体を止めて処理 logFatal(e); } // 部分失敗許容 for await (const item of stream) { try { await process(item); } catch (err) { logPartial(item, err); // 続行する } } |
互換性注記(上記コード)
上記は Node.js 14+ またはモダンブラウザ(ES2018 以降)を前提とします。古いランタイムではトランスパイルや polyfill(Symbol.asyncIterator 等)が必要です。
要素単位の結果表現(Result 型/タプル)
要素単位エラーをストリームで扱う方法を比較します。
- Result オブジェクト { ok: true, value } / { ok: false, error }
- 長所: 型で成功/失敗が明確。下流での判断が容易。
- 短所: 常に Result を扱う負担がある。API 契約が増える。
- タプル/union 型 [value, error] 等
- 長所: 軽量でシンプル。
- 短所: 可読性や型安全性がやや落ちる。
TypeScript を使う場合は Result 型の利点が大きい一方で、トランザクション性やメトリクス設計を考慮して選択してください。
並列処理とバックプレッシャー
並列処理はスループット向上に有効ですが、メモリや下流の処理能力を圧迫します。
単純な解決策は同時実行数を制限するセマフォや p-limit 風の仕組みです。下流の性能とメモリ上限を基に同時実行数を決めてください。運用では環境変数で並列度を調整できる設計にすると便利です。
プロデューサ側パターン:async generator の設計、キャンセルとクリーンアップ
プロデューサ側ではリソース解放とキャンセル伝搬を厳密に扱う必要があります。ここでは安全な実装パターンと AbortSignal の扱い方を示します。
try...finally で確実に解放する
async generator では finally ブロックでリソース解放を行ってください。これにより iterator.return が呼ばれた場合にも解放処理が確実に実行されます。次は典型的な構造です。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
async function* streamFromCursor(cursor, { signal } = {}) { try { while (true) { if (signal?.aborted) throw createAbortError(); const item = await cursor.read(); if (item == null) return; yield item; } } finally { await cursor.close(); } } |
互換性注記(上記コード)
サンプルは Node.js 14+ またはモダンブラウザ(ES2018+)で想定しています。fetch や DOMException、AbortController の扱いはブラウザと Node のバージョン差があるため、Node <18 では polyfill(node-fetch / abort-controller / domexception 等)が必要になることがあります。
AbortSignal と AbortError の互換性
AbortSignal を受け取る API を設計する場合、AbortError の生成方法を環境に依存しない形で抽象化してください。例:
|
1 2 3 4 5 6 7 8 9 |
function createAbortError(message = 'Aborted') { if (typeof DOMException !== 'undefined') { return new DOMException(message, 'AbortError'); } const e = new Error(message); e.name = 'AbortError'; return e; } |
このようにするとブラウザでは DOMException、Node の古い環境では Error に name='AbortError' を付ける形で互換性を確保できます。
再試行(retry)パターンと idempotency
再試行は指数バックオフ、最大試行回数、ジッターを組み合わせます。重要なのは副作用のある操作について idempotency を確保することです。基本的な実装例:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
async function retry(fn, { retries = 3, factor = 2, min = 100, max = 5000, jitter = true } = {}, isTransient = () => true) { let attempt = 0; while (true) { try { return await fn(); } catch (err) { attempt++; if (attempt > retries || !isTransient(err)) throw err; let delay = Math.min(max, min * Math.pow(factor, attempt - 1)); if (jitter) delay = Math.floor(delay * (0.5 + Math.random() / 1.0)); await new Promise(r => setTimeout(r, delay)); } } } |
テスト時はフェイクタイマーで待機を検証してください(Jest の jest.useFakeTimers 等)。
相互運用:Observable/ReadableStream/Node Streams と AsyncIterator の比較と移行
AsyncIterator(pull)と Observable(push)ではエラー伝搬やキャンセル semantics が異なります。移行時はこれらを明示的にマッピングしてください。
AsyncIterator → Observable(安全な変換実装例)
observer.next が例外を投げた場合の後処理を考慮した実装例です。observer.next の例外で producer 側をクリーンアップするために it.return を呼び、必要なら await します。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
function toObservable(asyncIterable) { return { subscribe(observer) { const it = asyncIterable[Symbol.asyncIterator](); let closed = false; (async () => { try { for await (const v of it) { if (closed) break; try { observer.next?.(v); } catch (err) { closed = true; try { await it.return?.(); } catch (_) { // it.return のエラーは優先度を調整する場合に記録する } throw err; } } if (!closed) observer.complete?.(); } catch (e) { if (!closed) observer.error?.(e); } })(); return { unsubscribe() { closed = true; const ret = it.return?.(); if (ret && typeof ret.then === 'function') { ret.catch(() => {}); // 未処理拒否を避ける } } }; } }; } |
互換性注記(上記コード)
この変換は Node.js 14+ / ブラウザ(ES2018+)での利用を想定します。unsubscribe を同期的に行いたいライブラリは多いため、可能であればサブスクライブ側に非同期クリーンアップを伝える設計(Promise を返す teardown)を検討してください。
Observable → AsyncIterator(変換パターン)
push を pull に変換するにはキューを用います。バッファ上限・ポリシー(ドロップ/ブロック)を設計してください。簡易なパターンは次のとおりです。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
function observableToAsyncIterable(observable, { bufferSize = 100 } = {}) { return { [Symbol.asyncIterator]() { const queue = []; const resolves = []; let done = false; let err = null; const sub = observable.subscribe({ next(v) { if (err || done) return; if (resolves.length) { resolves.shift()({ value: v, done: false }); } else { queue.push(v); if (queue.length > bufferSize) queue.shift(); // 例: 古い値をドロップ } }, error(e) { err = e; while (resolves.length) resolves.shift()(Promise.reject(e)); }, complete() { done = true; while (resolves.length) resolves.shift()({ value: undefined, done: true }); } }); return { async next() { if (err) throw err; if (queue.length) return { value: queue.shift(), done: false }; if (done) return { value: undefined, done: true }; return await new Promise(res => resolves.push(res)); }, async return() { sub.unsubscribe?.(); return { value: undefined, done: true }; }, async throw(e) { sub.unsubscribe?.(); throw e; } }; } }; } |
ReadableStream / Node Streams の差異と移行チェックリスト
ReadableStream(WHATWG)と Node の Readable ではバックプレッシャやキャンセル API が異なります。移行時のチェックポイント:
- エラーカテゴリを保持すること(transient/fatal/partial)。
- キャンセル経路を明示する(unsubscribe ↔ iterator.return ↔ reader.cancel)。
- Node の stream.pipeline はエラー伝搬とクリーンアップで有用。
- 高水位(highWaterMark)とバッファ戦略を見直し、メモリ増加に備える。
各種仕様参照: WHATWG Streams(https://streams.spec.whatwg.org/)および Node.js Streams ドキュメント。
運用・テスト・観測可能性(CI・モック・ログ設計)
設計を実装したらテストと観測を必ず整備してください。ここでは具体的なテスト戦略とログ/メトリクス設計の例を示します。
単体テストとモック戦略
ユニットテストでは以下を用います。
- モック async iterable をジェネレータで作成し、正常・部分失敗・致命的・中断シナリオを検証する。
- AbortSignal の中断を再現し finally が呼ばれることを assert する(await it.return() を忘れずに)。
- 再試行待機は fake timers(jest.useFakeTimers 等)で検証する。
- 外部 API は nock(Node)や MSW(ブラウザ環境)でモックする。
例(Jest):
|
1 2 3 4 5 6 7 8 9 10 11 12 |
test('generator finally runs on return', async () => { const cleanup = jest.fn(); async function* gen() { try { yield 1; } finally { cleanup(); } } const it = gen(); await it.next(); await it.return(); expect(cleanup).toHaveBeenCalled(); }); |
CI と統合テストの実践
統合テストでは実サービスに近い環境を用意します。Docker コンテナで依存サービスを立て、ネットワーク障害や遅延を注入して回復性を検証します。E2E テストでのメトリクス閾値を CI に組み込むと効果的です。
観測可能性(ログ・メトリクス命名例)
構造化ログと Prometheus スタイルのメトリクスを設計してください。命名とラベル設計の例を示します。
構造化ログ(JSON 例):
|
1 2 3 4 5 6 7 8 9 10 |
{ "ts":"2024-01-01T12:00:00Z", "level":"warn", "stream":"orders", "item_id":"order-123", "error_category":"partial", "retry_count":2, "message":"partial item error" } |
Prometheus 風メトリクス例:
- stream_processing_errors_total{stream="orders",category="partial"} (counter)
- stream_retry_total{stream="orders",operation="fetch_page"} (counter)
- stream_active_streams{stream="orders"} (gauge)
- stream_processing_latency_seconds_bucket{stream="orders"} (histogram)
ラベルの高カードリナリは避け、stream や category の値は限定的にしてください。
まとめ
ここまでのポイントを短く整理します。以降は実装前にチェックリストとして活用してください。
- エラーのカテゴリ分類と停止/継続ポリシーを定義する。
- producer は try...finally でリソース解放を保証し、AbortSignal を受け取れる設計にする。
- consumer は fail-fast か部分失敗許容かをユースケースで決め、並列度を制御する。
- Observable⇄AsyncIterator の変換では observer.next の例外と it.return の呼び出しを明確に処理する。
- テストで finally 実行、abort、retry 回数を自動化し、CI に組み込む。
また、サンプルコードは Node.js 14+ もしくはモダンブラウザ(ES2018+)を想定しています。古い環境で運用する場合はトランスパイルや polyfill の適用を必ず行ってください。
参考(主要ドキュメント): ECMAScript 仕様(https://tc39.es/ecma262/)、WHATWG Streams(https://streams.spec.whatwg.org/)、Node.js Streams ドキュメント(https://nodejs.org/api/stream.html)、MDN(for-await-of 等)。