订单下单30分钟后,如果用户没有付款,则系统自动取消订单。
会议开始前10分钟,推送消息提醒用户。
自定义某个操作的执行时间,如果设置文章在明早9点发布。
除了上一篇《使用RabbitMQ插件实现延迟队列》外,使用死信队列也是一种方案.
Time To Live:可以在发送消息时设置过期时间,也可以设置整个队列的过期时间,如果两个同时设置已最早过期时间为准。
Dead Letter Exchanges:可以通过绑定队列的死信交换器来实现死信队列。
x-dead-letter-exchange:绑定死信交换器(其实也是普通交换器,与类型无关)
x-dead-letter-routing-key:绑定死信队列的路由键(可选)
x-message-ttl:绑定队列消息的过期时间(可选)死信队列设计思路:
生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者
进入消息队列:
1. 消息被拒绝,并且requeue= false
2. 消息ttl过期
3. 队列达到最大的长度做延迟队列需要创建一个没有消费者的队列,用来存储消息。然后创建一个真正的消费队列,用来做具体的业务逻辑。当带有TTL的消息到达绑定死信交换器的队列,因为没有消费者所以会一直等到消息过期,然后消息被投递到死信队列也就是真正的消费队列。
具体代码:
package com.lyt.rabbitmq.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Component;import java.util.HashMap;import java.util.Map;/*** @Description 利用死信队列和过期时间模拟延迟队列,没有消费者,所以不能用注解形式* Time To Live(TTL)* 1. 可以在发送消息时设置过期时间(message.getMessageProperties().setExpiration("5000");)* 2. 也可以设置整个队列的过期时间(args.put("x-message-ttl",10000);)* 3. 如果两个同时设置已最早过期时间为准* Dead Letter Exchanges(DLX)* @Date 2019-03-10 10:25:30*/public class MQDelayConfig {/*** @Description 定义支付交换器* @Author lyt* @Date 2021-04-02 14:39:31*/private DirectExchange directPayExchange() {return new DirectExchange("direct.pay.exchange");}/*** @Description 定义支付队列 绑定死信队列(其实是绑定的交换器,然后通过交换器路由键绑定队列) 设置过期时间* @Author lyt* @Date 2021-04-02 14:40:24*/private Queue directPayQueue() {Map<String, Object> args = new HashMap<>(3);//声明死信交换器args.put("x-dead-letter-exchange", "direct.delay.exchange");//声明死信路由键args.put("x-dead-letter-routing-key", "DelayKey");//声明队列消息过期时间args.put("x-message-ttl", 10000);return new Queue("direct.pay.queue", true, false, false, args);}/*** @Description 定义支付绑定* @Author lyt* @Date 2021-04-02 14:46:10*/private Binding bindingOrderDirect() {return BindingBuilder.bind(directPayQueue()).to(directPayExchange()).with("OrderPay");}}
带有过期时间且绑定死信交换器的队列
生产者,为消息设置过期时间setExpiration("15000");
/*** @Description 支付队列、绑定死信队列,测试消息延迟功能* @Author lyt* @Date 2021-04-02 14:07:25*/@RequestMapping(value = "/directDelayMQ", method = {RequestMethod.GET})public List<User> directDelayMQ() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");List<User> users = userService.getUserList(null);for (User user : users) {CorrelationData correlationData = new CorrelationData(String.valueOf(user.getId()));rabbitTemplate.convertAndSend("direct.pay.exchange", "OrderPay", user,message -> {// 设置5秒过期message.getMessageProperties().setExpiration("15000");return message;},correlationData);System.out.println(user.getName() + ":" + sdf.format(new Date()));}return users;}
消费者,声明真正消费的队列、交换器、绑定
/*** @Description 延迟队列* @Author lyt* @Date 2021-04-04 16:34:28*/(bindings = {(value = (value = "direct.delay.queue"), exchange = (value = "direct.delay.exchange"), key = {"DelayKey"})})public void getDLMessage(User user, Channel channel, Message message) throws InterruptedException, IOException {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 模拟执行任务System.out.println("这是延迟队列消费:" + user.getName() + ":" + sdf.format(new Date()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
思考: 如果先放入一条A消息过期时间是10秒,再放入一个b消息过期时间是5秒,那延迟队列是否可以先消费b消息?
答案是否定的,因为队列就会遵循先进先出的规则,b消息会等a消息过期后,一起消费,这就是所谓的队列阻塞。由这个问题可以用我们之前介绍的插件方式解决。
扫描二维码
关注我们
了解更多全栈技能
相关新闻
◎版权作品,未经中国文化报道网书面授权,严禁转载,违者将被追究法律责任。
Copyright 2015-2019. 中国文化报道网 www.cgia.cn All rights reserved.
违法和不良信息举报邮箱:jubao@cgia.cn
未经过本站允许,请勿将本站内容传播或复制