SpringBoot整合RabbitMQ

admin
2022-03-13 / 0 评论 / 615 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2022年04月17日,已超过1006天没有更新,若内容或图片失效,请留言反馈。

SpringBoot整合RabbitMQ

1、引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.6.3</version>
</dependency>

2、配置rabbitmq

通过RabbitProperties.properties配置文件配置链接

  rabbitmq:
    host: 192.168.56.10
    virtual-host: /
    port: 5672
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

重点:

#开启发送端确认机制,发送到Broke的确认
publisher-confirm-type: correlated
#开启时消息发送到队列的消息确认机制
publisher-returns: true
#只要抵达队列,以异步发送优先回调我们这个Returnconfirm
template:
  mandatory: true

3、开启Rabbit

启动类添加@EnableRabbit注解

@EnableRabbit
@SpringBootApplication
public class FamilyBookingApplication {

    public static void main(String[] args) {
        SpringApplication.run(FamilyBookingApplication.class, args);
    }

}

4、Rabbit使用

现在RabbitAutoConfiguration就生效了,就有了组件RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate

5、使用测试

1、创建Exchange、Queue、Binding

使用AmqpAdmin创建。

通过创建各交换机构造方法,找到对应接口的实现,传入参数即可创建Exchange。

public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) 
    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * 创建Exchange交换机
     */
    @Test
    public void craeteChage() {
        /**
         * public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
         */
        DirectExchange directExchange = new DirectExchange("Family-Hello-Exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
    }

    /**
     * 创建Queue交换机
     */
    @Test
    public void craetQueue() {
        /**
         * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
         */
        Queue queue = new Queue("Family-Hello-Queue", true,false, false);
        amqpAdmin.declareQueue(queue);
    }

    /**
     * 创建Bind交换机
     */
    @Test
    public void craetBind() {
        /**
         * public Binding(String destination, 绑定目标
         * Binding.DestinationType destinationType, 绑定类型
         * String exchange, 交换机
         * String routingKey, router-key
         * @Nullable Map<String, Object> arguments  参数) {
         */
        Binding binding = new Binding("Family-Hello-Queue", Binding.DestinationType.QUEUE, "Family-Hello-Exchange", "Java.hello", null);
        amqpAdmin.declareBinding(binding);
    }

上面为DirectExchange类型交换机的创建,其它类型Exchange类似。

更多API可参考AmqpAdmin的各方法。

2、收发消息

发送消息

    /**
     * rabbitTemplate 发送消息
     */
    @Test
    public String sendMessage(){
        //发送字符串
        rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello","Hello RabbitMQ 。。。",new CorrelationData(UUID.randomUUID().toString()));

        //发送对象
        MemberEntity memberEntity = new MemberEntity();
        memberEntity.setAddress("rabbitMQ测试-会员地址信息");
        memberEntity.setEmail("rabbitMQ测试-会员邮箱,batis@foxmial.com");
        //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。
        rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello",memberEntity,new CorrelationData(UUID.randomUUID().toString()));

        return "向RabbitMQ发送消息成功";
    }

3、接受消息

RabbitMQ为我们提供了监听@RabbitListener监听注解,使用时,必须开启@EnableRabbit功能。不用监听消息队列时,可以不用开启@EnableRabbit注解。

    /**
     * 监听队列是各数组,可以同时监听多个队列
     * 通过返回的消息类型知道是一个org.springframework.amqp.core.Message;
     * @param message
     */
    @RabbitListener(queues={"Family-Hello-Queue"})
    public String sendMessage(){
        //发送字符串
//        rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello","Hello RabbitMQ 。。。",new CorrelationData(UUID.randomUUID().toString()));

        //发送对象
        MemberEntity memberEntity = new MemberEntity();
        memberEntity.setAddress("rabbitMQ测试-会员地址信息");
        memberEntity.setEmail("rabbitMQ测试-会员邮箱,batis@foxmial.com");
        //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。
        rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello",memberEntity,new CorrelationData(UUID.randomUUID().toString()));

        return "向RabbitMQ发送消息成功";
    }

    /**
     *
     * 通过传入第二个参数spring可以帮我们把之间转的对象获取后转成对象,第二个参数不固定的泛型<T>
     * 比如上面发送的是一个user对象,那我们第二个参数就是User user,接受到的数据就直接是user了。
     * 还可以直接获取到链接中传输数据的通道channel
     * @param message
     */
    @RabbitListener(queues={"Family-Hello-Queue"})
    public void receiveMessageAndObject(Message message, MemberEntity memberEntity, Channel channel){
        //消息体,数据内容
        //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。
        byte[] body = message.getBody();

        //消息头信息
        MessageProperties messageProperties = message.getMessageProperties();

        System.out.println("监听到队列消息2:"+message+"消息类型:"+message.getClass());
        System.out.println("监听到队列消息2:"+message+"消息内容:"+memberEntity);
    }

Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息

注意:

1)、同一个消息,只能有一个客户端收到

