SpringBoot整合RabbitMQ
1、引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.3</version>
</dependency>
2、配置rabbitmq
通过RabbitProperties.properties配置文件配置链接
rabbitmq:
host: 192.168.56.10
virtual-host: /
port: 5672
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
重点:
#开启发送端确认机制,发送到Broke的确认
publisher-confirm-type: correlated
#开启时消息发送到队列的消息确认机制
publisher-returns: true
#只要抵达队列,以异步发送优先回调我们这个Returnconfirm
template:
mandatory: true
3、开启Rabbit
启动类添加@EnableRabbit注解
@EnableRabbit
@SpringBootApplication
public class FamilyBookingApplication {
public static void main(String[] args) {
SpringApplication.run(FamilyBookingApplication.class, args);
}
}
4、Rabbit使用
现在RabbitAutoConfiguration就生效了,就有了组件RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
5、使用测试
1、创建Exchange、Queue、Binding
使用AmqpAdmin创建。
通过创建各交换机构造方法,找到对应接口的实现,传入参数即可创建Exchange。
public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
@Autowired
AmqpAdmin amqpAdmin;
/**
* 创建Exchange交换机
*/
@Test
public void craeteChage() {
/**
* public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
*/
DirectExchange directExchange = new DirectExchange("Family-Hello-Exchange",true,false);
amqpAdmin.declareExchange(directExchange);
}
/**
* 创建Queue交换机
*/
@Test
public void craetQueue() {
/**
* public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
*/
Queue queue = new Queue("Family-Hello-Queue", true,false, false);
amqpAdmin.declareQueue(queue);
}
/**
* 创建Bind交换机
*/
@Test
public void craetBind() {
/**
* public Binding(String destination, 绑定目标
* Binding.DestinationType destinationType, 绑定类型
* String exchange, 交换机
* String routingKey, router-key
* @Nullable Map<String, Object> arguments 参数) {
*/
Binding binding = new Binding("Family-Hello-Queue", Binding.DestinationType.QUEUE, "Family-Hello-Exchange", "Java.hello", null);
amqpAdmin.declareBinding(binding);
}
上面为DirectExchange类型交换机的创建,其它类型Exchange类似。
更多API可参考AmqpAdmin的各方法。
2、收发消息
发送消息
/**
* rabbitTemplate 发送消息
*/
@Test
public String sendMessage(){
//发送字符串
rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello","Hello RabbitMQ 。。。",new CorrelationData(UUID.randomUUID().toString()));
//发送对象
MemberEntity memberEntity = new MemberEntity();
memberEntity.setAddress("rabbitMQ测试-会员地址信息");
memberEntity.setEmail("rabbitMQ测试-会员邮箱,batis@foxmial.com");
//发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。
rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello",memberEntity,new CorrelationData(UUID.randomUUID().toString()));
return "向RabbitMQ发送消息成功";
}
3、接受消息
RabbitMQ为我们提供了监听@RabbitListener监听注解,使用时,必须开启@EnableRabbit功能。不用监听消息队列时,可以不用开启@EnableRabbit注解。
/**
* 监听队列是各数组,可以同时监听多个队列
* 通过返回的消息类型知道是一个org.springframework.amqp.core.Message;
* @param message
*/
@RabbitListener(queues={"Family-Hello-Queue"})
public String sendMessage(){
//发送字符串
// rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello","Hello RabbitMQ 。。。",new CorrelationData(UUID.randomUUID().toString()));
//发送对象
MemberEntity memberEntity = new MemberEntity();
memberEntity.setAddress("rabbitMQ测试-会员地址信息");
memberEntity.setEmail("rabbitMQ测试-会员邮箱,batis@foxmial.com");
//发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。
rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello",memberEntity,new CorrelationData(UUID.randomUUID().toString()));
return "向RabbitMQ发送消息成功";
}
/**
*
* 通过传入第二个参数spring可以帮我们把之间转的对象获取后转成对象,第二个参数不固定的泛型<T>
* 比如上面发送的是一个user对象,那我们第二个参数就是User user,接受到的数据就直接是user了。
* 还可以直接获取到链接中传输数据的通道channel
* @param message
*/
@RabbitListener(queues={"Family-Hello-Queue"})
public void receiveMessageAndObject(Message message, MemberEntity memberEntity, Channel channel){
//消息体,数据内容
//通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。
byte[] body = message.getBody();
//消息头信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("监听到队列消息2:"+message+"消息类型:"+message.getClass());
System.out.println("监听到队列消息2:"+message+"消息内容:"+memberEntity);
}
Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息
注意:
1)、同一个消息,只能有一个客户端收到
2)、只有一个消息完全处理完,方法运行结束,我们才可以接收到下一个消息
3)、@RabbitHandler也可以接受队列消息
@RabbitListener可以用在类和方法上
@RabbitHandler只能用在方法上。
@RabbitHandler应用场景:当发送的数据为不同类型时,@RabbitListener接口消息时,参数类型不知道定义那个。
比如:想队列发送的对象有user、People people,那下面接受时,对象不知道写那个
public void receiveMessageObject(Message message, 这里该写那个对象, Channel channel)
因此@RabbitHandler就可以用上了。通过在类上加上@RabbitHandler注解监听队列消息,在方法上加@RabbitHandler注解,参数就可以指定接受那个数据类型的消息了。
简单代码如下:
@RabbitListener(queues={"Family-Hello-Queue"})
@Service("memberService")
public class MemberServiceImpl extends ServiceImpl<MemberDao, MemberEntity> implements MemberService {
//接受队列中指定的People类型数据
@RabbitHandler
public void receiveMessageObj(Message message, MemberEntity memberEntity, Channel channel){
//消息体,数据内容
//通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。
byte[] body = message.getBody();
//消息头信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("监听到队列消息:"+message+"消息内容:"+memberEntity);
}
//接受队列中指定的User类型数据
@RabbitHandler
public void receiveMessageObj(Message message, MemberEntity memberEntity, Channel channel){
//消息体,数据内容
//通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。
byte[] body = message.getBody();
//消息头信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("监听到队列消息:"+message+"消息内容:"+memberEntity);
}
}
6、RabbitMQ Java配置Json序列化
通过RabbitAutoConfiguration.java中注入的RabbitTemplate看到configure。
@Bean
@ConditionalOnMissingBean
public RabbitTemplateConfigurer rabbitTemplateConfigurer(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
RabbitTemplateConfigurer configurer = new RabbitTemplateConfigurer();
configurer.setMessageConverter((MessageConverter)messageConverter.getIfUnique());
configurer.setRetryTemplateCustomizers((List)retryTemplateCustomizers.orderedStream().collect(Collectors.toList()));
configurer.setRabbitProperties(properties);
return configurer;
}
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean({RabbitOperations.class})
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
configurer.configure(template, connectionFactory);
return template;
}
如果配置中有MessageConverter消息转换器就用configurer.setMessageConverter((MessageConverter)messageConverter.getIfUnique());如果没有就用RabbitTemplate默认配置的。
RabbitTemplateConfigurer.java默认配置源码中可以看到默认使用的转换器如下:
public MessageConverter getMessageConverter() {
return this.messageConverter;
}
private MessageConverter messageConverter = new SimpleMessageConverter();
然后通过SimpleMessageConverter可以看到默认创建消息时,使用的消息转换器类型
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) {
bytes = (byte[])((byte[])object);
messageProperties.setContentType("application/octet-stream");
} else if (object instanceof String) {
try {
bytes = ((String)object).getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException var6) {
throw new MessageConversionException("failed to convert to Message content", var6);
}
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding(this.defaultCharset);
} else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
} catch (IllegalArgumentException var5) {
throw new MessageConversionException("failed to convert to serialized Message content", var5);
}
messageProperties.setContentType("application/x-java-serialized-object");
}
if (bytes != null) {
messageProperties.setContentLength((long)bytes.length);
return new Message(bytes, messageProperties);
} else {
throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}
}
如果是序列化的就是使用的SerializationUtils.serialize(object);
else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
} catch (IllegalArgumentException var5) {
throw new MessageConversionException("failed to convert to serialized Message content", var5);
}
messageProperties.setContentType("application/x-java-serialized-object");
因此我们可以自定义MessageConverter。通过MessageConverter接口实现类型,可以看到json各实现类。
因此RabbitMQ自定义配置如下:
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @description: RabbitMQ配置
* @author: <a href="mailto:batis@foxmail.com">清风</a>
* @date: 2022/3/12 17:07
* @version: 1.0
*/
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
再次发送对象时,从mq里面获取的对象就是json格式了。
7、开启消息回调确认机制
#开启发送端确认机制,发送到Broke的确认
publisher-confirm-type: correlated
#开启时消息发送到队列的消息确认机制
publisher-returns: true
#只要抵达队列,以异步发送优先回调我们这个ReturnCallback
template:
mandatory: true
#手动确认ack
listener:
simple:
acknowledge-mode: manual
8、重点:
发送端:
package com.yanxizhu.family.users.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
* @description: RabbitMQ配置 序列化
* @author: <a href="mailto:batis@foxmail.com">清风</a>
* @date: 2022/4/17 16:44
* @version: 1.0
*/
@Configuration
public class RabbitConfig {
private RabbitTemplate rabbitTemplate;
/**
* rabbitTemplate循环依赖,解决方法:手动初始化模板方法
* @param connectionFactory
* @return
*/
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 定制rabbitMaTemplate
* 一、发送端消息确认机制,发送给Broke的消息确认回调
* 1、开始消息确认 publisher-confirm-type: correlated
* 2、设置确认回调 setConfirmCallback()
* 二、发送端消息确认机制,发送给队列的消息确认回调
* 1、开始消息确认
* publisher-returns: true
* template:
* mandatory: true
* 2、设置确认回调 setReturnsCallback
* 三、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)
* 1、开启消费端消息确认
* listener:
* simple:
* acknowledge-mode: manual
* 2、消费端默认是自动消息确认的,只要消息手法哦,客户端会自动确认,服务器就会移除这个消息。
* 默认的消息确认(默认ACK)问题:
* 我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,服务器宕机了。会发生消息丢失;
* 3、因此,需要使用消费者手动确认默认(手动ack),只要我们没有明确高数MQ,消息被消费,就没有ack,消息就一致是unacked状态。
* 即使服务器宕机,消息也不会丢失,会重新变为ready,下一次有新的消费者链接进来就发给它处理。
* 4、手动ack如何签收
* //消息头信息
* MessageProperties messageProperties = message.getMessageProperties();
* long deliveryTag = messageProperties.getDeliveryTag();
* channel.basicAck(deliveryTag,true); //手动确认签收,deliveryTag默认自增的,true是否批量确认签收
* 5、消费退回
*
* //参数:long var1, boolean var3(是否批量), boolean var4
* channel.basicNack(deliveryTag,true,true); //可以批量回退
*
* //参数:long var1, boolean var3
* channel.basicReject(deliveryTag,true); //单个回退
*
* 第三个参数或第二个参数:false 丢弃 true:发回服务器,服务器重新入对。
*/
public void initRabbitTemplate(){
//设置确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 只要消息抵达服务器
* @param correlationData 当前消息关联的唯一数据(这个消息的唯一id)
* @param ack 消息是否成功收到
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//correlationData:注意,这个correlationData就是发送消息时,设置的correlationData。
System.out.println("====确认收到消息了。。。。================");
System.out.println(correlationData+"|"+ack+"|"+cause);
}
});
//注意:必须要有这一步,不然不生效
rabbitTemplate.setMandatory(true);
//设置消息抵达队列回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* @param message 投递失败的消息详细信息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 当时这个消息发给哪个交换机
* @param routingKey 当时这个消息用的哪个路由键
*/
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("==========投递到队列失败了。。。==========");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
}
});
}
}
发送消息时,可以将new CorrelationData(UUID.randomUUID().toString()),传的uuid参数保存到数据库,到时候直接遍历数据库即可知道那些投递成功到队列,那些投递失败。
说明:
* 一、发送端消息确认机制,发送给Broke的消息确认回调
* 1、开始消息确认 publisher-confirm-type: correlated
* 2、设置确认回调 setConfirmCallback()
* 二、发送端消息确认机制,发送给队列的消息确认回调
* 1、开始消息确认
* publisher-returns: true
* template:
* mandatory: true
* 2、设置确认回调 setReturnsCallback
* 三、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)
* 1、开启消费端消息确认
* listener:
* simple:
* acknowledge-mode: manual
* 2、消费端默认是自动消息确认的,只要消息手法哦,客户端会自动确认,服务器就会移除这个消息。
* 默认的消息确认(默认ACK)问题:
* 我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,服务器宕机了。会发生消息丢失;
* 3、因此,需要使用消费者手动确认默认(手动ack),只要我们没有明确高数MQ,消息被消费,就没有ack,消息就一致是unacked状态。
* 即使服务器宕机,消息也不会丢失,会重新变为ready,下一次有新的消费者链接进来就发给它处理。
消费端手动ack
/**
*
* 通过传入第二个参数spring可以帮我们把之间转的对象获取后转成对象,第二个参数不固定的泛型<T>
* 比如上面发送的是一个user对象,那我们第二个参数就是User user,接受到的数据就直接是user了。
* 还可以直接获取到链接中传输数据的通道channel
* @param message
*/
@RabbitHandler
public void receiveMessageObject(Message message, MemberEntity memberEntity, Channel channel) throws IOException {
//消息体,数据内容
//通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。
byte[] body = message.getBody();
//消息头信息
MessageProperties messageProperties = message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
//手动确认签收
channel.basicAck(deliveryTag,true);
System.out.println("监听到队列消息3:"+message+"消息类型:"+message.getClass());
System.out.println("监听到队列消息3:"+message+"消息内容:"+memberEntity);
//回退
//可以批量回退
// channel.basicNack(deliveryTag,true,true);
//单个回退
//true重新排队,false丢弃
// channel.basicReject(deliveryTag,true);
}
说明:
手动ack如何签收
//消息头信息
MessageProperties messageProperties = message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
channel.basicAck(deliveryTag,true); //手动确认签收,deliveryTag默认自增的,true是否批量确认签收
消费退回
//参数:long var1, boolean var3(是否批量), boolean var4
channel.basicNack(deliveryTag,true,true); //可以批量回退
//参数:long var1, boolean var3
channel.basicReject(deliveryTag,true); //单个回退
第三个参数或第二个参数:false 丢弃 true:发回服务器,服务器重新入对。
最后rabbitMQ配置:
spring:
rabbitmq:
host: 192.168.56.10
virtual-host: /
port: 5672
#开启发送端确认
publisher-confirm-type: correlated
#开启发送端消息抵达队列的确认
publisher-returns: true
#只要抵达队列,以异步发送优先回调我们这个retureconfirm
template:
mandatory: true
#开启手动确认
listener:
simple:
acknowledge-mode: manual
SpringBoot中使用延时队列
1、Queue、Exchange、Binding可以@Bean进去
2、监听消息的方法可以有三种参数(不分数量,顺序)
Object content, Message message, Channel channel
3、channel可以用来拒绝消息,否则自动ack;
代码实现
发送消息到MQ:
public String sendDelayMessage(){
//发送对象
MemberEntity memberEntity = new MemberEntity();
memberEntity.setId(11111111L);
memberEntity.setAddress("死信队列测试");
memberEntity.setEmail("延迟队列,异步处理会员信息");
memberEntity.setCreateTime(new Date());
//发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。
rabbitTemplate.convertAndSend("member-event-exchange","users.create.member",memberEntity, new CorrelationData(UUID.randomUUID().toString()));
return "向RabbitMQ发送消息成功";
}
通过延迟队列,将延迟的会员信息,发送到另一个队列,通过监听过期延迟的队列,接收延迟的会员信息。
package com.yanxizhu.family.users.config;
import com.rabbitmq.client.Channel;
import com.yanxizhu.family.users.entity.MemberEntity;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* @description: 延迟队列
* 说明: 1、首先通过创建的路由键(users.create.member),将消息从交换机(member-event-exchange)发送到死信队列(member.delay.queue)。
* 2、根据死信队列(member.delay.queue)的配置,过期后,消息将通过路由键为(users.release.member)转发到交换机(member-event-exchange)。
* 3、最后交换机(member-event-exchange)再通过绑定的最终队列(users.release.member.queue)和路由键(users.release.member)转发到最终队列(member-event-exchange)。
* @author: <a href="mailto:batis@foxmail.com">清风</a>
* @date: 2022/4/17 20:47
* @version: 1.0
*/
@Configuration
public class DelayQueueConfig {
/**
* 监听最终的队列
* @param entity
* @param channel
* @param message
* @throws IOException
*/
@RabbitListener(queues="member.release.queue")
public void listener(MemberEntity entity, Channel channel, Message message) throws IOException {
System.out.println("收到过期的会员信息:准备关闭会员"+entity.toString());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
//@bean自动创建交换机、路由、绑定
//交换机
@Bean
public Exchange orderEventExchange(){
return new TopicExchange("member-event-exchange", true, false);
}
/**
* 容器中的Binding、Queue、Exchange都会自动创建(RabbitMQ没有的情况)
* @return
*/
//死信队列
@Bean
public Queue memberDelayQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "member-event-exchange");//交换机
arguments.put("x-dead-letter-routing-key", "users.release.member");//路由键
arguments.put("x-message-ttl", 60000);//过期时间
//public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
Queue queue = new Queue("member.delay.queue",true,false,false,arguments);
return queue;
}
//将死信队列和交换机绑定,路由键-users.create.member
@Bean
public Binding orderCreateOrderBingding(){
//目的地、绑定类型、交换机、路由键、参数
//public Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments)
return new Binding("member.delay.queue",
Binding.DestinationType.QUEUE,
"member-event-exchange",
"users.create.member",
null);
}
//最后过期的队列
@Bean
public Queue orderReleaseQueue(){
Queue queue = new Queue("member.release.queue",true,false,false);
return queue;
}
//将最后的队列和交换机绑定,路由键-user.release.member
@Bean
public Binding orderReleaseOrderBingding(){
return new Binding("member.release.queue",
Binding.DestinationType.QUEUE,
"member-event-exchange",
"users.release.member",
null);
}
}
评论 (0)