news 2026/4/3 5:14:04

15、RabbitMQ

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
15、RabbitMQ

RabbitMQ是一个开源的消息队列系统,实现了高级消息队列协议(AMQP)。它提供了强大的消息传递功能,支持多种消息传递模式,是分布式系统中常用的消息中间件。


RabbitMQ核心概念

消息中间件

消息中间件是分布式系统中重要的组件,用于:

  • 解耦:生产者和消费者之间不需要直接通信
  • 异步:提高系统的响应性和吞吐量
  • 削峰:缓冲瞬时高并发请求
  • 可靠:确保消息不丢失

基本概念

  1. Producer(生产者):发送消息的应用程序
  2. Consumer(消费者):接收消息的应用程序
  3. Queue(队列):存储消息的缓冲区
  4. Exchange(交换机):接收生产者发送的消息,根据路由规则将消息转发到队列
  5. Binding(绑定):交换机和队列之间的连接关系
  6. Routing Key(路由键):消息发送时指定的路由信息
  7. Connection(连接):应用程序与RabbitMQ服务器之间的TCP连接
  8. Channel(通道):建立在连接之上的虚拟连接

工作模式

1. 简单模式(Simple)

一个生产者对应一个消费者,最简单的消息传递模式。

生产者代码:

@ConfigurationpublicclassRabbitConfig{@BeanpublicQueuesimpleQueue(){returnnewQueue("simple.queue",true);}}@ServicepublicclassSimpleProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){rabbitTemplate.convertAndSend("simple.queue",message);System.out.println("发送消息:"+message);}}

消费者代码:

@Component@RabbitListener(queues="simple.queue")publicclassSimpleConsumer{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("接收消息:"+message);// 处理消息逻辑}}

2. 工作队列模式(Work Queue)

一个生产者对应多个消费者,消费者竞争消费消息,实现负载均衡。

生产者代码:

@ConfigurationpublicclassRabbitConfig{@BeanpublicQueueworkQueue(){returnnewQueue("work.queue",true);}}@ServicepublicclassWorkProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){rabbitTemplate.convertAndSend("work.queue",message);System.out.println("发送消息:"+message);}}

消费者代码:

@ComponentpublicclassWorkConsumer1{@RabbitListener(queues="work.queue")publicvoidreceiveMessage(Stringmessage){System.out.println("消费者1接收消息:"+message);try{// 模拟处理时间Thread.sleep(1000);}catch(InterruptedExceptione){Thread.currentThread().interrupt();}System.out.println("消费者1处理完成:"+message);}}@ComponentpublicclassWorkConsumer2{@RabbitListener(queues="work.queue")publicvoidreceiveMessage(Stringmessage){System.out.println("消费者2接收消息:"+message);try{// 模拟处理时间Thread.sleep(2000);}catch(InterruptedExceptione){Thread.currentThread().interrupt();}System.out.println("消费者2处理完成:"+message);}}

3. 发布订阅模式(Publish/Subscribe)

一个生产者发送消息,通过交换机广播给多个队列,多个消费者分别处理。

配置代码:

@ConfigurationpublicclassRabbitConfig{@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("fanout.exchange");}@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1",true);}@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2",true);}@BeanpublicBindingbinding1(FanoutExchangefanoutExchange,QueuefanoutQueue1){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@BeanpublicBindingbinding2(FanoutExchangefanoutExchange,QueuefanoutQueue2){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}

生产者代码:

@ServicepublicclassFanoutProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){rabbitTemplate.convertAndSend("fanout.exchange","",message);System.out.println("发布消息:"+message);}}

消费者代码:

@Component@RabbitListener(queues="fanout.queue1")publicclassFanoutConsumer1{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("消费者1接收消息:"+message);}}@Component@RabbitListener(queues="fanout.queue2")publicclassFanoutConsumer2{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("消费者2接收消息:"+message);}}

4. 路由模式(Routing)

根据路由键将消息发送到指定的队列。

配置代码:

