Contents
FastAPI の非同期基礎と標準 BackgroundTasks
FastAPI では async / await を使ったエンドポイントを自然に記述でき、I/O 待ちが多い処理でもスレッドを増やさずに多数のリクエストを同時に捌くことができます。本セクションでは、非同期コードの基本概念と FastAPI が提供する軽量バックグラウンドタスク機能 BackgroundTasks の具体的な使い方・制約について解説します。
async / await の概念
Python の async 関数は「協調マルチタスク」を実現し、イベントループ上で I/O 待ち中に他のコルーチンへ制御が移ります。この仕組みにより CPU バウンドでない処理はスレッドやプロセスを増やす必要がなくなり、リソース効率が向上します【公式ドキュメント】。
|
1 2 3 4 5 6 7 8 9 10 11 |
from fastapi import FastAPI import httpx app = FastAPI() @app.get("/weather/{city}") async def get_weather(city: str): async with httpx.AsyncClient() as client: resp = await client.get(f"https://api.example.com/weather/{city}") return resp.json() |
await が付いた行でネットワーク待ちが発生し、その間に他のリクエストが処理されます。
非同期エンドポイントの実装例
非同期化は関数定義を def → async def に変えるだけです。FastAPI はシグネチャを解析して自動的にイベントループへ登録します【公式ドキュメント】。
|
1 2 3 4 5 6 |
@app.get("/items/{item_id}") async def read_item(item_id: int, q: str | None = None): # 例: asyncpg を使った非同期 DB 取得 item = await fetch_item_from_db(item_id) return {"item": item, "query": q} |
このエンドポイントはデータベースから結果が返るまで他のリクエストをブロックしません。
BackgroundTasks の基本的な使い方と限界
BackgroundTasks はレスポンス送信後に同一プロセス内で関数を実行できる簡易タスク機構です。軽微な処理(ログ書き込みやキャッシュ更新)には便利ですが、キューイング・リトライ・分散実行といった高度な機能はありません。そのため長時間実行や大量ジョブには向きません【一般的な評価】。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
from fastapi import BackgroundTasks, FastAPI app = FastAPI() def write_log(message: str): with open("log.txt", "a") as f: f.write(message + "\n") @app.post("/notify") async def send_notification(email: str, background_tasks: BackgroundTasks): background_tasks.add_task(write_log, f"Sent to {email}") return {"status": "queued"} |
上記は数行程度の軽作業に適していますが、画像加工や大量メール送信など CPU バウンドな処理を入れるとプロセス全体の応答性が低下します。本格的な非同期キューが必要な場合は外部ツール(Celery, RQ, Dramatiq 等)へ委譲することを推奨します。
本格タスクキューの選定: Celery・RQ・Dramatiq の比較
ここでは代表的な 3 種類の Python 向けタスクキューについて、機能と運用コストの観点から比較し、プロジェクトに最適なツールを選ぶための指標を示します。
機能比較表(概要)
| 項目 | Celery | Redis Queue (RQ) | Dramatiq |
|---|---|---|---|
| 対応ブローカー | RabbitMQ, Redis, SQS, Kafka など多数 | Redis のみ | Redis, RabbitMQ(プラグイン) |
| 結果バックエンド | Redis、Database、Memcached 等 | Redis(オプション) | Redis(デフォルト) |
| リトライ・スケジューリング | 高度なポリシー、ETA/cron 対応 | シンプルなリトライのみ | 標準でリトライ・遅延実行 |
| 監視ツール | Flower, Prometheus Exporter | RQ Dashboard(サードパーティ) | dramatiq‑prometheus, Sentry 連携 |
| パフォーマンス特性 | 高負荷でも安定、成熟度高い | 軽量だが機能は限定的 | シリアライズコストが低く高速 |
| 学習コスト | 中程度(設定項目多め) | 低(Redis 知識で可) | 中〜高(型安全メッセージ定義が必要) |
選定基準のポイント
- ブローカーの運用方針
-
既に RabbitMQ を使用している場合は Celery が自然な選択です。Redis のみで完結したいなら RQ または Dramatiq が適します。
-
タスクの複雑性
-
リトライやスケジュールが必須の場合は Celery、シンプルなキューイングだけで良ければ RQ、低レイテンシが最重要なら Dramatiq が有利です。
-
開発リソースと保守性
- 小規模チームや短期プロジェクトでは設定が少ない RQ が導入ハードル最低。一方、大規模サービスではエコシステムが充実した Celery が長期的に安心です。
Celery を FastAPI に組み込む実装手順
Celery は最も普及しているタスクキューのひとつです。本節では Redis をブローカー/結果バックエンドとして利用し、FastAPI アプリへ統合する具体的なステップを示します。
Celery インスタンスの定義
celery_app.py に Celery 本体を切り出し、ブローカーと結果バックエンドに同一 Redis を指定します。設定はシンプルに保ちつつ、タスクが所属するキュー名も明示しておきます。
|
1 2 3 4 5 6 7 8 9 10 11 12 |
# celery_app.py from celery import Celery celery = Celery( "worker", broker="redis://redis:6379/0", # Docker Compose のサービス名で解決 backend="redis://redis:6379/1" ) # タスクが自動的に fastapi キューへ振り分けられるよう設定 celery.conf.task_routes = {"app.tasks.*": {"queue": "fastapi"}} |
タスク定義と FastAPI からの呼び出し
タスクは純粋な関数として tasks.py に記述します。bind=True を付けることでリトライロジックをタスク内部で利用できます。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
# tasks.py from .celery_app import celery import time @celery.task(bind=True, max_retries=3, default_retry_delay=60) def generate_report(self, user_id: int): """重いレポート生成(例:PDF 作成)をシミュレート""" try: time.sleep(5) # 実際は重い処理に置き換える return {"status": "completed", "user_id": user_id} except Exception as exc: # 失敗時は自動リトライ raise self.retry(exc=exc) |
FastAPI 側からは .apply_async() または .delay() を呼び出すだけです。
|
1 2 3 4 5 6 7 8 9 10 11 |
# main.py from fastapi import FastAPI from .tasks import generate_report app = FastAPI() @app.post("/report/{user_id}") async def request_report(user_id: int): task = generate_report.apply_async(args=[user_id]) return {"task_id": task.id} |
Docker Compose でのローカル環境構築例
以下の docker-compose.yml は FastAPI、Celery ワーカー、Redis の3サービスを定義し、1 コマンドで開発環境を立ち上げられます。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
version: "3.9" services: api: build: . command: uvicorn main:app --host 0.0.0.0 --port 8000 volumes: - .:/code ports: - "8000:8000" depends_on: - redis - worker worker: build: . command: celery -A celery_app.celery worker --loglevel=info volumes: - .:/code depends_on: - redis redis: image: redis:7-alpine ports: - "6379:6379" |
本番環境でのワーカー管理
本番では systemd や Kubernetes の Deployment によりワーカープロセスを常駐させ、障害時に自動再起動させます。Celery ワーカーはメモリリークや例外で停止することがあるため、監視と再起動の仕組みは必須です。
|
1 2 3 4 5 |
# start.sh(ローカルデバッグ用) #!/usr/bin/env bash docker-compose up -d --build echo "FastAPI: http://localhost:8000" |
代替キュー (RQ・Dramatiq) の実装例と結果取得パターン
Celery が過剰になるケースでは、軽量な RQ や高速な Dramatiq が有効です。ここでは最小構成のコード例と、タスク完了をクライアントに通知する 3 種類の手法を示します。
Redis Queue (RQ) のシンプル実装
まずはキューとワーカーをセットアップし、FastAPI からジョブを投入する流れです。
|
1 2 3 4 5 6 7 8 9 10 11 |
# rq_worker.py import redis from rq import Queue, Worker redis_conn = redis.Redis(host="redis", port=6379) q = Queue("fastapi", connection=redis_conn) def send_email(to: str, subject: str, body: str): # 実際のメール送信ロジック(SMTP など)に置き換える print(f"Sent email to {to}") |
ワーカー起動例(Dockerfile に記載しても可):
|
1 2 |
$ rq worker fastapi --url redis://localhost:6379/0 |
FastAPI 側でジョブをエンキューします。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
# main.py の続き from rq import Queue from redis import Redis from .rq_worker import send_email redis_conn = Redis(host="redis", port=6379) q = Queue("fastapi", connection=redis_conn) @app.post("/email") def enqueue_email(to: str, subject: str, body: str): job = q.enqueue(send_email, to, subject, body) return {"job_id": job.get_id()} |
Dramatiq の高速実装
Dramatiq はシリアライズコストが低く、マイクロ秒単位のレイテンシを実現します。
|
1 2 3 4 5 6 7 8 9 10 11 12 |
# dramatiq_worker.py import dramatiq from dramatiq.brokers.redis import RedisBroker redis_broker = RedisBroker(host="redis", port=6379) dramatiq.set_broker(redis_broker) @dramatiq.actor(max_retries=5, time_limit=300_000) def resize_image(image_path: str): # Pillow 等で画像リサイズ処理 print(f"Resized {image_path}") |
FastAPI 側エンドポイント:
|
1 2 3 4 5 |
@app.post("/resize") def request_resize(path: str): job = resize_image.send(path) return {"message_id": job.message_id} |
タスク結果取得パターンの比較
結果をクライアントに返す方法は用途に応じて選択できます。以下の表で特徴を整理しました。
| 手法 | 実装概要 | メリット | デメリット |
|---|---|---|---|
| Polling | /status/{id} を定期的に GET してタスク状態を取得 |
実装が最も簡単 | 不要なリクエストが増える |
| WebSocket | サーバ側で WebSocket 接続に完了情報をプッシュ |
リアルタイム性が高い | 接続管理や認証が必要 |
| StreamingResponse (SSE) | text/event-stream を返し、サーバ側でイベント送信 |
HTTP だけで完結、ブラウザ対応広範囲 | クライアント側のハンドリングが若干複雑 |
Polling のコード例
|
1 2 3 4 5 |
@app.get("/status/{job_id}") def job_status(job_id: str): job = q.fetch_job(job_id) # RQ の場合 return {"status": job.get_status()} |
WebSocket で結果プッシュ(Dramatiq + FastAPI)
まずは結果を Redis に保存するユーティリティを用意します。
|
1 2 3 4 5 6 7 8 |
# utils.py import json, redis redis_conn = redis.Redis(host="redis", port=6379) def store_result(job_id: str, payload: dict): redis_conn.set(f"result:{job_id}", json.dumps(payload)) |
タスク側で store_result を呼び出し、WebSocket ハンドラは次のように実装します。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
# websockets.py from fastapi import APIRouter, WebSocket import asyncio, json from .utils import redis_conn router = APIRouter() @router.websocket("/ws/{job_id}") async def ws_status(websocket: WebSocket, job_id: str): await websocket.accept() while True: raw = redis_conn.get(f"result:{job_id}") if raw: await websocket.send_text(raw.decode()) break await asyncio.sleep(1) |
本番環境でのデプロイ・運用ベストプラクティス
非同期タスク基盤を本番で安定稼働させるためには、リソース設計・スケーリング・エラーハンドリング・監視の4つの観点を体系的に整備する必要があります。
ワーカープロセス数とリソース設計
CPU コア数とタスク特性(I/O バウンドか CPU バウンドか)に応じて --concurrency を決定します。Celery の公式ガイドでは「CPUコア × 2」を目安としていますが、I/O 待ちが多い場合はさらに増やすことでスループット向上が期待できます【Celery Docs】。
Kubernetes でのデプロイ例(リソース制限付き):
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
apiVersion: apps/v1 kind: Deployment metadata: name: celery-worker spec: replicas: 3 template: spec: containers: - name: worker image: yourrepo/fastapi-task:latest args: ["celery", "-A", "celery_app.celery", "worker", "--concurrency=8"] resources: requests: cpu: "500m" memory: "256Mi" limits: cpu: "1" memory: "512Mi" |
Docker / Kubernetes でのスケーリング設定例
- Docker Compose:
docker-compose up --scale worker=4とすればワーカーを横に増やせます。 - Kubernetes HPA: CPU 使用率が一定以上になったら自動でレプリカ数を増減させる設定です。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler metadata: name: celery-hpa spec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: celery-worker minReplicas: 2 maxReplicas: 10 metrics: - type: Resource resource: name: cpu target: type: Utilization averageUtilization: 70 |
エラーハンドリング・リトライ戦略
-
タスクレベル
@celery.task(bind=True, max_retries=5, default_retry_delay=60)のようにデコレータでリトライポリシーを明示します。 -
キュー全体の DLQ(Dead Letter Queue)
task_acks_late=Trueとworker_prefetch_multiplier=1を設定すると、失敗したタスクが自動的に再試行されず DLQ に送られます。DLQ 用の別キューを用意して手動で調査できるようにします。 -
例外通知
Sentry SDK を組み込み、sentry_sdk.capture_exception(e)で障害情報を即座に取得できます。
|
1 2 3 |
import sentry_sdk sentry_sdk.init(dsn="YOUR_SENTRY_DSN", traces_sample_rate=0.2) |
監視ツールの導入例
| ツール | 主な役割 | 導入ポイント |
|---|---|---|
| Flower | ワーカー状態・タスク統計の可視化 | docker run -p 5555:5555 mher/flower --broker=redis://redis:6379/0 |
| Prometheus Exporter | キュー長、処理時間などメトリクス収集 | Celery 用 prometheus_client ライブラリをタスク開始・終了時にインクリメント |
| Sentry | 例外・パフォーマンスのリアルタイム通知 | DSN を環境変数で注入し、sentry_sdk.init() を呼び出すだけ |
これらを組み合わせれば、タスクが滞留したときやワーカーが落ちたときに即座にアラートが届く仕組みが完成します。
まとめ
- FastAPI の async / await は I/O 待ちを最小化し、高い同時リクエスト処理能力を実現します。
- 標準の
BackgroundTasksは軽微なバックグラウンド作業に適していますが、スケールが必要なら外部キューへ委譲すべきです。 - Celery・RQ・Dramatiq の特徴と選定基準を比較し、プロジェクト要件(ブローカー、タスク複雑性、開発リソース)に合わせて最適なツールを選択してください。
- Celery を例に、ブローカー設定・タスク定義・Docker Compose によるローカル環境構築手順を具体的に示しました。
- RQ と Dramatiq の最小実装と、Polling / WebSocket / SSE の 3 種類の結果取得パターンを提示し、ユースケース別に選択肢を提供します。
- 本番デプロイでは ワーカープロセス数の算出、K8s HPA による自動スケーリング、DLQ とリトライ戦略、Flower / Prometheus / Sentry での監視体制構築が不可欠です。
これらのベストプラクティスを踏まえて非同期タスク基盤を設計すれば、ユーザー体験とシステムスケーラビリティの両立が実現できます。
参考サンプルコードは MIT ライセンスの公式リポジトリ https://github.com/tiangolo/full-stack-fastapi-postgresql を活用してください。必要に応じてクローンし、プロジェクト固有の設定へ置き換えるだけで即座に動作します。