Contents
Kafka コンシューマーの基本設定と選択基準
このセクションでは、コンシューマーグループを管理する group.id と、オフセット取得・コミットに関わる enable.auto.commit/auto.offset.reset の役割と推奨構成を説明します。正しい設定は スケールアウト時のパーティション分散 と 障害復旧時の再処理 を安全に行う鍵となります。
group.id の役割とベストプラクティス
group.id は同一グループ内でパーティションを分担させ、オフセットを共有するための識別子です。
- 同一
group.idを持つインスタンスは協調してメッセージを取得 し、各パーティションは必ず 1 インスタンスにだけ割り当てられます。 - グループが変わるとオフセットの共有が失われ、過去データの再読込や欠損が発生します。
|
1 2 3 4 |
# application.properties(Spring Kafka) spring.kafka.consumer.group-id=order-service-consumer spring.kafka.bootstrap-servers=broker1:9092,broker2:9092 |
| ケース | 推奨設定 |
|---|---|
| 単一サービスの水平スケール | 同一 group.id を使用し、インスタンス数に応じてトピック側でパーティション数を増やす |
| 複数マイクロサービスが同トピックを読む | 各サービスで 異なる group.id を設定してオフセット管理を独立させる |
ポイント:本番環境では機能単位(例: 注文処理、在庫更新)ごとに意味のある
group.idを付与し、デプロイ時に変更しないことが安全です。
enable.auto.commit と auto.offset.reset の選び方
自動コミットは手軽ですが、処理失敗時に未処理メッセージがスキップされるリスク があるため、用途に応じた設定が必要です。
enable.auto.commit=trueはデフォルトで 5 秒ごとに現在のオフセットをコミットします。バッチ処理や外部システム呼び出しが含まれる場合は 未完了レコードがコミットされる 可能性があります。auto.offset.resetはコンシューマーグループがトピックを初めて読むときの開始位置を決定します。
|
1 2 3 4 5 |
Properties props = new Properties(); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 手動コミット推奨 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 初回は最古から読む KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); |
| 設定組み合わせ | 主な利用シーン |
|---|---|
enable.auto.commit=true + auto.offset.reset=latest |
高速ストリーム処理、失敗時に再処理が許容できるケース |
enable.auto.commit=false + auto.offset.reset=earliest |
バッチ・トランザクション的処理、正確なオフセット管理が必要な場合 |
ポイント:本番環境では 手動コミット (
enable.auto.commit=false) を基本とし、失敗時はアプリ側でリトライロジックを実装する方が安全です。
コミット戦略とポーリングループの最適化
ここでは poll() とオフセットコミットの組み合わせによるスループット・信頼性の調整方法を解説します。特に max.poll.interval.ms と poll のタイムアウト(consumer.poll(Duration) の引数)を混同しないよう注意してください。
手動コミット vs 自動コミット:トレードオフ
手動コミットは処理完了後に確実にオフセットを書き込めますが、コード量が増える点がデメリットです。自動コミットは設定だけで機能しますが、失敗時のロスが避けられません。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // poll timeout はメソッド引数 for (ConsumerRecord<String, String> record : records) { try { process(record); // ビジネスロジック } catch (Exception e) { handleFailure(record, e); // 例外は DLQ へ送るかリトライする } } // バッチ単位で同期コミット consumer.commitSync(); } |
| コミット方式 | メリット | デメリット |
|---|---|---|
commitSync() |
確実にオフセットが保存される | ポーリングサイクルが遅くなる |
commitAsync() |
高スループット | 失敗時のハンドリングが必要 |
ポイント:バッチ処理は
commitSync()、リアルタイムストリームはcommitAsync()+ 再試行ロジック を組み合わせると実務的です。
max.poll.records・fetch.min.bytes などのチューニング指針
ポーリング関連パラメータは レイテンシ vs スループット のバランスを取ります。max.poll.interval.ms は 次回 poll が呼び出されるまでの最大許容時間(デフォルト 5 分)であり、poll timeout とは別概念です。
max.poll.records: 1 回のpoll()で取得するレコード上限。大きくするとスループットは上がりますが、処理に要する時間が長くなりmax.poll.interval.msを超えるとリバランスが走ります。fetch.min.bytes/fetch.max.wait.ms: ブローカー側のデータ取得条件。小さすぎるとネットワーク往復が頻発し、大きすぎるとレイテンシが増します。- poll timeout は
consumer.poll(Duration.ofMillis(...))の引数で、ブローカーからデータを待つ最大時間です。短過ぎると空ポーリングが多くなり、長過ぎるとスレッドが不必要にブロックします。
|
1 2 3 4 5 6 7 |
# 1 回の poll で最大 500 件取得 → バッチ処理向き max.poll.records=500 # 少なくとも 64KB が揃うまで待機し、データが無い場合は最長 200ms 待つ fetch.min.bytes=65536 fetch.max.wait.ms=200 |
| パラメータ | 推奨範囲(目安) |
|---|---|
max.poll.records |
200〜1000(処理時間と CPU に合わせて調整) |
fetch.min.bytes |
32KB〜256KB |
fetch.max.wait.ms |
50ms〜500ms |
poll timeout |
500ms〜2s(Duration で指定) |
ポイント:まず 処理時間と
max.poll.recordsの関係を測定し、max.poll.interval.msが超えないように設定。次にネットワーク効率化のためfetch.min.bytes/fetch.max.wait.msを調整し、最後にpoll timeoutで空ポーリングを抑制します。
リバランスとエラーハンドリング、DLQ 戦略
リバランスや例外は本番障害の主因です。この章では安全なオフセット管理のための ConsumerRebalanceListener 実装例、Spring Kafka で推奨される DLQ 設定方法、そしてエラー種別に応じたハンドリングフローを示します。
ConsumerRebalanceListener の実装例とデータロス防止策
リバランスが発生した瞬間に 未コミットのオフセットを確実に書き込む ことで、再割り当て後に同一メッセージが再取得されるリスクを低減できます。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
public class SafeRebalanceListener implements ConsumerRebalanceListener { private final KafkaConsumer<?, ?> consumer; // パーティションごとに次のオフセットを保持するマップ private final Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new ConcurrentHashMap<>(); public SafeRebalanceListener(KafkaConsumer<?, ?> consumer) { this.consumer = consumer; } /** アプリ側で処理完了後に呼び出す */ public void recordProcessed(TopicPartition tp, long offset) { // コミット対象は「次に読むべき」オフセットなので +1 pendingOffsets.put(tp, new OffsetAndMetadata(offset + 1)); } @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // リバランス直前に未コミット分を同期的に保存 consumer.commitSync(pendingOffsets); } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // 必要ならここで初期化処理(例: 取得済みオフセットのクリア) pendingOffsets.keySet().removeAll(partitions); } } |
使用例:
|
1 2 3 4 5 6 7 |
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); SafeRebalanceListener listener = new SafeRebalanceListener(consumer); consumer.subscribe(List.of("orders"), listener); // メッセージ処理後に呼び出す listener.recordProcessed(record.topicPartition(), record.offset()); |
ポイント:
onPartitionsRevokedで 必ず同期コミット (commitSync) を行う実装を標準化すると、再起動時のデータロスや重複処理はほぼ防げます。
Spring Kafka におけるエラー処理と DLQ 設定
Spring Kafka では SeekToCurrentErrorHandler(または DeadLetterPublishingRecoverer)を組み合わせて、リトライ後に自動的に DLQ へ転送できます。dead-letter-topic は標準プロパティではなく、以下のように設定します。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
spring: kafka: consumer: group-id: payment-service enable-auto-commit: false listener: ack-mode: manual_immediate # 手動コミット concurrency: 3 # 並列コンシューマ数 missing-topics-fatal: false # DLQ 用のトピック名を指定(標準プロパティは `dlq-topic`) dlq-topic: name: payment-dlq partitions: 1 replication-factor: 1 # エラーハンドラの設定例(Java Config が必要) |
Java Config の例
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory( ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(Runtime.getRuntime().availableProcessors()); // 3 回リトライ → DLQ へ送信 DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer( template, (r, e) -> new TopicPartition("payment-dlq", r.partition())); SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(1000L, 3)); factory.setErrorHandler(errorHandler); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } |
リスナーメソッド
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@KafkaListener(topics = "payments", containerFactory = "kafkaListenerContainerFactory") public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) { try { processPayment(record); ack.acknowledge(); // 手動コミット } catch (RecoverableException e) { // リトライ対象例外はそのまま再スロー → エラーハンドラがリトライ実施 throw e; } catch (Exception e) { // 非回復可能例外は即 DLQ へ送られる throw new KafkaException("Send to DLQ", e); } } |
ポイント:
可逆的な例外はSeekToCurrentErrorHandlerがリトライし、リトライ上限に達したら自動で DLQ へ。
非回復可能例外は即座にKafkaExceptionをスローして DLQ に流す設計が安全です。
セキュリティ設定とモニタリング指標
暗号化・認証だけでなく、運用時の可視性を確保することが本番運用の必須条件です。この章では TLS + SCRAM の構成例と、主要メトリクスの監視ポイントを紹介します。
SSL/TLS と SASL/SCRAM の構成ベストプラクティス
データ保護は SASL_SSL を選択し、認証には SCRAM‑SHA‑256(または SHA‑512) を組み合わせるのが現在の推奨パターンです。
|
1 2 3 4 5 6 7 |
security.protocol=SASL_SSL ssl.truststore.location=/etc/kafka/truststore.jks ssl.truststore.password=${TRUSTSTORE_PWD} sasl.mechanism=SCRAM-SHA-256 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="consumer_user" password="${KAFKA_PASSWORD}"; |
| 項目 | 推奨設定 |
|---|---|
security.protocol |
SASL_SSL(暗号化+認証) |
ssl.truststore.location |
サーバー証明書を格納した JKS |
sasl.mechanism |
SCRAM-SHA-256 または SCRAM-SHA-512 |
| パスワード管理 | 環境変数、HashiCorp Vault など外部シークレットストアで注入 |
ポイント:TLS と SCRAM の組み合わせを全コンシューマーで統一し、証明書と認証情報は CI/CD パイプライン経由で安全に配布します。
主要メトリクスとアラート閾値例
Kafka クライアントが提供する JMX メトリクスを Prometheus 経由で取得し、以下の指標を中心に監視・アラートを設定します。
| 指標 | 意味 | 推奨閾値(例) |
|---|---|---|
kafka_consumer_lag |
グループごとの遅延レコード数 | 5 000 件超で Warning、20 000 件超で Critical |
kafka_consumer_poll_latency_avg (秒) |
poll() に要した平均時間 |
0.2 秒超で Warning |
kafka_consumer_heartbeat_rate (Hz) |
ハートビート送信レート | 設定 heartbeat.interval.ms=3000 の場合、0.25 Hz 未満で Warning |
Prometheus アラート例
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
# consumer lag が一定時間以上続くと警告 - alert: KafkaConsumerLagHigh expr: kafka_consumer_lag{group="order-service-consumer"} > 5000 for: 5m labels: severity: warning annotations: summary: "Consumer lag が高止まりしています" description: "{{ $labels.group }} の lag が {{ $value }} 件です。" # poll latency が上昇した場合のアラート - alert: KafkaPollLatencyHigh expr: kafka_consumer_poll_latency_avg{group="order-service-consumer"} > 0.2 for: 2m labels: severity: warning annotations: summary: "poll latency が高くなっています" description: "平均 poll 時間が {{ $value }} 秒です。" |
ポイント:メトリクスは ダッシュボードに可視化し、閾値超過時は自動スケーリングやコンテナ再起動のフックを検討 すると障害対応が迅速になります。
マルチスレッド実装とコンテナ環境別チューニング
高いスループットが必要な場合は並列消費が有効です。Spring Kafka のコンテナベース実装を利用すれば、スレッド安全性を保ちつつ簡単にマルチスレッド化できます。また、Kubernetes・Docker・AWS MSK それぞれの特性に合わせたリソース設定例も示します。
Spring Kafka の並列コンテナ設定
KafkaConsumer.run() を直接呼び出す設計は スレッド安全でない ため、代わりに ConcurrentMessageListenerContainer(@KafkaListener(concurrency = N))を利用します。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
@Configuration public class KafkaConfig { @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory( ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); // CPU コア数またはパーティション数に合わせて並列度を設定 int concurrency = Math.max(Runtime.getRuntime().availableProcessors(), 3); factory.setConcurrency(concurrency); // 手動即時コミットモード factory.getContainerProperties() .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } } |
リスナーメソッド例
|
1 2 3 4 5 6 7 8 9 10 11 12 |
@KafkaListener(topics = "events", groupId = "event-service", containerFactory = "kafkaListenerContainerFactory") public void handle(ConsumerRecord<String, String> record, Acknowledgment ack) { try { process(record); ack.acknowledge(); // 手動コミット } catch (Exception e) { sendToDlq(record, e); // DLQ へ転送(上章参照) } } |
| アプローチ | 特徴 |
|---|---|
KafkaConsumer.run() + 自前スレッド管理 |
高度な制御は可能だがバグリスク大 |
Spring の @KafkaListener(concurrency=N) |
安全・設定容易、リバランス自動対応 |
ポイント:本番では
concurrencyをパーティション数または CPU コア数に合わせて調整し、コンテナの水平スケールで更なるスループット向上を図ります。
Kubernetes / Docker / AWS MSK におけるリソースとパラメータ調整
コンテナ化環境では CPU・メモリ制限 と Kafka のポーリング設定が相互に影響します。過大な max.poll.records は OOM を招きやすく、逆に低すぎるとスループットが落ちます。
Kubernetes デプロイ例
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
apiVersion: apps/v1 kind: Deployment metadata: name: kafka-consumer spec: replicas: 3 selector: matchLabels: app: kafka-consumer template: metadata: labels: app: kafka-consumer spec: containers: - name: consumer image: myrepo/kafka-consumer:2.5.0 resources: requests: cpu: "500m" memory: "512Mi" limits: cpu: "1" memory: "1024Mi" env: - name: KAFKA_MAX_POLL_RECORDS value: "400" - name: KAFKA_FETCH_MIN_BYTES value: "65536" - name: JAVA_OPTS value: "-Xms384m -Xmx768m" |
Docker(単体)実行例
|
1 2 3 4 5 6 7 |
docker run --rm \ --cpus="1.0" \ --memory="1024m" \ -e KAFKA_MAX_POLL_RECORDS=300 \ -e KAFKA_FETCH_MIN_BYTES=65536 \ myrepo/kafka-consumer:2.5.0 |
AWS MSK の留意点
- ブローカーと同一 AZ に配置 するとネットワークレイテンシが最小化されます。
broker.rackが設定されている場合は、コンシューマー Pod のaffinityを利用して Rack‑aware なスケジューリング を行うとリバランス頻度が減ります。
ポイント:リソース上限はポーリングパラメータに合わせてチューニング し、CPU が逼迫したら
max.poll.recordsを下げる、もしくは Pod の水平スケールで対処します。
まとめ
- 基本設定:
group.idは機能単位で固定し、enable.auto.commit=falseと用途に合わせたauto.offset.resetを選択。 - コミット戦略:手動コミットはバッチ安全策、
commitSyncとcommitAsyncの使い分けでスループットと確実性を調整。 - ポーリング最適化:
max.poll.records・fetch.min.bytes・fetch.max.wait.msとpoll timeout(Duration) を処理時間に合わせて設定し、max.poll.interval.ms超過を防止。 - リバランス & DLQ:
ConsumerRebalanceListenerで未コミットオフセットを確実に保存し、Spring Kafka のSeekToCurrentErrorHandlerとDeadLetterPublishingRecovererを組み合わせて例外種別ごとに適切に DLQ へ転送。 - セキュリティ・監視:TLS + SCRAM‑SHA‑256 が推奨構成、主要メトリクス(lag、poll latency、heartbeat)を Prometheus/Grafana で可視化し閾値アラートを設定。
- マルチスレッド & デプロイ:Spring の並列コンテナで安全にスレッド数を増やし、Kubernetes/Docker/AWS MSK 環境ではリソース制限と Kafka パラメータの整合性を保つ。
これらのベストプラクティスを踏まえて設定・運用すれば、本番環境でも 高可用性かつ低レイテンシ な Kafka コンシューマーが実現できます。