@ConfigurationpublicclassRabbitConfig{@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange("direct.exchange");}@BeanpublicQueuedirectQueue1(){returnnewQueue("direct.queue1",true);}@BeanpublicQueuedirectQueue2(){returnnewQueue("direct.queue2",true);}@BeanpublicBindingbindingDirect1(DirectExchangedirectExchange,QueuedirectQueue1){returnBindingBuilder.bind(directQueue1).to(directExchange).with("info");}@BeanpublicBindingbindingDirect2(DirectExchangedirectExchange,QueuedirectQueue2){returnBindingBuilder.bind(directQueue2).to(directExchange).with("error");}}

生产者代码:

@ServicepublicclassDirectProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendInfoMessage(Stringmessage){rabbitTemplate.convertAndSend("direct.exchange","info","INFO: "+message);System.out.println("发送INFO消息:"+message);}publicvoidsendErrorMessage(Stringmessage){rabbitTemplate.convertAndSend("direct.exchange","error","ERROR: "+message);System.out.println("发送ERROR消息:"+message);}}

消费者代码:

@Component@RabbitListener(queues="direct.queue1")publicclassDirectConsumer1{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("INFO消费者接收消息:"+message);}}@Component@RabbitListener(queues="direct.queue2")publicclassDirectConsumer2{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("ERROR消费者接收消息:"+message);}}

5. 主题模式(Topic)

根据通配符匹配的路由键将消息发送到相应的队列。

配置代码:

@ConfigurationpublicclassRabbitConfig{@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange("topic.exchange");}@BeanpublicQueuetopicQueue1(){returnnewQueue("topic.queue1",true);}@BeanpublicQueuetopicQueue2(){returnnewQueue("topic.queue2",true);}@BeanpublicBindingbindingTopic1(TopicExchangetopicExchange,QueuetopicQueue1){returnBindingBuilder.bind(topicQueue1).to(topicExchange).with("user.*");}@BeanpublicBindingbindingTopic2(TopicExchangetopicExchange,QueuetopicQueue2){returnBindingBuilder.bind(topicQueue2).to(topicExchange).with("user.#");}}

生产者代码:

@ServicepublicclassTopicProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendUserCreateMessage(Stringmessage){rabbitTemplate.convertAndSend("topic.exchange","user.create","创建用户:"+message);System.out.println("发送用户创建消息:"+message);}publicvoidsendUserUpdateMessage(Stringmessage){rabbitTemplate.convertAndSend("topic.exchange","user.update","更新用户:"+message);System.out.println("发送用户更新消息:"+message);}publicvoidsendUserDeleteMessage(Stringmessage){rabbitTemplate.convertAndSend("topic.exchange","user.delete","删除用户:"+message);System.out.println("发送用户删除消息:"+message);}}

消费者代码:

@Component@RabbitListener(queues="topic.queue1")publicclassTopicConsumer1{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("用户操作消费者1接收消息:"+message);}}@Component@RabbitListener(queues="topic.queue2")publicclassTopicConsumer2{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("用户操作消费者2接收消息:"+message);}}

高级特性

消息确认机制

生产者确认:

@ConfigurationpublicclassRabbitConfig{@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactoryconnectionFactory){RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory);// 启用发布者确认rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{if(ack){System.out.println("消息发送成功:"+correlationData.getId());}else{System.out.println("消息发送失败:"+cause);}});// 启用发布者返回rabbitTemplate.setReturnsCallback(returned->{System.out.println("消息无法路由:"+returned.getMessage());});returnrabbitTemplate;}}

消费者确认:

