RabbitMQ延时队列及消息可靠性

admin
2022-03-13 / 0 评论 / 268 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2022年03月26日,已超过930天没有更新,若内容或图片失效,请留言反馈。

RabbitMQ延时队列(实现定时任务)

场景: 比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。 常用解决方案: spring的 schedule 定时任务轮询数据库 缺点: 消耗系统内存、增加了数据库的压力、存在较大的时间误差 解决:rabbitmq的消息TTL和死信Exchange结合

消息的TTL(Time To Live)

• 消息的TTL就是消息的存活时间。
• RabbitMQ可以对队列和消息分别设置TTL。
• 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的
设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
• 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队
列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的
TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果。

Dead Letter Exchanges(DLX)

• 一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,
一个路由可以对应很多队列。(什么是死信)
• 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不
会被再次放在队列里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false
• 上面的消息的TTL到了,消息过期了。
• 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
• Dead Letter Exchange其实就是一种普通的exchange,和创建其他
exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有
消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
• 我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息
被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列

• 手动ack&异常消息统一放在一个队列处理建议的两种方式
• catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费
• 给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败

延时队列实现

SpringBoot中使用延时队列

• 1、Queue、Exchange、Binding可以@Bean进去
• 2、监听消息的方法可以有三种参数(不分数量,顺序)
• Object content, Message message, Channel channel
• 3、channel可以用来拒绝消息,否则自动ack;

代码实现

发送消息到MQ:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.UUID;

/**
 * @description: TODO
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/3/13 15:41
 * @version: 1.0
 */
@RestController
public class RabbitMQControoler {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @ResponseBody
    @GetMapping("/createOrder")
    public String sendMessage(){
        OrderEntity entity = new OrderEntity();
        entity.setOrderEn(UUID.randomUUID().toString());
        entity.setModifyTime(new Date());
        //发送消息到MQ
        rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order",entity);
        return "OK";
    }
}

通过延迟队列,将延迟的订单,发送到另一个队列,通过监听过期延迟的队列,接收延迟的订单。

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * @description: 延迟队列
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/3/13 13:27
 * @version: 1.0
 */
@Configuration
public class MyRabbitMQConfig {

    @RabbitListener(queues="order.release.order.queue")
    public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {
        System.out.println("收到过期的订单信息:准备关闭订单"+entity.getOrderSn());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }


    /**
     * 容器中的Binding、Queue、Exchange都会自动创建(RabbitMQ没有的情况)
     * @return
     */

    //死信队列
    @Bean
    public Queue orderDelayQueue(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-Letter-exchange", "order-event-exchange");//交换机
        arguments.put("x-dead-letter-routing-key", "order.release.order");//路由键
        arguments.put("x-message-ttl", 60000);//过期时间

        //public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
        Queue queue = new Queue("order.delay.queue",true,false,false,arguments);
        return queue;

    }

    //最后过期的队列
    @Bean
    public Queue orderReleaseQueue(){
        Queue queue = new Queue("order.release.order.queue",true,false,false);
        return queue;
    }

    //交换机
    @Bean
    public Exchange orderEventExchange(){
        return new TopicExchange("order-event-exchange", true, false);
    }

    //绑定交换机和死信队列
    @Bean
    public Binding orderCreateOrderBingding(){
        //public Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments)
        return new Binding("order-event-exchange",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create.order",
                null);
    }

    //绑定交换机和最后过期的队列
    @Bean
    public Binding orderReleaseOrderBingding(){
        return new Binding("order-event-exchange",
                Binding.DestinationType.QUEUE,
                "order.release.order.queue",
                "order.release.order",
                null);
    }
}

如何保证消息可靠性-消息丢失

• 1、消息丢失
• 消息发送出去,由于网络问题没有抵达服务器
• 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机
制,可记录到数据库,采用定期扫描重发的方式
• 做好日志记录,每个消息状态是否都被服务器收到都应该记录
• 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进
行重发
• 消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚
未持久化完成,宕机。
• publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。
• 自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机
• 一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重
新入队

如何保证消息可靠性-消息重复

• 2、消息重复
• 消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息
重新由unack变为ready,并发送给其他消费者
• 消息消费失败,由于重试机制,自动又将消息发送出去
• 成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送
• 消费者的业务消费接口应该设计为幂等性的。比如扣库存有
工作单的状态标志
• 使用防重表(redis/mysql),发送消息每一个都有业务的唯
一标识,处理过就不用处理
• rabbitMQ的每一个消息都有redelivered字段,可以获取是否
是被重新投递过来的,而不是第一次投递过来的

• 2、消息重复
• 消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息
重新由unack变为ready,并发送给其他消费者
• 消息消费失败,由于重试机制,自动又将消息发送出去
• 成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送
• 消费者的业务消费接口应该设计为幂等性的。比如扣库存有
工作单的状态标志
• 使用防重表(redis/mysql),发送消息每一个都有业务的唯
一标识,处理过就不用处理
• rabbitMQ的每一个消息都有redelivered字段,可以获取是否

是被重新投递过来的,而不是第一次投递过来的

• 3、消息积压
• 消费者宕机积压
• 消费者消费能力不足积压
• 发送者发送流量太大
• 上线更多的消费者,进行正常消费
• 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理

代码使用,请参考RabbitMQ运行机制 安装请参考docker安装RabbitMQ 学习RabbitMQ请参看初识RabbitMQ

5

评论 (0)

取消