マイクロサービス間の非同期通信にRabbitMQを使いたいとき、最初に悩むのがExchange・Queue・Bindingの関係ですよね。Kafkaとは異なるルーティングモデルなので、概念を掴んでからコードを書くとスムーズです。この記事ではspring-amqpを使い、動くコードを中心に説明します。
RabbitMQとAMQPの基本概念
RabbitMQはAMQPプロトコルを実装したメッセージブローカーです。メッセージの流れはこうなります。
Producer → Exchange → Binding(RoutingKey) → Queue → Consumer
ProducerはExchangeにメッセージを投げるだけで、どのQueueに届くかはExchangeのタイプとBindingが決めます。
- Direct :RoutingKeyが完全一致するQueueに届ける
- Fanout :RoutingKeyを無視してすべてのQueueに届ける
- Topic :
*(1ワード)と#(0ワード以上)のワイルドカードマッチ
Docker ComposeでRabbitMQを起動する
管理UIつきの rabbitmq:3-management イメージが便利です。
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
docker compose up -d で起動後、http://localhost:15672 の管理UIでExchange・Queue・Bindingの状態を確認できます(初期ログイン: guest/guest)。
依存追加と接続設定
pom.xml に追加します。spring-retry は spring-boot-starter-amqp の推移的依存として含まれるため、別途追加は不要です。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.properties の設定はシンプルです。
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
AutoConfigurationで ConnectionFactory と RabbitTemplate が自動生成されます。
Exchange・Queue・BindingをBean定義する
@Configuration クラスでまとめて定義するのが定石です。
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE = "order.direct";
public static final String QUEUE = "order.queue";
public static final String ROUTING_KEY = "order.created";
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(EXCHANGE);
}
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(QUEUE).build();
}
@Bean
public Binding orderBinding(Queue orderQueue, DirectExchange orderExchange) {
return BindingBuilder.bind(orderQueue)
.to(orderExchange).with(ROUTING_KEY);
}
@Bean
public Jackson2JsonMessageConverter jsonConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
Jackson2JsonMessageConverter converter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(converter);
return template;
}
}
Jackson2JsonMessageConverter を設定しておくとPOJOをJSONで送受信できます。
Producer実装
@Service
@RequiredArgsConstructor
public class OrderProducer {
private final RabbitTemplate rabbitTemplate;
public void send(OrderMessage message) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.EXCHANGE,
RabbitMQConfig.ROUTING_KEY,
message
);
}
}
Consumer実装(手動Ack)
@Component
public class OrderConsumer {
@RabbitListener(queues = RabbitMQConfig.QUEUE, ackMode = "MANUAL")
public void consume(OrderMessage message,
com.rabbitmq.client.Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
process(message);
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, false); // requeue=false → DLQへ転送
}
}
}
basicNack の第3引数(requeue)を false にするとDLQへ転送されます。
注意: 後述するコンテナレベルのリトライ(Advice経由)と手動Ackは原則として併用しません。コンテナレベルリトライを使う場合は
ackModeをデフォルト(AUTO)のままにしてください。
Fanout・Topic Exchangeの実装例
Fanout Exchange は通知配信など全Consumerにブロードキャストしたいときに使います。
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("notification.fanout");
}
@Bean
public Binding emailBinding(Queue emailQueue, FanoutExchange fanout) {
return BindingBuilder.bind(emailQueue).to(fanout);
}
Topic Exchange はワイルドカードルーティングが必要なときに有効です。例えば log.error.# というパターンは log.error.app.db のように後続ワードが何個あってもマッチします。
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("log.topic");
}
@Bean
public Binding errorBinding(Queue errorQueue, TopicExchange topic) {
return BindingBuilder.bind(errorQueue).to(topic).with("log.error.#");
}
| Exchange | RoutingKey | 主なユースケース |
|---|---|---|
| Direct | 完全一致 | タスクキュー |
| Fanout | 無視 | ブロードキャスト通知 |
| Topic | ワイルドカード | ログ集約・イベントフィルタ |
デッドレターExchange(DLX)の設定
処理に失敗したメッセージをDLQに退避させる仕組みです。以下のように orderQueue() の定義を更新してください(先ほどの定義を置き換えます)。
// orderQueue() をDLX引数付きに置き換え
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(RabbitMQConfig.QUEUE)
.withArgument("x-dead-letter-exchange", "order.dlx")
.build();
}
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("order.dlx");
}
@Bean
public Queue dlqQueue() {
return QueueBuilder.durable("order.dlq").build();
}
@Bean
public Binding dlqBinding(Queue dlqQueue, DirectExchange dlxExchange) {
return BindingBuilder.bind(dlqQueue).to(dlxExchange).with(RabbitMQConfig.ROUTING_KEY);
}
Consumer側で basicNack(tag, false, false) を呼ぶとこのDLQにメッセージが転送されます。
リトライポリシーの設定
コンテナファクトリに RetryInterceptorBuilder でAdviceを設定します。最大3回リトライ後に失敗するとDLQへ転送されます。
注意: このリトライはコンテナレベル(
ackMode = AUTO)で動作します。手動Ackのリスナーとは原則として併用しないでください。
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(converter);
factory.setAdviceChain(
RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.backOffOptions(1000L, 2.0, 10000L)
.build()
);
return factory;
}
外部APIを呼び出す側の耐障害性にはResilience4jのサーキットブレーカーも組み合わせられます。詳しくはSpring BootでResilience4jのサーキットブレーカーを実装する方法を参照してください。
KafkaとRabbitMQの使い分け
| 観点 | Kafka | RabbitMQ |
|---|---|---|
| スループット | 非常に高い | 中程度 |
| ルーティング | Consumer Groupのみ | Exchange/Bindingで柔軟 |
| 順序保証 | Partition内 | Queue単位 |
| メッセージ保持 | 設定期間保持 | Ack後に削除 |
大量ログの収集やイベントソーシングは Kafka 、タスクキューや複雑なルーティングが必要な通知配信は RabbitMQ という使い分けが一般的です。Kafkaの実装についてはSpring BootでKafkaのProducer・Consumerを実装する方法も参照してください。
同一JVM内のサービス間疎結合には ApplicationEvent も選択肢の一つです。Spring BootのApplicationEventで疎結合な非同期処理を実装する方法も合わせて読んでみてください。
まとめ
spring-amqpでのRabbitMQ実装の要点をまとめました。Exchange・Queue・Binding をBean定義するパターンを覚えると、Direct/Fanout/Topicの切り替えもスムーズです。DLX+リトライを組み合わせることで障害時のメッセージロストも防げるので、本番投入前にぜひ設定しておきましょう。
非同期処理全般についてはSpring Bootの非同期処理(@Async)を使いこなすも参考にしてください。