FastAPI

FastAPI の非同期と BackgroundTasks、タスクキュー(Celery・RQ・Dramatiq)徹底比較

ⓘ本ページはプロモーションが含まれています

スポンサードリンク

FastAPI の非同期基礎と標準 BackgroundTasks

FastAPI では async / await を使ったエンドポイントを自然に記述でき、I/O 待ちが多い処理でもスレッドを増やさずに多数のリクエストを同時に捌くことができます。本セクションでは、非同期コードの基本概念と FastAPI が提供する軽量バックグラウンドタスク機能 BackgroundTasks の具体的な使い方・制約について解説します。

async / await の概念

Python の async 関数は「協調マルチタスク」を実現し、イベントループ上で I/O 待ち中に他のコルーチンへ制御が移ります。この仕組みにより CPU バウンドでない処理はスレッドやプロセスを増やす必要がなくなり、リソース効率が向上します【公式ドキュメント】。

await が付いた行でネットワーク待ちが発生し、その間に他のリクエストが処理されます。

非同期エンドポイントの実装例

非同期化は関数定義を def → async def に変えるだけです。FastAPI はシグネチャを解析して自動的にイベントループへ登録します【公式ドキュメント】。

このエンドポイントはデータベースから結果が返るまで他のリクエストをブロックしません。

BackgroundTasks の基本的な使い方と限界

BackgroundTasks はレスポンス送信後に同一プロセス内で関数を実行できる簡易タスク機構です。軽微な処理(ログ書き込みやキャッシュ更新)には便利ですが、キューイング・リトライ・分散実行といった高度な機能はありません。そのため長時間実行や大量ジョブには向きません【一般的な評価】。

上記は数行程度の軽作業に適していますが、画像加工や大量メール送信など CPU バウンドな処理を入れるとプロセス全体の応答性が低下します。本格的な非同期キューが必要な場合は外部ツール(Celery, RQ, Dramatiq 等)へ委譲することを推奨します


本格タスクキューの選定: Celery・RQ・Dramatiq の比較

ここでは代表的な 3 種類の Python 向けタスクキューについて、機能と運用コストの観点から比較し、プロジェクトに最適なツールを選ぶための指標を示します。

機能比較表(概要)

項目 Celery Redis Queue (RQ) Dramatiq
対応ブローカー RabbitMQ, Redis, SQS, Kafka など多数 Redis のみ Redis, RabbitMQ(プラグイン)
結果バックエンド Redis、Database、Memcached 等 Redis(オプション) Redis(デフォルト)
リトライ・スケジューリング 高度なポリシー、ETA/cron 対応 シンプルなリトライのみ 標準でリトライ・遅延実行
監視ツール Flower, Prometheus Exporter RQ Dashboard(サードパーティ) dramatiq‑prometheus, Sentry 連携
パフォーマンス特性 高負荷でも安定、成熟度高い 軽量だが機能は限定的 シリアライズコストが低く高速
学習コスト 中程度(設定項目多め) 低(Redis 知識で可) 中〜高(型安全メッセージ定義が必要)

選定基準のポイント

  1. ブローカーの運用方針
  2. 既に RabbitMQ を使用している場合は Celery が自然な選択です。Redis のみで完結したいなら RQ または Dramatiq が適します。

  3. タスクの複雑性

  4. リトライやスケジュールが必須の場合は Celery、シンプルなキューイングだけで良ければ RQ、低レイテンシが最重要なら Dramatiq が有利です。

  5. 開発リソースと保守性

  6. 小規模チームや短期プロジェクトでは設定が少ない RQ が導入ハードル最低。一方、大規模サービスではエコシステムが充実した Celery が長期的に安心です。

Celery を FastAPI に組み込む実装手順

Celery は最も普及しているタスクキューのひとつです。本節では Redis をブローカー/結果バックエンドとして利用し、FastAPI アプリへ統合する具体的なステップを示します。

Celery インスタンスの定義

celery_app.py に Celery 本体を切り出し、ブローカーと結果バックエンドに同一 Redis を指定します。設定はシンプルに保ちつつ、タスクが所属するキュー名も明示しておきます。

タスク定義と FastAPI からの呼び出し

タスクは純粋な関数として tasks.py に記述します。bind=True を付けることでリトライロジックをタスク内部で利用できます。

FastAPI 側からは .apply_async() または .delay() を呼び出すだけです。

Docker Compose でのローカル環境構築例

以下の docker-compose.yml は FastAPI、Celery ワーカー、Redis の3サービスを定義し、1 コマンドで開発環境を立ち上げられます。

本番環境でのワーカー管理

本番では systemd や Kubernetes の Deployment によりワーカープロセスを常駐させ、障害時に自動再起動させます。Celery ワーカーはメモリリークや例外で停止することがあるため、監視と再起動の仕組みは必須です。


代替キュー (RQ・Dramatiq) の実装例と結果取得パターン