@Component@RabbitListener(queues="confirm.queue")publicclassConfirmConsumer{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage,Channelchannel,Messagemsg)throwsIOException{try{System.out.println("接收消息:"+message);// 处理消息逻辑// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);}catch(Exceptione){// 处理失败,拒绝消息并重新入队channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,true);}}}

死信队列

处理无法被正常消费的消息。

配置代码:

@ConfigurationpublicclassRabbitConfig{@BeanpublicQueuenormalQueue(){Map<String,Object>args=newHashMap<>();args.put("x-dead-letter-exchange","dead.letter.exchange");args.put("x-dead-letter-routing-key","dead.letter");args.put("x-message-ttl",60000);// 消息TTL 60秒returnnewQueue("normal.queue",true,false,false,args);}@BeanpublicQueuedeadLetterQueue(){returnnewQueue("dead.letter.queue",true);}@BeanpublicDirectExchangedeadLetterExchange(){returnnewDirectExchange("dead.letter.exchange");}@BeanpublicBindingdeadLetterBinding(DirectExchangedeadLetterExchange,QueuedeadLetterQueue){returnBindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead.letter");}}

死信消费者:

@Component@RabbitListener(queues="dead.letter.queue")publicclassDeadLetterConsumer{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("处理死信消息:"+message);// 处理死信消息的逻辑}}

消息持久化

@ConfigurationpublicclassRabbitConfig{@BeanpublicQueuedurableQueue(){// durable=true 表示队列持久化returnnewQueue("durable.queue",true);}@BeanpublicExchangedurableExchange(){// durable=true 表示交换机持久化returnnewDirectExchange("durable.exchange",true,false);}}@ServicepublicclassDurableProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){// MessageProperties 设置消息持久化MessagePropertiesproperties=newMessageProperties();properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Messagemsg=newMessage(message.getBytes(),properties);rabbitTemplate.send("durable.exchange","durable.key",msg);}}

监控和管理

管理界面功能

RabbitMQ管理界面提供了丰富的监控和管理功能:

  1. Overview:查看整体状态和统计信息
  2. Connections:查看连接信息
  3. Channels:查看通道信息
  4. Exchanges:管理交换机
  5. Queues:管理队列
  6. Admin:用户和权限管理

常用监控命令

# 查看队列信息 rabbitmqctl list_queues name messages consumers # 查看交换机信息 rabbitmqctl list_exchanges # 查看绑定关系 rabbitmqctl list_bindings # 查看连接信息 rabbitmqctl list_connections

Spring Boot Actuator集成

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
management:endpoints:web:exposure:include:health,info,rabbitendpoint:rabbit:enabled:true

性能优化

1. 连接复用

@ConfigurationpublicclassRabbitConfig{@BeanpublicConnectionFactoryconnectionFactory(){CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin123");// 设置连接缓存connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);connectionFactory.setConnectionCacheSize(10);// 设置通道缓存connectionFactory.setChannelCacheSize(25);returnconnectionFactory;}}

2. 批量发送

@ServicepublicclassBatchProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendBatchMessages(List<String>messages){List<Message>messageList=messages.stream().map(msg->MessageBuilder.withBody(msg.getBytes()).build()).collect(Collectors.toList());// 批量发送消息for(Messagemessage:messageList){rabbitTemplate.send("batch.exchange","batch.routing.key",message);}}}

3. 消费者优化

@ConfigurationpublicclassRabbitConfig{@BeanpublicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(ConnectionFactoryconnectionFactory){SimpleRabbitListenerContainerFactoryfactory=newSimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 设置消费者并发数factory.setConcurrentConsumers(3);factory.setMaxConcurrentConsumers(10);// 设置预取数量factory.setPrefetchCount(5);// 设置手动确认factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);returnfactory;}}

故障排查

常见问题

  1. 消息丢失
    • 检查队列和消息是否持久化
    • 确认生产者和消费者确认机制是否正确配置
  2. 消息重复消费
    • 确保消费者处理逻辑的幂等性
    • 使用消息ID进行去重
  3. 连接断开
    • 检查网络连接稳定性
    • 配置连接重试机制
  4. 性能问题
    • 监控队列长度和消费者数量
    • 调整预取数量和并发消费者数

日志配置

<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> </dependency>
logging:level:com.rabbitmq:DEBUGorg.springframework.amqp:DEBUG
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!