マイクロサービス間の非同期通信にRabbitMQを使いたいとき、最初に悩むのがExchange・Queue・Bindingの関係ですよね。Kafkaとは異なるルーティングモデルなので、概念を掴んでからコードを書くとスムーズです。この記事ではspring-amqpを使い、動くコードを中心に説明します。

RabbitMQとAMQPの基本概念

RabbitMQはAMQPプロトコルを実装したメッセージブローカーです。メッセージの流れはこうなります。

Producer → Exchange → Binding(RoutingKey) → Queue → Consumer

ProducerはExchangeにメッセージを投げるだけで、どのQueueに届くかはExchangeのタイプとBindingが決めます。

  • Direct :RoutingKeyが完全一致するQueueに届ける
  • Fanout :RoutingKeyを無視してすべてのQueueに届ける
  • Topic*(1ワード)と#(0ワード以上)のワイルドカードマッチ

Docker ComposeでRabbitMQを起動する

管理UIつきの rabbitmq:3-management イメージが便利です。

services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest

docker compose up -d で起動後、http://localhost:15672 の管理UIでExchange・Queue・Bindingの状態を確認できます(初期ログイン: guest/guest)。

依存追加と接続設定

pom.xml に追加します。spring-retryspring-boot-starter-amqp の推移的依存として含まれるため、別途追加は不要です。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.properties の設定はシンプルです。

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

AutoConfigurationで ConnectionFactoryRabbitTemplate が自動生成されます。

Exchange・Queue・BindingをBean定義する

@Configuration クラスでまとめて定義するのが定石です。

@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE    = "order.direct";
    public static final String QUEUE       = "order.queue";
    public static final String ROUTING_KEY = "order.created";

    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(EXCHANGE);
    }

    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable(QUEUE).build();
    }

    @Bean
    public Binding orderBinding(Queue orderQueue, DirectExchange orderExchange) {
        return BindingBuilder.bind(orderQueue)
                .to(orderExchange).with(ROUTING_KEY);
    }

    @Bean
    public Jackson2JsonMessageConverter jsonConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                          Jackson2JsonMessageConverter converter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(converter);
        return template;
    }
}

Jackson2JsonMessageConverter を設定しておくとPOJOをJSONで送受信できます。

Producer実装

@Service
@RequiredArgsConstructor
public class OrderProducer {

    private final RabbitTemplate rabbitTemplate;

    public void send(OrderMessage message) {
        rabbitTemplate.convertAndSend(
            RabbitMQConfig.EXCHANGE,
            RabbitMQConfig.ROUTING_KEY,
            message
        );
    }
}

Consumer実装(手動Ack)

@Component
public class OrderConsumer {

    @RabbitListener(queues = RabbitMQConfig.QUEUE, ackMode = "MANUAL")
    public void consume(OrderMessage message,
                        com.rabbitmq.client.Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            process(message);
            channel.basicAck(tag, false);
        } catch (Exception e) {
            channel.basicNack(tag, false, false); // requeue=false → DLQへ転送
        }
    }
}

basicNack の第3引数(requeue)を false にするとDLQへ転送されます。

注意: 後述するコンテナレベルのリトライ(Advice経由)と手動Ackは原則として併用しません。コンテナレベルリトライを使う場合は ackMode をデフォルト(AUTO)のままにしてください。

Fanout・Topic Exchangeの実装例

Fanout Exchange は通知配信など全Consumerにブロードキャストしたいときに使います。

@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("notification.fanout");
}

@Bean
public Binding emailBinding(Queue emailQueue, FanoutExchange fanout) {
    return BindingBuilder.bind(emailQueue).to(fanout);
}

Topic Exchange はワイルドカードルーティングが必要なときに有効です。例えば log.error.# というパターンは log.error.app.db のように後続ワードが何個あってもマッチします。

@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("log.topic");
}

@Bean
public Binding errorBinding(Queue errorQueue, TopicExchange topic) {
    return BindingBuilder.bind(errorQueue).to(topic).with("log.error.#");
}
ExchangeRoutingKey主なユースケース
Direct完全一致タスクキュー
Fanout無視ブロードキャスト通知
Topicワイルドカードログ集約・イベントフィルタ

デッドレターExchange(DLX)の設定

処理に失敗したメッセージをDLQに退避させる仕組みです。以下のように orderQueue() の定義を更新してください(先ほどの定義を置き換えます)。

// orderQueue() をDLX引数付きに置き換え
@Bean
public Queue orderQueue() {
    return QueueBuilder.durable(RabbitMQConfig.QUEUE)
            .withArgument("x-dead-letter-exchange", "order.dlx")
            .build();
}

@Bean
public DirectExchange dlxExchange() {
    return new DirectExchange("order.dlx");
}

@Bean
public Queue dlqQueue() {
    return QueueBuilder.durable("order.dlq").build();
}

@Bean
public Binding dlqBinding(Queue dlqQueue, DirectExchange dlxExchange) {
    return BindingBuilder.bind(dlqQueue).to(dlxExchange).with(RabbitMQConfig.ROUTING_KEY);
}

Consumer側で basicNack(tag, false, false) を呼ぶとこのDLQにメッセージが転送されます。

リトライポリシーの設定

コンテナファクトリに RetryInterceptorBuilder でAdviceを設定します。最大3回リトライ後に失敗するとDLQへ転送されます。

注意: このリトライはコンテナレベル(ackMode = AUTO)で動作します。手動Ackのリスナーとは原則として併用しないでください。

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
    SimpleRabbitListenerContainerFactory factory =
            new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(converter);
    factory.setAdviceChain(
        RetryInterceptorBuilder.stateless()
            .maxAttempts(3)
            .backOffOptions(1000L, 2.0, 10000L)
            .build()
    );
    return factory;
}

外部APIを呼び出す側の耐障害性にはResilience4jのサーキットブレーカーも組み合わせられます。詳しくはSpring BootでResilience4jのサーキットブレーカーを実装する方法を参照してください。

KafkaとRabbitMQの使い分け

観点KafkaRabbitMQ
スループット非常に高い中程度
ルーティングConsumer GroupのみExchange/Bindingで柔軟
順序保証Partition内Queue単位
メッセージ保持設定期間保持Ack後に削除

大量ログの収集やイベントソーシングは Kafka 、タスクキューや複雑なルーティングが必要な通知配信は RabbitMQ という使い分けが一般的です。Kafkaの実装についてはSpring BootでKafkaのProducer・Consumerを実装する方法も参照してください。

同一JVM内のサービス間疎結合には ApplicationEvent も選択肢の一つです。Spring BootのApplicationEventで疎結合な非同期処理を実装する方法も合わせて読んでみてください。

まとめ

spring-amqpでのRabbitMQ実装の要点をまとめました。Exchange・Queue・Binding をBean定義するパターンを覚えると、Direct/Fanout/Topicの切り替えもスムーズです。DLX+リトライを組み合わせることで障害時のメッセージロストも防げるので、本番投入前にぜひ設定しておきましょう。

非同期処理全般についてはSpring Bootの非同期処理(@Async)を使いこなすも参考にしてください。