2)、只有一个消息完全处理完,方法运行结束,我们才可以接收到下一个消息

3)、@RabbitHandler也可以接受队列消息

@RabbitListener可以用在类和方法上

@RabbitHandler只能用在方法上。

@RabbitHandler应用场景:当发送的数据为不同类型时,@RabbitListener接口消息时,参数类型不知道定义那个。

比如:想队列发送的对象有user、People people,那下面接受时,对象不知道写那个

public void receiveMessageObject(Message message, 这里该写那个对象, Channel channel)

因此@RabbitHandler就可以用上了。通过在类上加上@RabbitHandler注解监听队列消息,在方法上加@RabbitHandler注解,参数就可以指定接受那个数据类型的消息了。

简单代码如下:

@RabbitListener(queues={"Family-Hello-Queue"})
@Service("memberService")
public class MemberServiceImpl extends ServiceImpl<MemberDao, MemberEntity> implements MemberService {

    //接受队列中指定的People类型数据
    @RabbitHandler
    public void receiveMessageObj(Message message, MemberEntity memberEntity, Channel channel){
        //消息体,数据内容
        //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。
        byte[] body = message.getBody();

        //消息头信息
        MessageProperties messageProperties = message.getMessageProperties();

        System.out.println("监听到队列消息:"+message+"消息内容:"+memberEntity);
    }

   //接受队列中指定的User类型数据
    @RabbitHandler
    public void receiveMessageObj(Message message, MemberEntity memberEntity, Channel channel){
        //消息体,数据内容
        //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。
        byte[] body = message.getBody();

        //消息头信息
        MessageProperties messageProperties = message.getMessageProperties();

        System.out.println("监听到队列消息:"+message+"消息内容:"+memberEntity);
    }

}

6、RabbitMQ Java配置Json序列化

通过RabbitAutoConfiguration.java中注入的RabbitTemplate看到configure。

@Bean
@ConditionalOnMissingBean
public RabbitTemplateConfigurer rabbitTemplateConfigurer(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
            RabbitTemplateConfigurer configurer = new RabbitTemplateConfigurer();
            configurer.setMessageConverter((MessageConverter)messageConverter.getIfUnique());
            configurer.setRetryTemplateCustomizers((List)retryTemplateCustomizers.orderedStream().collect(Collectors.toList()));
            configurer.setRabbitProperties(properties);
            return configurer;
}

@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean({RabbitOperations.class})
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate();
            configurer.configure(template, connectionFactory);
            return template;
}

如果配置中有MessageConverter消息转换器就用configurer.setMessageConverter((MessageConverter)messageConverter.getIfUnique());如果没有就用RabbitTemplate默认配置的。

RabbitTemplateConfigurer.java默认配置源码中可以看到默认使用的转换器如下:

public MessageConverter getMessageConverter() {
        return this.messageConverter;
    }
    
private MessageConverter messageConverter = new SimpleMessageConverter();

