When building microservices, you often run into challenges designing inter-service communication. With synchronous REST API calls, if the receiving service goes down, the sending side is affected as well. This is exactly where asynchronous messaging with Kafka shines.

This article walks you through implementing a Kafka Producer and Consumer in a Spring Boot 3.x application from scratch using spring-kafka. We’ll cover everything in one go — from the basics of KafkaTemplate and @KafkaListener, to error handling, retries, and Dead Letter Topic (DLT) configuration. For guidance on when to use @Async (in-process thread-based async) versus Kafka, see the Async Processing Guide.

Local Environment: Starting Kafka with Docker Compose

You can keep it minimal with a KRaft setup (no Zookeeper required).

# docker-compose.yml
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"

Run docker compose up -d and you’re ready to go. For details on writing Dockerfiles and multi-stage builds, see the Docker Containerization Guide.
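
Before wiring up the application, you can sanity-check that the broker is reachable from inside the container. The confluentinc images ship the Kafka CLI tools on the PATH (without the .sh suffix):

```shell
# List topics to confirm the broker is up and accepting connections
docker compose exec kafka kafka-topics --bootstrap-server localhost:9092 --list
```

An empty list (no error) is a healthy result at this point, since we haven't created any topics yet.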

Adding the Dependency and Configuring application.yml

// build.gradle
dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
}

This is managed by Spring Boot’s BOM, so no version specification is needed.

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-app-group
      # earliest: read from the beginning of the partition when the group has no committed offset yet.
      # In production, latest (start from the newest record) is more common.
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Starting with StringSerializer / StringDeserializer is the safest approach. Switch to JsonSerializer when you need to handle JSON objects.
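
When you do switch to JSON, the spring-kafka serializers can be swapped in via configuration alone. A sketch — the trusted-packages value is an assumption and must match your own DTO package:

```yaml
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        # Only classes under these packages may be deserialized (security guard)
        spring.json.trusted.packages: "com.example.order.dto"
```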

Sending Messages with KafkaTemplate

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendOrder(String orderId, String payload) {
        kafkaTemplate.send("orders", orderId, payload)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Send failed orderId={}", orderId, ex);
                } else {
                    log.info("Send succeeded offset={}",
                        result.getRecordMetadata().offset());
                }
            });
    }
}

send() returns a CompletableFuture. Register a callback with whenComplete to log both successful sends and failures.
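
If a caller needs confirmation that the broker accepted the record before proceeding (for example, before acknowledging an upstream request), you can block on the returned future instead. A sketch, at the cost of added latency on the calling thread (requires java.util.concurrent.TimeUnit):

```java
// Blocking variant (sketch): waits up to 5 seconds for the broker ack.
// Throws if the send fails or times out, so the caller can react.
public void sendOrderSync(String orderId, String payload) {
    try {
        var result = kafkaTemplate.send("orders", orderId, payload)
            .get(5, TimeUnit.SECONDS);
        log.info("Acked offset={}", result.getRecordMetadata().offset());
    } catch (Exception e) {
        throw new IllegalStateException("Kafka send failed for orderId=" + orderId, e);
    }
}
```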

Receiving Messages with @KafkaListener

@Component
@Slf4j
public class OrderConsumer {

    // When you only need the payload
    @KafkaListener(topics = "orders", groupId = "my-app-group")
    public void consume(String message) {
        log.info("Received: {}", message);
    }

    // When you need metadata (offset, partition)
    @KafkaListener(topics = "orders-detail", groupId = "my-app-group")
    public void consumeWithMeta(ConsumerRecord<String, String> record) {
        log.info("key={}, partition={}, offset={}",
            record.key(), record.partition(), record.offset());
    }
}

If you only need to process the payload, receiving it directly as a parameter is the most readable approach. Use ConsumerRecord when you need metadata such as offset, partition, or timestamp.

Verifying the Setup

Invoke the Producer through a REST API endpoint to verify everything works.
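
The endpoint itself isn't shown above; a minimal sketch might look like the following. The path, request shape, and class name are assumptions chosen to match the curl call below:

```java
// Hypothetical controller wiring the HTTP request to the OrderProducer above
@RestController
@RequiredArgsConstructor
public class OrderController {

    private final OrderProducer orderProducer;
    private final ObjectMapper objectMapper;

    @PostMapping("/orders")
    public ResponseEntity<Void> createOrder(@RequestBody Map<String, String> body)
            throws JsonProcessingException {
        // orderId doubles as the Kafka key, so records for one order land in one partition
        orderProducer.sendOrder(body.get("orderId"), objectMapper.writeValueAsString(body));
        return ResponseEntity.accepted().build();
    }
}
```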

curl -X POST http://localhost:8080/orders \
  -H "Content-Type: application/json" \
  -d '{"orderId":"001","item":"coffee"}'

If the message appears in the Consumer’s logs, you’re good. For automated testing, @EmbeddedKafka is very convenient. You must point spring.kafka.bootstrap-servers at the ${spring.embedded.kafka.brokers} placeholder (e.g., via @TestPropertySource) — without that mapping, the application tries to connect to the localhost:9092 from application.yml and fails.

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"orders"})
@TestPropertySource(properties = {"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
class OrderConsumerTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @SpyBean
    private OrderConsumer orderConsumer;

    @Test
    void shouldReceiveMessage() {
        kafkaTemplate.send("orders", "key1", "test-message");
        verify(orderConsumer, timeout(10000)).consume(any());
    }
}

Using @SpyBean with verify() lets you confirm message receipt without introducing state management (like CountDownLatch) into the Consumer class itself.

Configuring Error Handling and Retries with DefaultErrorHandler

With the default configuration, a failed record is retried nine times with no back-off and then logged and skipped — the message is effectively lost. Set up DefaultErrorHandler explicitly to control the retry schedule and route exhausted records to a Dead Letter Topic.

import org.springframework.kafka.support.serializer.DeserializationException;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            kafkaListenerContainerFactory(
                ConsumerFactory<String, String> consumerFactory,
                KafkaTemplate<String, String> kafkaTemplate) {

        // 1-second interval, max 3 retries (4 total attempts)
        var backOff = new FixedBackOff(1000L, 3L);
        var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
        var errorHandler = new DefaultErrorHandler(recoverer, backOff);

        // Treat spring-kafka's DeserializationException (imported above) as fatal —
        // a record that cannot be deserialized will never succeed, so retrying is pointless
        errorHandler.addNotRetryableExceptions(DeserializationException.class);

        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}

To use exponential backoff, pass ExponentialBackOff instead of FixedBackOff.

var backOff = new ExponentialBackOff(1000L, 2.0); // 1s → 2s → 4s...
backOff.setMaxAttempts(4); // available since Spring Framework 6.1 (Spring Boot 3.2+)
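
To see the schedule this produces, here is the arithmetic reproduced in plain Java — a standalone illustration, not Spring's ExponentialBackOff class itself:

```java
public class BackoffSchedule {
    public static void main(String[] args) {
        long initialInterval = 1000L;  // first retry delay in ms
        double multiplier = 2.0;       // delay doubles on each retry
        int maxAttempts = 4;           // 1 initial attempt + 3 retries

        // Retry n waits initialInterval * multiplier^(n-1) milliseconds
        for (int retry = 1; retry < maxAttempts; retry++) {
            long delayMs = (long) (initialInterval * Math.pow(multiplier, retry - 1));
            System.out.println("retry " + retry + ": wait " + delayMs + " ms");
        }
        // prints: retry 1: wait 1000 ms, retry 2: wait 2000 ms, retry 3: wait 4000 ms
    }
}
```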

For a comparison with exception design in REST APIs, see the Exception Handling Guide.

Monitoring the Dead Letter Topic (DLT)

Messages that exceed the retry limit are automatically forwarded to {original-topic-name}.DLT. For the orders topic, that’s orders.DLT. Note that by default the record is published to the same partition number as the original, so give the DLT at least as many partitions as the source topic (or supply a custom destination resolver).

@KafkaListener(topics = "orders.DLT", groupId = "my-app-dlt-group")
public void consumeDlt(ConsumerRecord<String, String> record) {
    log.error("DLT received - manual intervention required key={}, value={}",
        record.key(), record.value());
    // Send alert, record to management DB, etc.
}

Decide at design time what to do with messages that accumulate in the DLT — whether to fix the root cause and re-queue them to the original topic, or handle them manually.
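
If you opt for re-queueing, the DLT listener can publish repaired records straight back to the original topic. A sketch — the separate group id is an assumption so it can run independently of the alerting listener above:

```java
// Sketch: re-queue a record from the DLT back to the original topic.
// Only run this after the root cause is fixed; otherwise the record will
// fail again, exhaust its retries, and land back in the DLT.
@KafkaListener(topics = "orders.DLT", groupId = "my-app-dlt-requeue-group")
public void requeue(ConsumerRecord<String, String> record) {
    kafkaTemplate.send("orders", record.key(), record.value());
    log.info("Re-queued key={} to orders", record.key());
}
```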

Things to Keep in Mind for Production

Production brokers typically require SSL/SASL authentication, configured via spring.kafka.security.protocol and related properties. On the throughput side, if the number of consumer instances in a group exceeds the partition count, the surplus instances sit idle with no partitions assigned — design the partition count with the intended consumer count in mind. For operations, monitoring consumer lag with kafka-consumer-groups.sh or Prometheus is standard practice.
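
A client-side SASL/SSL configuration sketch — the mechanism, broker address, and credential placeholders here are assumptions to be replaced with whatever your cluster actually uses:

```yaml
spring:
  kafka:
    bootstrap-servers: broker1.example.com:9093
    security:
      protocol: SASL_SSL
    properties:
      sasl.mechanism: SCRAM-SHA-512
      # Credentials injected via environment variables, never hard-coded
      sasl.jaas.config: >-
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="${KAFKA_USER}" password="${KAFKA_PASSWORD}";
```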

Summary

With spring-kafka, integrating Spring Boot and Kafka is straightforward.

  • Implement the Producer with KafkaTemplate.send()
  • Implement the Consumer with @KafkaListener
  • Configure retries and DLT forwarding with DefaultErrorHandler + DeadLetterPublishingRecoverer

These three pillars give you a solid foundation for asynchronous inter-service communication. For large-scale data processing combining Kafka with Spring Batch, check out the Spring Batch Guide.