Contents
前提条件と環境準備
ローカルで Kafka を動作させるには Java Runtime が必須 です。Kafka は JDK 11 以降を公式にサポートしており、JDK 8 系では一部 API の非互換が原因でエラーになるケースがあります。ここでは、主要 OS(macOS・Ubuntu)ごとのインストール手順とバージョン確認コマンドを紹介します。
Java のインストール確認
以下のコマンドでインストール済みの Java バージョンを確認できます。11 以上が表示されれば次へ進めます。
|
1 2 3 |
java -version # 出力例: openjdk version "11.0.22" 2024-07-18 |
OS 別インストール手順
| OS | インストール方法 | 主なコマンド |
|---|---|---|
| macOS | Homebrew を利用 | brew install openjdk@11sudo ln -sfn $(brew --prefix)/opt/openjdk@11/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk-11.jdk |
| Ubuntu (20.04 以降) | APT パッケージ | sudo apt update && sudo apt install openjdk-11-jdk |
インストール後は再度 java -version を実行し、バージョンが 11.x 系であることを確認してください。
Kafka と Zookeeper の取得・ディレクトリ構成
公式サイトから直接ダウンロードすれば、改ざんリスクのない 最新安定版(執筆時点)を入手できます。バージョンは常に公式ページで確認し、以下のコマンド例では 3.x 系の最新版を想定しています。
バイナリのダウンロードと検証
- 公式ダウンロード URL(最新バージョンは
<VERSION>に置き換えてください)
bash
VERSION=$(curl -s https://downloads.apache.org/kafka/ | grep -Eo 'kafka_[0-9]+\.[0-9]+-[0-9]+' | tail -1)
curl -O https://downloads.apache.org/kafka/${VERSION}/kafka_2.13-${VERSION}.tgz - SHA‑512 ハッシュで整合性を確認(公式ページに掲載のハッシュと比較)
bash
sha512sum kafka_2.13-${VERSION}.tgz
# → 公式サイトの値と一致すれば OK
標準的なディレクトリ構造
ダウンロードしたアーカイブを解凍し、/usr/local/kafka に配置します。典型的なディレクトリ構成は次の通りです。
| ディレクトリ | 内容 |
|---|---|
config/ |
Zookeeper とブローカー用設定ファイル(zookeeper.properties, server.properties) |
bin/ |
起動スクリプト・CLI ツール(kafka-server-start.sh など) |
logs/ |
デフォルトのログ出力先(log.dirs のデフォルトでも使用) |
この構成をそのまま利用すれば、後続の設定変更が最小限で済みます。
シングルノードでの起動と基本設定
ローカル開発では Zookeeper と Kafka ブローカー を同一マシン上でバックグラウンド実行させるだけで十分です。以下では起動手順と、最低限調整すべき設定項目を解説します。
Zookeeper の起動手順
zookeeper.properties はデフォルトのままで問題ありませんが、ログ保存先は明示的に指定しておくとトラブル時に確認しやすくなります。
|
1 2 3 4 5 |
cd /usr/local/kafka bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # 起動確認(プロセスが残っていれば成功) ps -ef | grep zookeeper |
Kafka ブローカーの起動手順
ブローカーは server.properties の数点だけ変更すれば、シングルノードでも即座に利用可能です。
|
1 2 3 |
bin/kafka-server-start.sh -daemon config/server.properties # 起動ログは logs/ ディレクトリに出力されます |
主要設定項目
config/server.properties(ローカル向け推奨値)
|
1 2 3 4 5 6 7 8 |
broker.id=0 # シングルブローカーなので固定 listeners=PLAINTEXT://localhost:9092 log.dirs=/usr/local/kafka/logs # ディスク容量は十分に確保 num.network.threads=3 # 小規模環境ではデフォルトで可 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 |
config/zookeeper.properties(変更不要が望ましい)
|
1 2 3 4 |
dataDir=/usr/local/kafka/zookeeper clientPort=2181 maxClientCnxns=60 |
これらの設定で Zookeeper → Kafka の順に起動すれば、ローカル環境は即座に稼働状態になります。
トピック作成とオプション
Kafka はトピック単位でデータを扱います。CLI ツール kafka-topics.sh を使えば、パーティション数やレプリケーション係数を自由に設定できますが、シングルブローカーの場合は 両方とも 1 が安全です。
トピック作成コマンド例
|
1 2 3 4 5 6 |
bin/kafka-topics.sh --create \ --topic my-topic \ --partitions 1 \ --replication-factor 1 \ --bootstrap-server localhost:9092 |
主なオプション解説
| オプション | 説明 |
|---|---|
--partitions |
パーティション数。スループット向上に寄与しますが、ローカル開発では 1 がシンプルです。 |
--replication-factor |
レプリカ数。ブローカーが 1 台だけの場合は必ず 1 を指定してください。 |
作成後は次のコマンドで一覧を確認できます。
|
1 2 |
bin/kafka-topics.sh --list --bootstrap-server localhost:9092 |
プロデューサー・コンシューマーの確認
CLI と Java の両方で「メッセージが送信 → 受信」できることを確認すれば、Kafka が正しく稼働していると判断できます。
CLI での送受信テスト
- プロデューサー起動(標準入力から文字列を送信)
bash
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic - 別ターミナルでコンシューマー起動(先頭から全メッセージを取得)
bash
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic my-topic --from-beginning
入力した文字列が即座にコンシューマ側に表示されれば成功です。
Java サンプルコードとビルド手順
以下は Maven を使わずにローカルの JAR だけで動かす最小構成です。kafka-clients-<version>.jar は公式サイトから取得してください(バージョンは server.properties と合わせる)。
Producer.java
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
import org.apache.kafka.clients.producer.*; import java.util.Properties; public class Producer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "greeting", "Hello Kafka!"); producer.send(record, (metadata, exception) -> { if (exception == null) { System.out.printf("Sent to partition %d offset %d%n", metadata.partition(), metadata.offset()); } else { exception.printStackTrace(); } }); producer.close(); } } |
Consumer.java
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
import org.apache.kafka.clients.consumer.*; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class Consumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "demo-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> rec : records) { System.out.printf("key=%s value=%s partition=%d offset=%d%n", rec.key(), rec.value(), rec.partition(), rec.offset()); } } } } |
コンパイル&実行手順
|
1 2 3 4 5 6 7 8 9 10 |
# kafka-clients.jar を取得し、同じディレクトリに置く wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/<VERSION>/kafka-clients-<VERSION>.jar javac -cp kafka-clients-*.jar Producer.java Consumer.java # コンシューマー起動(バックグラウンド推奨) java -cp .:kafka-clients-*.jar Consumer & # プロデューサー起動 java -cp .:kafka-clients-*.jar Producer |
コンソールに Sent to partition … と key=… value=… が表示されれば、Java クライアントからも正常にメッセージが流れています。
トラブルシューティング・Docker Compose 代替構築・本番移行ポイント
よくあるエラーと対処法
| エラー | 主な原因 | 解決策 |
|---|---|---|
| ポート 9092 が使用中 | 別プロセスがバインドしている | lsof -i :9092 で確認し、不要なら停止。もしくは listeners=PLAINTEXT://localhost:19092 に変更 |
Zookeeper 接続失敗 (Connection refused) |
起動していない・ポート不一致 | bin/zookeeper-server-start.sh -daemon config/zookeeper.properties を再実行し、nc -z localhost 2181 で接続確認 |
ブローカー起動直後に終了(broker.id 重複) |
同一マシン上で複数ブローカーを走らせている | server.properties の broker.id をユニークに設定。単体なら 0 固定 |
| ログディレクトリが書き込めない | ディスク容量不足または権限エラー | df -h で空き容量確認、必要なら別ディスクへ log.dirs を変更 |
Docker Compose による代替構築
Docker 環境ではホスト側に Java をインストールする必要がなく、イメージバージョンを揃えるだけで手軽に起動できます。公式イメージのタグは執筆時点で 7.5.0 ですが、実際に使用する際は 公式サイトの最新タグ と合わせてください。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# docker-compose.yml version: '3.8' services: zookeeper: image: confluentinc/cp-zookeeper:7.5.0 # 必要に応じてバージョンを更新 environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - "2181:2181" kafka: image: confluentinc/cp-kafka:7.5.0 # Kafka のバージョンは Zookeeper と合わせる depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 ports: - "9092:9092" |
起動手順
|
1 2 3 4 5 6 |
docker compose up -d # 背景で Zookeeper と Kafka を起動 docker compose exec kafka \ kafka-topics.sh --create \ --topic my-topic --partitions 1 --replication-factor 1 \ --bootstrap-server localhost:9092 |
※ バージョン注意
confluentinc/cp-kafkaとcp-zookeeperのタグは必ず同一にしてください。異なるバージョン間でプロトコル互換性が失われ、起動エラーになることがあります。
本番環境へ移行する際に検討すべき主要設定
| 項目 | 推奨設定例(小規模本番) | 理由 |
|---|---|---|
broker.id |
クラスタ全体で一意、0〜2,147,483,647 の整数 | ブローカー識別に必須 |
listeners / advertised.listeners |
PLAINTEXT と併せて SSL (SSL://host:9093) を設定し暗号化 |
データ漏洩防止 |
log.dirs |
SSD 推奨、容量は予測トラフィックの 2〜3 倍確保 | 高速書き込みとディスク不足回避 |
num.network.threads, num.io.threads |
CPU コア数 × 2 程度に増加(例: 8 cores → 16) | スループット向上 |
replication.factor |
3 以上で耐障害性確保 | ブローカー障害時のデータ喪失防止 |
min.insync.replicas |
replication.factor - 1(例: 2) |
書き込み成功判定を厳格化 |
| 認証・認可 | SASL/SCRAM または Kerberos の導入検討 | アクセス制御と監査ログ取得 |
本番環境では モニタリング(Prometheus + Grafana など)や バックアップ戦略、セキュリティパッチの適用サイクル も合わせて設計してください。公式ドキュメントの Production Checklist が詳細な指針を提供しています。
まとめ
- Java 11 以上 をインストールし、バージョンを確認する。
- 公式サイトから 最新安定版 Kafka バイナリ を取得し、SHA‑512 で整合性検証する。
- 標準ディレクトリ構成のまま Zookeeper → Kafka の順に起動し、必要最小限の設定 (
broker.id,listeners,log.dirs) を行う。 kafka-topics.shでトピックを作成し、CLI または Java クライアントでメッセージ送受信を確認する。- エラーが出たら よくあるトラブル テーブルを参照し、Docker Compose でも同様の環境を手軽に構築できることを覚えておく。
- 本番移行時は レプリケーション・暗号化・認証 などの追加設定とモニタリング基盤を整える。
この手順通りに進めれば、ローカルでの開発環境構築から本格的な運用設計まで、一貫した知識体系を身につけられます。ぜひ実際に手を動かして、Kafka の高速ストリーミング処理を体感してください。