然后通过SimpleMessageConverter可以看到默认创建消息时,使用的消息转换器类型

    protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        byte[] bytes = null;
        if (object instanceof byte[]) {
            bytes = (byte[])((byte[])object);
            messageProperties.setContentType("application/octet-stream");
        } else if (object instanceof String) {
            try {
                bytes = ((String)object).getBytes(this.defaultCharset);
            } catch (UnsupportedEncodingException var6) {
                throw new MessageConversionException("failed to convert to Message content", var6);
            }

            messageProperties.setContentType("text/plain");
            messageProperties.setContentEncoding(this.defaultCharset);
        } else if (object instanceof Serializable) {
            try {
                bytes = SerializationUtils.serialize(object);
            } catch (IllegalArgumentException var5) {
                throw new MessageConversionException("failed to convert to serialized Message content", var5);
            }

            messageProperties.setContentType("application/x-java-serialized-object");
        }

        if (bytes != null) {
            messageProperties.setContentLength((long)bytes.length);
            return new Message(bytes, messageProperties);
        } else {
            throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
        }
    }

如果是序列化的就是使用的SerializationUtils.serialize(object);

else if (object instanceof Serializable) {
            try {
                bytes = SerializationUtils.serialize(object);
            } catch (IllegalArgumentException var5) {
                throw new MessageConversionException("failed to convert to serialized Message content", var5);
            }

            messageProperties.setContentType("application/x-java-serialized-object");

因此我们可以自定义MessageConverter。通过MessageConverter接口实现类型,可以看到json各实现类。

因此RabbitMQ自定义配置如下:

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description: RabbitMQ配置
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/3/12 17:07
 * @version: 1.0
 */
@Configuration
public class MyRabbitConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

再次发送对象时,从mq里面获取的对象就是json格式了。

7、开启消息回调确认机制

#开启发送端确认机制,发送到Broke的确认
publisher-confirm-type: correlated
#开启时消息发送到队列的消息确认机制
publisher-returns: true
#只要抵达队列,以异步发送优先回调我们这个ReturnCallback
template:
  mandatory: true
#手动确认ack
listener:
    simple:
        acknowledge-mode: manual

8、重点:

发送端:

package com.yanxizhu.family.users.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;


/**
 * @description: RabbitMQ配置 序列化
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/17 16:44
 * @version: 1.0
 */
@Configuration
public class RabbitConfig {

    private RabbitTemplate rabbitTemplate;

    /**
     * rabbitTemplate循环依赖,解决方法:手动初始化模板方法
     * @param connectionFactory
     * @return
     */
    @Primary
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate();
        return rabbitTemplate;
    }


    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制rabbitMaTemplate
     * 一、发送端消息确认机制,发送给Broke的消息确认回调
     * 1、开始消息确认    publisher-confirm-type: correlated
     * 2、设置确认回调   setConfirmCallback()
     * 二、发送端消息确认机制,发送给队列的消息确认回调
     * 1、开始消息确认
     *     publisher-returns: true
     *     template:
     *       mandatory: true
     * 2、设置确认回调  setReturnsCallback
     * 三、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)
     *  1、开启消费端消息确认
     *      listener:
     *       simple:
     *         acknowledge-mode: manual
     *  2、消费端默认是自动消息确认的,只要消息手法哦,客户端会自动确认,服务器就会移除这个消息。
     *    默认的消息确认(默认ACK)问题:
     *        我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,服务器宕机了。会发生消息丢失;
     *  3、因此,需要使用消费者手动确认默认(手动ack),只要我们没有明确高数MQ,消息被消费,就没有ack,消息就一致是unacked状态。
     *  即使服务器宕机,消息也不会丢失,会重新变为ready,下一次有新的消费者链接进来就发给它处理。
     *  4、手动ack如何签收
     *  //消息头信息
     *   MessageProperties messageProperties = message.getMessageProperties();
     *   long deliveryTag = messageProperties.getDeliveryTag();
     *   channel.basicAck(deliveryTag,true); //手动确认签收,deliveryTag默认自增的,true是否批量确认签收
     *  5、消费退回
     *
     *         //参数:long var1, boolean var3(是否批量), boolean var4
     *         channel.basicNack(deliveryTag,true,true); //可以批量回退
     *
     *        //参数:long var1, boolean var3
     *         channel.basicReject(deliveryTag,true); //单个回退
     *
     *         第三个参数或第二个参数:false 丢弃    true:发回服务器,服务器重新入对。
     */
    public void initRabbitTemplate(){
        //设置确认回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 只要消息抵达服务器
             * @param correlationData 当前消息关联的唯一数据(这个消息的唯一id)
             * @param ack 消息是否成功收到
             * @param cause 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                //correlationData:注意,这个correlationData就是发送消息时,设置的correlationData。
                System.out.println("====确认收到消息了。。。。================");
                System.out.println(correlationData+"|"+ack+"|"+cause);
            }
        });


        //注意:必须要有这一步,不然不生效
        rabbitTemplate.setMandatory(true);
        //设置消息抵达队列回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只要消息没有投递给指定的队列,就触发这个失败回调
             * @param message 投递失败的消息详细信息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange 当时这个消息发给哪个交换机
             * @param routingKey 当时这个消息用的哪个路由键
             */
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("==========投递到队列失败了。。。==========");
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
            }
        });
    }
}

