RabbitMQ是一个开源的消息队列系统,实现了高级消息队列协议(AMQP)。它提供了强大的消息传递功能,支持多种消息传递模式,是分布式系统中常用的消息中间件。
RabbitMQ核心概念
消息中间件
消息中间件是分布式系统中重要的组件,用于:
- 解耦:生产者和消费者之间不需要直接通信
- 异步:提高系统的响应性和吞吐量
- 削峰:缓冲瞬时高并发请求
- 可靠:确保消息不丢失
基本概念
- Producer(生产者):发送消息的应用程序
- Consumer(消费者):接收消息的应用程序
- Queue(队列):存储消息的缓冲区
- Exchange(交换机):接收生产者发送的消息,根据路由规则将消息转发到队列
- Binding(绑定):交换机和队列之间的连接关系
- Routing Key(路由键):消息发送时指定的路由信息
- Connection(连接):应用程序与RabbitMQ服务器之间的TCP连接
- Channel(通道):建立在连接之上的虚拟连接
工作模式
1. 简单模式(Simple)
一个生产者对应一个消费者,最简单的消息传递模式。
生产者代码:
@ConfigurationpublicclassRabbitConfig{@BeanpublicQueuesimpleQueue(){returnnewQueue("simple.queue",true);}}@ServicepublicclassSimpleProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){rabbitTemplate.convertAndSend("simple.queue",message);System.out.println("发送消息:"+message);}}消费者代码:
@Component@RabbitListener(queues="simple.queue")publicclassSimpleConsumer{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("接收消息:"+message);// 处理消息逻辑}}2. 工作队列模式(Work Queue)
一个生产者对应多个消费者,消费者竞争消费消息,实现负载均衡。
生产者代码:
@ConfigurationpublicclassRabbitConfig{@BeanpublicQueueworkQueue(){returnnewQueue("work.queue",true);}}@ServicepublicclassWorkProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){rabbitTemplate.convertAndSend("work.queue",message);System.out.println("发送消息:"+message);}}消费者代码:
@ComponentpublicclassWorkConsumer1{@RabbitListener(queues="work.queue")publicvoidreceiveMessage(Stringmessage){System.out.println("消费者1接收消息:"+message);try{// 模拟处理时间Thread.sleep(1000);}catch(InterruptedExceptione){Thread.currentThread().interrupt();}System.out.println("消费者1处理完成:"+message);}}@ComponentpublicclassWorkConsumer2{@RabbitListener(queues="work.queue")publicvoidreceiveMessage(Stringmessage){System.out.println("消费者2接收消息:"+message);try{// 模拟处理时间Thread.sleep(2000);}catch(InterruptedExceptione){Thread.currentThread().interrupt();}System.out.println("消费者2处理完成:"+message);}}3. 发布订阅模式(Publish/Subscribe)
一个生产者发送消息,通过交换机广播给多个队列,多个消费者分别处理。
配置代码:
@ConfigurationpublicclassRabbitConfig{@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("fanout.exchange");}@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1",true);}@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2",true);}@BeanpublicBindingbinding1(FanoutExchangefanoutExchange,QueuefanoutQueue1){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@BeanpublicBindingbinding2(FanoutExchangefanoutExchange,QueuefanoutQueue2){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}生产者代码:
@ServicepublicclassFanoutProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){rabbitTemplate.convertAndSend("fanout.exchange","",message);System.out.println("发布消息:"+message);}}消费者代码:
@Component@RabbitListener(queues="fanout.queue1")publicclassFanoutConsumer1{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("消费者1接收消息:"+message);}}@Component@RabbitListener(queues="fanout.queue2")publicclassFanoutConsumer2{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("消费者2接收消息:"+message);}}4. 路由模式(Routing)
根据路由键将消息发送到指定的队列。
配置代码:
@ConfigurationpublicclassRabbitConfig{@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange("direct.exchange");}@BeanpublicQueuedirectQueue1(){returnnewQueue("direct.queue1",true);}@BeanpublicQueuedirectQueue2(){returnnewQueue("direct.queue2",true);}@BeanpublicBindingbindingDirect1(DirectExchangedirectExchange,QueuedirectQueue1){returnBindingBuilder.bind(directQueue1).to(directExchange).with("info");}@BeanpublicBindingbindingDirect2(DirectExchangedirectExchange,QueuedirectQueue2){returnBindingBuilder.bind(directQueue2).to(directExchange).with("error");}}生产者代码:
@ServicepublicclassDirectProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendInfoMessage(Stringmessage){rabbitTemplate.convertAndSend("direct.exchange","info","INFO: "+message);System.out.println("发送INFO消息:"+message);}publicvoidsendErrorMessage(Stringmessage){rabbitTemplate.convertAndSend("direct.exchange","error","ERROR: "+message);System.out.println("发送ERROR消息:"+message);}}消费者代码:
@Component@RabbitListener(queues="direct.queue1")publicclassDirectConsumer1{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("INFO消费者接收消息:"+message);}}@Component@RabbitListener(queues="direct.queue2")publicclassDirectConsumer2{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("ERROR消费者接收消息:"+message);}}5. 主题模式(Topic)
根据通配符匹配的路由键将消息发送到相应的队列。
配置代码:
@ConfigurationpublicclassRabbitConfig{@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange("topic.exchange");}@BeanpublicQueuetopicQueue1(){returnnewQueue("topic.queue1",true);}@BeanpublicQueuetopicQueue2(){returnnewQueue("topic.queue2",true);}@BeanpublicBindingbindingTopic1(TopicExchangetopicExchange,QueuetopicQueue1){returnBindingBuilder.bind(topicQueue1).to(topicExchange).with("user.*");}@BeanpublicBindingbindingTopic2(TopicExchangetopicExchange,QueuetopicQueue2){returnBindingBuilder.bind(topicQueue2).to(topicExchange).with("user.#");}}生产者代码:
@ServicepublicclassTopicProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendUserCreateMessage(Stringmessage){rabbitTemplate.convertAndSend("topic.exchange","user.create","创建用户:"+message);System.out.println("发送用户创建消息:"+message);}publicvoidsendUserUpdateMessage(Stringmessage){rabbitTemplate.convertAndSend("topic.exchange","user.update","更新用户:"+message);System.out.println("发送用户更新消息:"+message);}publicvoidsendUserDeleteMessage(Stringmessage){rabbitTemplate.convertAndSend("topic.exchange","user.delete","删除用户:"+message);System.out.println("发送用户删除消息:"+message);}}消费者代码:
@Component@RabbitListener(queues="topic.queue1")publicclassTopicConsumer1{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("用户操作消费者1接收消息:"+message);}}@Component@RabbitListener(queues="topic.queue2")publicclassTopicConsumer2{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("用户操作消费者2接收消息:"+message);}}高级特性
消息确认机制
生产者确认:
@ConfigurationpublicclassRabbitConfig{@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactoryconnectionFactory){RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory);// 启用发布者确认rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{if(ack){System.out.println("消息发送成功:"+correlationData.getId());}else{System.out.println("消息发送失败:"+cause);}});// 启用发布者返回rabbitTemplate.setReturnsCallback(returned->{System.out.println("消息无法路由:"+returned.getMessage());});returnrabbitTemplate;}}消费者确认:
@Component@RabbitListener(queues="confirm.queue")publicclassConfirmConsumer{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage,Channelchannel,Messagemsg)throwsIOException{try{System.out.println("接收消息:"+message);// 处理消息逻辑// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);}catch(Exceptione){// 处理失败,拒绝消息并重新入队channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,true);}}}死信队列
处理无法被正常消费的消息。
配置代码:
@ConfigurationpublicclassRabbitConfig{@BeanpublicQueuenormalQueue(){Map<String,Object>args=newHashMap<>();args.put("x-dead-letter-exchange","dead.letter.exchange");args.put("x-dead-letter-routing-key","dead.letter");args.put("x-message-ttl",60000);// 消息TTL 60秒returnnewQueue("normal.queue",true,false,false,args);}@BeanpublicQueuedeadLetterQueue(){returnnewQueue("dead.letter.queue",true);}@BeanpublicDirectExchangedeadLetterExchange(){returnnewDirectExchange("dead.letter.exchange");}@BeanpublicBindingdeadLetterBinding(DirectExchangedeadLetterExchange,QueuedeadLetterQueue){returnBindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead.letter");}}死信消费者:
@Component@RabbitListener(queues="dead.letter.queue")publicclassDeadLetterConsumer{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("处理死信消息:"+message);// 处理死信消息的逻辑}}消息持久化
@ConfigurationpublicclassRabbitConfig{@BeanpublicQueuedurableQueue(){// durable=true 表示队列持久化returnnewQueue("durable.queue",true);}@BeanpublicExchangedurableExchange(){// durable=true 表示交换机持久化returnnewDirectExchange("durable.exchange",true,false);}}@ServicepublicclassDurableProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){// MessageProperties 设置消息持久化MessagePropertiesproperties=newMessageProperties();properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Messagemsg=newMessage(message.getBytes(),properties);rabbitTemplate.send("durable.exchange","durable.key",msg);}}监控和管理
管理界面功能
RabbitMQ管理界面提供了丰富的监控和管理功能:
- Overview:查看整体状态和统计信息
- Connections:查看连接信息
- Channels:查看通道信息
- Exchanges:管理交换机
- Queues:管理队列
- Admin:用户和权限管理
常用监控命令
# 查看队列信息 rabbitmqctl list_queues name messages consumers # 查看交换机信息 rabbitmqctl list_exchanges # 查看绑定关系 rabbitmqctl list_bindings # 查看连接信息 rabbitmqctl list_connectionsSpring Boot Actuator集成
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>management:endpoints:web:exposure:include:health,info,rabbitendpoint:rabbit:enabled:true性能优化
1. 连接复用
@ConfigurationpublicclassRabbitConfig{@BeanpublicConnectionFactoryconnectionFactory(){CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin123");// 设置连接缓存connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);connectionFactory.setConnectionCacheSize(10);// 设置通道缓存connectionFactory.setChannelCacheSize(25);returnconnectionFactory;}}2. 批量发送
@ServicepublicclassBatchProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendBatchMessages(List<String>messages){List<Message>messageList=messages.stream().map(msg->MessageBuilder.withBody(msg.getBytes()).build()).collect(Collectors.toList());// 批量发送消息for(Messagemessage:messageList){rabbitTemplate.send("batch.exchange","batch.routing.key",message);}}}3. 消费者优化
@ConfigurationpublicclassRabbitConfig{@BeanpublicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(ConnectionFactoryconnectionFactory){SimpleRabbitListenerContainerFactoryfactory=newSimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 设置消费者并发数factory.setConcurrentConsumers(3);factory.setMaxConcurrentConsumers(10);// 设置预取数量factory.setPrefetchCount(5);// 设置手动确认factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);returnfactory;}}故障排查
常见问题
- 消息丢失
- 检查队列和消息是否持久化
- 确认生产者和消费者确认机制是否正确配置
- 消息重复消费
- 确保消费者处理逻辑的幂等性
- 使用消息ID进行去重
- 连接断开
- 检查网络连接稳定性
- 配置连接重试机制
- 性能问题
- 监控队列长度和消费者数量
- 调整预取数量和并发消费者数
日志配置
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> </dependency>logging:level:com.rabbitmq:DEBUGorg.springframework.amqp:DEBUG