マイクロサービスを構築していると、サービス間の連携設計で悩むことがありますよね。REST APIによる同期通信だと、受信側がダウンしたときに送信側まで影響を受けてしまいます。そういう場面でKafkaを使った非同期メッセージングが効いてきます。
この記事では spring-kafka を使って、KafkaのProducerとConsumerを Spring Boot 3.x アプリにゼロから実装する手順を解説します。KafkaTemplate と @KafkaListener の基本から、エラーハンドリング・リトライ・Dead Letter Topic(DLT)の設定まで一本道で押さえます。なお、@Async(プロセス内スレッド非同期)との使い分けは非同期処理ガイドを参照してください。
ローカル環境:Docker ComposeでKafkaを起動する
ZookeeperなしのKRaft構成で最短に書けます。
# docker-compose.yml
services:
kafka:
image: confluentinc/cp-kafka:7.6.0
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_QUORUM_VOTERS: [email protected]:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
docker compose up -d で起動したら準備完了です。Dockerfileの書き方やマルチステージビルドについてはDockerコンテナ化ガイドも参照してください。
依存追加とapplication.ymlの設定
// build.gradle
dependencies {
implementation 'org.springframework.kafka:spring-kafka'
}
Spring Boot管理のBOMに含まれているのでバージョン指定は不要です。
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: my-app-group
# earliest はConsumerグループ未登録時にパーティション先頭から読む。本番では latest(最新から開始)が一般的
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
まずは StringSerializer / StringDeserializer で始めるのが無難です。JSONオブジェクトを扱いたいときは JsonSerializer に切り替えましょう。
KafkaTemplateでメッセージを送信する
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendOrder(String orderId, String payload) {
kafkaTemplate.send("orders", orderId, payload)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("送信失敗 orderId={}", orderId, ex);
} else {
log.info("送信成功 offset={}",
result.getRecordMetadata().offset());
}
});
}
}
send() は CompletableFuture を返します。whenComplete でコールバックを登録して、送信成功・失敗をログに残しておきましょう。
@KafkaListenerでメッセージを受信する
@Component
@Slf4j
public class OrderConsumer {
// ペイロードだけ必要な場合
@KafkaListener(topics = "orders", groupId = "my-app-group")
public void consume(String message) {
log.info("受信: {}", message);
}
// メタデータ(offset・partition)が必要な場合
@KafkaListener(topics = "orders-detail", groupId = "my-app-group")
public void consumeWithMeta(ConsumerRecord<String, String> record) {
log.info("key={}, partition={}, offset={}",
record.key(), record.partition(), record.offset());
}
}
ペイロードだけ処理するなら引数直接受け取りが読みやすいです。オフセット・パーティション・タイムスタンプなどのメタデータが必要なら ConsumerRecord を使いましょう。
動作確認
REST APIエンドポイント経由でProducerを呼び出して確認します。
curl -X POST http://localhost:8080/orders \
-H "Content-Type: application/json" \
-d '{"orderId":"001","item":"coffee"}'
Consumerのログにメッセージが出力されれば成功です。自動テストには @EmbeddedKafka が便利です。@TestPropertySource で spring.embedded.kafka.brokers を指定するのが必須で、これがないと application.yml の localhost:9092 を参照して接続エラーになります。
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"orders"})
@TestPropertySource(properties = {"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
class OrderConsumerTest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@SpyBean
private OrderConsumer orderConsumer;
@Test
void メッセージを受信できる() {
kafkaTemplate.send("orders", "key1", "test-message");
verify(orderConsumer, timeout(10000)).consume(any());
}
}
@SpyBean と verify() を使うことで、CountDownLatch のような状態管理をConsumerクラス自体に持ち込まずに受信確認できます。
DefaultErrorHandlerでエラーハンドリングとリトライを設定する
デフォルト設定のままだと処理例外が発生したときに無限リトライが走り、コンシューマーが詰まります。DefaultErrorHandler を明示的に設定しましょう。
import org.springframework.kafka.support.serializer.DeserializationException;
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory,
KafkaTemplate<String, String> kafkaTemplate) {
// 1秒間隔・最大3回リトライ(計4回試行)
var backOff = new FixedBackOff(1000L, 3L);
var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
var errorHandler = new DefaultErrorHandler(recoverer, backOff);
// Jacksonの同名クラスではなくspring-kafka由来のDeserializationExceptionを使うこと
errorHandler.addNotRetryableExceptions(DeserializationException.class);
var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}
指数バックオフを使いたい場合は FixedBackOff の代わりに ExponentialBackOff を渡します。
var backOff = new ExponentialBackOff(1000L, 2.0); // 1秒→2秒→4秒...
backOff.setMaxAttempts(4);
REST APIでの例外設計との比較は例外ハンドリングガイドも参考にしてみてください。
Dead Letter Topic(DLT)の監視
リトライ上限を超えたメッセージは自動的に {元のトピック名}.DLT へ転送されます。orders トピックなら orders.DLT です。
@KafkaListener(topics = "orders.DLT", groupId = "my-app-dlt-group")
public void consumeDlt(ConsumerRecord<String, String> record) {
log.error("DLT受信 - 手動対応が必要 key={}, value={}",
record.key(), record.value());
// アラート送信・管理DBへの記録など
}
DLTに蓄積されたメッセージは、障害原因を修正してオリジナルトピックへ再キューするか、手動処理するかを設計段階で決めておきましょう。
本番適用時に意識しておきたいこと
SSL/SASL認証は spring.kafka.security.protocol で設定します。本番ブローカーへの接続には必須です。グループ内のインスタンス数がパーティション数を超えると余剰インスタンスは処理を担当しないため、スループット設計の際はパーティション数とコンシューマー数の関係を意識してください。運用面では kafka-consumer-groups.sh やPrometheusでコンシューマーラグを監視するのが基本です。
まとめ
spring-kafkaを使えばSpring BootとKafkaの統合はスムーズに進みます。
KafkaTemplate.send()でProducer実装@KafkaListenerでConsumer実装DefaultErrorHandler+DeadLetterPublishingRecovererでリトライとDLT転送
この3点を押さえれば、サービス間の非同期連携の基盤が整います。KafkaとSpring Batchを組み合わせた大量データ処理についてはSpring Batchガイドも参考にしてみてください。