发送消息时,可以将new CorrelationData(UUID.randomUUID().toString()),传的uuid参数保存到数据库,到时候直接遍历数据库即可知道那些投递成功到队列,那些投递失败。

说明:

 * 一、发送端消息确认机制,发送给Broke的消息确认回调
 * 1、开始消息确认    publisher-confirm-type: correlated
 * 2、设置确认回调   setConfirmCallback()
 * 二、发送端消息确认机制,发送给队列的消息确认回调
 * 1、开始消息确认
 *     publisher-returns: true
 *     template:
 *       mandatory: true
 * 2、设置确认回调  setReturnsCallback
 * 三、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)
 *  1、开启消费端消息确认
 *      listener:
 *       simple:
 *         acknowledge-mode: manual
 *  2、消费端默认是自动消息确认的,只要消息手法哦,客户端会自动确认,服务器就会移除这个消息。
 *    默认的消息确认(默认ACK)问题:
 *        我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,服务器宕机了。会发生消息丢失;
 *  3、因此,需要使用消费者手动确认默认(手动ack),只要我们没有明确高数MQ,消息被消费,就没有ack,消息就一致是unacked状态。
 *  即使服务器宕机,消息也不会丢失,会重新变为ready,下一次有新的消费者链接进来就发给它处理。

消费端手动ack

    /**
     *
     * 通过传入第二个参数spring可以帮我们把之间转的对象获取后转成对象,第二个参数不固定的泛型<T>
     * 比如上面发送的是一个user对象,那我们第二个参数就是User user,接受到的数据就直接是user了。
     * 还可以直接获取到链接中传输数据的通道channel
     * @param message
     */
    @RabbitHandler
    public void receiveMessageObject(Message message, MemberEntity memberEntity, Channel channel) throws IOException {
        //消息体,数据内容
        //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。
        byte[] body = message.getBody();

        //消息头信息
        MessageProperties messageProperties = message.getMessageProperties();
        long deliveryTag = messageProperties.getDeliveryTag();
        //手动确认签收
        channel.basicAck(deliveryTag,true);
        System.out.println("监听到队列消息3:"+message+"消息类型:"+message.getClass());
        System.out.println("监听到队列消息3:"+message+"消息内容:"+memberEntity);

        //回退
        //可以批量回退
//        channel.basicNack(deliveryTag,true,true);

        //单个回退
        //true重新排队,false丢弃
//        channel.basicReject(deliveryTag,true);
    }

说明:

手动ack如何签收

 //消息头信息
     MessageProperties messageProperties = message.getMessageProperties();
     long deliveryTag = messageProperties.getDeliveryTag();
     channel.basicAck(deliveryTag,true); //手动确认签收,deliveryTag默认自增的,true是否批量确认签收

消费退回

 //参数:long var1, boolean var3(是否批量), boolean var4
  channel.basicNack(deliveryTag,true,true); //可以批量回退
 //参数:long var1, boolean var3
 channel.basicReject(deliveryTag,true); //单个回退
第三个参数或第二个参数:false 丢弃    true:发回服务器,服务器重新入对。