Celery が過剰になるケースでは、軽量な RQ や高速な Dramatiq が有効です。ここでは最小構成のコード例と、タスク完了をクライアントに通知する 3 種類の手法を示します。

Redis Queue (RQ) のシンプル実装

まずはキューとワーカーをセットアップし、FastAPI からジョブを投入する流れです。

ワーカー起動例(Dockerfile に記載しても可):

FastAPI 側でジョブをエンキューします。

Dramatiq の高速実装

Dramatiq はシリアライズコストが低く、マイクロ秒単位のレイテンシを実現します。

FastAPI 側エンドポイント:

タスク結果取得パターンの比較

結果をクライアントに返す方法は用途に応じて選択できます。以下の表で特徴を整理しました。

手法 実装概要 メリット デメリット
Polling /status/{id} を定期的に GET してタスク状態を取得 実装が最も簡単 不要なリクエストが増える
WebSocket サーバ側で WebSocket 接続に完了情報をプッシュ リアルタイム性が高い 接続管理や認証が必要
StreamingResponse (SSE) text/event-stream を返し、サーバ側でイベント送信 HTTP だけで完結、ブラウザ対応広範囲 クライアント側のハンドリングが若干複雑

Polling のコード例

WebSocket で結果プッシュ(Dramatiq + FastAPI)

まずは結果を Redis に保存するユーティリティを用意します。

タスク側で store_result を呼び出し、WebSocket ハンドラは次のように実装します。


本番環境でのデプロイ・運用ベストプラクティス

非同期タスク基盤を本番で安定稼働させるためには、リソース設計・スケーリング・エラーハンドリング・監視の4つの観点を体系的に整備する必要があります。

ワーカープロセス数とリソース設計

CPU コア数とタスク特性(I/O バウンドか CPU バウンドか)に応じて --concurrency を決定します。Celery の公式ガイドでは「CPUコア × 2」を目安としていますが、I/O 待ちが多い場合はさらに増やすことでスループット向上が期待できます【Celery Docs】。

Kubernetes でのデプロイ例(リソース制限付き):

Docker / Kubernetes でのスケーリング設定例

  • Docker Compose: docker-compose up --scale worker=4 とすればワーカーを横に増やせます。
  • Kubernetes HPA: CPU 使用率が一定以上になったら自動でレプリカ数を増減させる設定です。

エラーハンドリング・リトライ戦略

  1. タスクレベル
    @celery.task(bind=True, max_retries=5, default_retry_delay=60) のようにデコレータでリトライポリシーを明示します。

  2. キュー全体の DLQ(Dead Letter Queue)
    task_acks_late=Trueworker_prefetch_multiplier=1 を設定すると、失敗したタスクが自動的に再試行されず DLQ に送られます。DLQ 用の別キューを用意して手動で調査できるようにします。

  3. 例外通知
    Sentry SDK を組み込み、sentry_sdk.capture_exception(e) で障害情報を即座に取得できます。

監視ツールの導入例

ツール 主な役割 導入ポイント
Flower ワーカー状態・タスク統計の可視化 docker run -p 5555:5555 mher/flower --broker=redis://redis:6379/0
Prometheus Exporter キュー長、処理時間などメトリクス収集 Celery 用 prometheus_client ライブラリをタスク開始・終了時にインクリメント
Sentry 例外・パフォーマンスのリアルタイム通知 DSN を環境変数で注入し、sentry_sdk.init() を呼び出すだけ

これらを組み合わせれば、タスクが滞留したときやワーカーが落ちたときに即座にアラートが届く仕組みが完成します。


まとめ

  • FastAPI の async / await は I/O 待ちを最小化し、高い同時リクエスト処理能力を実現します。
  • 標準の BackgroundTasks は軽微なバックグラウンド作業に適していますが、スケールが必要なら外部キューへ委譲すべきです。
  • Celery・RQ・Dramatiq の特徴と選定基準を比較し、プロジェクト要件(ブローカー、タスク複雑性、開発リソース)に合わせて最適なツールを選択してください。
  • Celery を例に、ブローカー設定・タスク定義・Docker Compose によるローカル環境構築手順を具体的に示しました。
  • RQ と Dramatiq の最小実装と、Polling / WebSocket / SSE の 3 種類の結果取得パターンを提示し、ユースケース別に選択肢を提供します。
  • 本番デプロイでは ワーカープロセス数の算出、K8s HPA による自動スケーリング、DLQ とリトライ戦略、Flower / Prometheus / Sentry での監視体制構築が不可欠です。

これらのベストプラクティスを踏まえて非同期タスク基盤を設計すれば、ユーザー体験とシステムスケーラビリティの両立が実現できます。

参考サンプルコードは MIT ライセンスの公式リポジトリ https://github.com/tiangolo/full-stack-fastapi-postgresql を活用してください。必要に応じてクローンし、プロジェクト固有の設定へ置き換えるだけで即座に動作します。

スポンサードリンク

-FastAPI