最后rabbitMQ配置:

spring:
  rabbitmq:
    host: 192.168.56.10
    virtual-host: /
    port: 5672
    #开启发送端确认
    publisher-confirm-type: correlated
    #开启发送端消息抵达队列的确认
    publisher-returns: true
    #只要抵达队列,以异步发送优先回调我们这个retureconfirm
    template:
      mandatory: true
    #开启手动确认
    listener:
      simple:
        acknowledge-mode: manual

SpringBoot中使用延时队列

1、Queue、Exchange、Binding可以@Bean进去
2、监听消息的方法可以有三种参数(不分数量,顺序)
Object content, Message message, Channel channel
3、channel可以用来拒绝消息,否则自动ack;

代码实现

发送消息到MQ:

    public String sendDelayMessage(){
        //发送对象
        MemberEntity memberEntity = new MemberEntity();
        memberEntity.setId(11111111L);
        memberEntity.setAddress("死信队列测试");
        memberEntity.setEmail("延迟队列,异步处理会员信息");
        memberEntity.setCreateTime(new Date());
        //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。
        rabbitTemplate.convertAndSend("member-event-exchange","users.create.member",memberEntity, new CorrelationData(UUID.randomUUID().toString()));

        return "向RabbitMQ发送消息成功";
    }

通过延迟队列,将延迟的会员信息,发送到另一个队列,通过监听过期延迟的队列,接收延迟的会员信息。

package com.yanxizhu.family.users.config;

import com.rabbitmq.client.Channel;
import com.yanxizhu.family.users.entity.MemberEntity;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * @description: 延迟队列
 * 说明: 1、首先通过创建的路由键(users.create.member),将消息从交换机(member-event-exchange)发送到死信队列(member.delay.queue)。
 *       2、根据死信队列(member.delay.queue)的配置,过期后,消息将通过路由键为(users.release.member)转发到交换机(member-event-exchange)。
 *       3、最后交换机(member-event-exchange)再通过绑定的最终队列(users.release.member.queue)和路由键(users.release.member)转发到最终队列(member-event-exchange)。
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/17 20:47
 * @version: 1.0
 */
@Configuration
public class DelayQueueConfig {

    /**
     * 监听最终的队列
     * @param entity
     * @param channel
     * @param message
     * @throws IOException
     */
    @RabbitListener(queues="member.release.queue")
    public void listener(MemberEntity entity, Channel channel, Message message) throws IOException {
        System.out.println("收到过期的会员信息:准备关闭会员"+entity.toString());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

    //@bean自动创建交换机、路由、绑定

    //交换机
    @Bean
    public Exchange orderEventExchange(){
        return new TopicExchange("member-event-exchange", true, false);
    }

    /**
     * 容器中的Binding、Queue、Exchange都会自动创建(RabbitMQ没有的情况)
     * @return
     */
    //死信队列
    @Bean
    public Queue memberDelayQueue(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "member-event-exchange");//交换机
        arguments.put("x-dead-letter-routing-key", "users.release.member");//路由键
        arguments.put("x-message-ttl", 60000);//过期时间

        //public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
        Queue queue = new Queue("member.delay.queue",true,false,false,arguments);
        return queue;

    }

    //将死信队列和交换机绑定,路由键-users.create.member
    @Bean
    public Binding orderCreateOrderBingding(){
        //目的地、绑定类型、交换机、路由键、参数
        //public Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments)
        return new Binding("member.delay.queue",
                Binding.DestinationType.QUEUE,
                "member-event-exchange",
                "users.create.member",
                null);
    }

    //最后过期的队列
    @Bean
    public Queue orderReleaseQueue(){
        Queue queue = new Queue("member.release.queue",true,false,false);
        return queue;
    }


    //将最后的队列和交换机绑定,路由键-user.release.member
    @Bean
    public Binding orderReleaseOrderBingding(){
        return new Binding("member.release.queue",
                Binding.DestinationType.QUEUE,
                "member-event-exchange",
                "users.release.member",
                null);
    }
}
9

评论 (0)

取消