1、死信队列介绍
1.1 什么是死信
就是无法被消费的消息。一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
1.2 应用场景
- 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
- 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
1.3 死信的来源
- 消息 TTL 过期
- 队列达到最大长度
- 消息被拒绝
2、死信实战案例
交换机类型是 direct,两个消费者,一个生产者,两个队列:消息队列和死信队列
2.1 消息 TTL 过期
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 设置消息的 TTL 时间
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
// 信息是用作演示队列个数限制
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes(StandardCharsets.UTF_8));
}
}
}
public class Consumer01 {
//普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定:队列、交换机、路由键(routingKey)
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
//正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
//正常队列
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息" + message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});
}
}
public class Consumer02 {
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Consumer02 接收到消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
-
启动 C1,之后关闭消费者,模拟其接收不到消息。再启动 Producer
-
再启动消费者 C2,让它去消费死信队列里的消息
2.2 死信最大长度
和上面的案例一相比去掉 TTL 属性部分
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "normalQueue", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送消息:" + message);
}
}
}
相比于案例一多设置一个正常队列长度限制的参数:params.put("x-max-length", 6);
public class Consumer01 {
//普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 深耕奴婢个死信和普通交换类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
String deadQueue = "dead-queue";
// 声明队列:①队列名称;②是否持久化;③是否消息共享;④是否自动删除;⑤其他参数
channel.queueDeclare(deadQueue, false, false, false, null);
// 死信队列绑定:①队列;②交换机;③Routing key
channel.queueBind(deadQueue, DEAD_EXCHANGE, "deadQueue");
// 正常队列绑定死信队列信息
String normalQueue = "normal-queue";
// 设置参数值
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 正常队列设置死信交换机
params.put("x-dead-letter-routing-key", "deadQueue"); // 正常队列设置死信的 Routing Key 参数
params.put("x-max-length", 6); // 设置正常队列的长度限制
// 声明队列:①队列名称;②是否持久化;③是否消息共享;④是否自动删除;⑤其他参数
channel.queueDeclare(normalQueue, false, false, false, params);
// 死信队列绑定:①队列;②交换机;③Routing key
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "normalQueue");
System.out.println("Consumer01 等待接收消息……");
// 消费者消费消息:①队列;②是否自动应答;③成功消费回调;④取消消费的回调
channel.basicConsume(normalQueue, true, (s, delivery) -> {
System.out.println("接收到的消息:" + new String(delivery.getBody(), StandardCharsets.UTF_8));
}, consumerTag -> {});
}
}
与案例一代码一致
public class Consumer02 {
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Consumer02 接收到消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { });
}
}
-
启动 C1,之后关闭消费者,模拟其接收不到消息。再启动 Producer
-
再启动消费者 C1 和 C2,让它们去消费各自队列里的消息
2.3 死信消息被拒
与案例二生产者代码一致
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "normalQueue", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送消息:" + message);
}
}
}
public class Consumer01 {
//普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 深耕奴婢个死信和普通交换类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
String deadQueue = "dead-queue";
// 声明队列:①队列名称;②是否持久化;③是否消息共享;④是否自动删除;⑤其他参数
channel.queueDeclare(deadQueue, false, false, false, null);
// 死信队列绑定:①队列;②交换机;③Routing key
channel.queueBind(deadQueue, DEAD_EXCHANGE, "deadQueue");
// 正常队列绑定死信队列信息
String normalQueue = "normal-queue";
// 设置参数值
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 正常队列设置死信交换机
params.put("x-dead-letter-routing-key", "deadQueue"); // 正常队列设置死信的 Routing Key 参数
// 声明队列:①队列名称;②是否持久化;③是否消息共享;④是否自动删除;⑤其他参数
channel.queueDeclare(normalQueue, false, false, false, params);
// 死信队列绑定:①队列;②交换机;③Routing key
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "normalQueue");
System.out.println("Consumer01 等待接收消息……");
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
if(msg.equals("info5")){
System.out.println("Consumer01接受的消息是:"+msg+": 此消息是被C1拒绝的");
//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01接受的消息是:"+msg);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
};
// 消费者消费消息:①队列;②是否自动应答;③成功消费回调;④取消消费的回调
channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {});
}
}
与案例一代码一致
public class Consumer02 {
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Consumer02 接收到消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { });
}
}
-
先启动 C1 消费者,然后启动生产者
-
再启动 C2 死信队列,进行消费
3、延迟队列介绍
3.1 延迟队列概念
队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
3.2 应用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
- 用户注册成功后,如果三天内没有登陆则进行短信提醒
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;那我们一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?
如果数据量比较少,确实可以这样做,比如:对于「如果账单一周内未支付则进行自动结算」这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:「订单十分钟内未支付则关闭」,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
4、TTL
4.1 什么是 TTL
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。
4.2 TTL 的两种设置
如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为「死信」。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
-
队列设置 TTL:在创建队列的时候设置队列的
x-message-ttl
属性Map<String, Object> params = new HashMap<>(); params.put("x-message-ttl",5000); return QueueBuilder.durable("QA").withArguments(args).build(); /
-
消息设置 TTL:针对每条消息设置 TTL
rabbitTemplate.converAndSend("X","XC",message,correlationData -> { correlationData.getMessageProperties().setExpiration("5000"); });
如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃
如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为
5、SpringBoot 整合 RabbitMQ 简单案例
5.1 代码架构图
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
5.2 环境配置与代码实现
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>3.0.0</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
spring:
rabbitmq:
host: 101.42.242.28 # 服务器IP
port: 5672 # 端口号,注意:15672是网页端口号,5672是代码连接用的端口号
username: guest # 默认账号
password: guest # 默认密码
@Configuration
public class TtlQueueConfig {
// 普通交换机的名称
public static final String X_EXCHANGE = "normal_exchange_X";
// 普通队列的名称
public static final String QUEUE_A = "normal_queue_A";
public static final String QUEUE_B = "normal_queue_B";
// 普通队列与普通交换机绑定的 RoutingKey
public static final String RoutingKey_XA ="XA";
public static final String RoutingKey_XB ="XB";
// 死信交换机的名称
public static final String Y_DEAD_LETTER_EXCHANGE = "dead_exchange_Y";
// 死信队列的名称
public static final String DEAD_LETTER_QUEUE = "dead_queue_D";
// 死信队列与死信交换机绑定的 RoutingKey
public static final String RoutingKey_YD = "YD";
// 声明 xExchange 别名
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
// 声明死信队列交换机
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
// 声明队列 A 的 ttl 为 10s 并绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA() {
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信路由 Key
args.put("x-dead-letter-routing-key", RoutingKey_YD);
// 声明队列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 声明队列 A 绑定 X 交换机
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with(RoutingKey_XA);
}
//声明队列 B ttl 为 40s 并绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", RoutingKey_YD);
//声明队列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
//声明队列 B 绑定 X 交换机
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with(RoutingKey_XB);
}
// 声明死信队列 QD
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}
// 声明死信队列 QD 绑定关系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with(RoutingKey_YD);
}
}
@Slf4j
@RestController
@RequestMapping("ttl")
public class SendMessageController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", NowTimeUtil.now(), message);
/**
* 1.第一个参数:交换机名称
* 2.第二个参数:routingKey
* 3.发送的消息
*/
rabbitTemplate.convertAndSend(TtlQueueConfig.X_EXCHANGE, TtlQueueConfig.RoutingKey_XA, "消息来自 ttl(10s)的队列——" + message);
rabbitTemplate.convertAndSend(TtlQueueConfig.X_EXCHANGE, TtlQueueConfig.RoutingKey_XB, "消息来自 ttl(40s)的队列——" + message);
}
}
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = TtlQueueConfig.DEAD_LETTER_QUEUE)
public void receiveD(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("当前时间:{},收到死信队列信息:{}", NowTimeUtil.now(), msg);
}
}
5.3 延时队列 TTL 优化
发起一个请求:http://localhost:8080/ttl/sendMsg/嗨害嗨,来咯
但是这样使用会存在一个问题:如果每增加一个新的时间需求,就要新增一个队列,因此后续需要对其进行优化
新增一个队列 QC(该队列不设置 TTL 时间),绑定关系如下:
为了便于区分,所以新建一个配置文件类
@Configuration
public class MsgTtlQueueConfig {
public static final String QUEUE_C = "normal_queue_C";
public static final String RoutingKey_XC ="XC";
// 声明队列 C 并绑定死信交换机
@Bean("queueC")
public Queue queueC() {
Map<String, Object> params = new HashMap<>(2);
// 声明当前队列绑定的死信交换机
params.put("x-dead-letter-exchange", TtlQueueConfig.Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信路由 Key
params.put("x-dead-letter-routing-key", TtlQueueConfig.RoutingKey_YD);
// 没有声明 TTL 属性
return QueueBuilder.durable(QUEUE_C).withArguments(params).build();
}
// 声明队列 B 绑定 X 交换机
@Bean
public Binding queueCBinding(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange")DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with(RoutingKey_XC);
}
}
/**
* 延时队列优化
* @param message 要发送的消息
* @param ttlTime 延时时间,单位:ms
*/
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(ttlTime);
return message;
}
};
rabbitTemplate.convertAndSend(TtlQueueConfig.X_EXCHANGE, MsgTtlQueueConfig.RoutingKey_XC, message, messagePostProcessor);
}
分别在浏览器地址上发起请求:
http://localhost:8080/ttl/sendExpirationMsg/你好1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好2/2000
注意:因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
5.4 插件实现延迟队列
如果不能实现在消息粒度上的 TTL,并使其在设置的TTL 时间及时死亡,就无法设计成一个通用的延时队列
可去官网下载 rabbitmq_delayed_message_exchange 插件,放置到 RabbitMQ 的插件目录 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
中,然后执行以下命令
#安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#重启服务
systemctl restart rabbitmq-server
在新增一个延迟队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
// 自定义交换机,这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> params = new HashMap<>();
// 自定义交换机的类型
params.put("x-delayed-type", "direct");
/**
* 1.交换机的名称
* 2.交换机的类型 x-delayed-message
* 3.是否需要持久化
* 4.是否需要自动删除
* 5.其他的参数
*/
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, params);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(delayTime);
return message;
}
};
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, messagePostProcessor);
log.info(" 当前时间:{}, 发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}", NowTimeUtil.now(), delayTime, message);
}
@Slf4j
@Component
public class DelayQueueConsumer {
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("当前时间:{},收到延时队列的消息:{}", NowTimeUtil.now(), msg);
}
}
5.5 总结
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景。
6、幂等性
6.1 幂等性概念
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
6.2 消息重复消费
消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
6.3 解决思路
业界主流的幂等性有两种操作
-
唯一ID + 指纹码机制
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
-
note Redis 原子性
利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费
7、优先级队列
7.1 使用场景
在我们系统中有一个订单催付的场景,客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能。
但是,肯定是要分大客户和小客户,大客户订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级, 否则就是默认优先级。
7.2 添加方法
-
控制台页面添加
-
声明队列的时候添加优先级
设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
Map<String, Object> params = new HashMap(); // 优先级为 10 params.put("x-max-priority", 10); channel.queueDeclare("hello", true, false, false, params);
-
消息中代码添加优先级
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build();
注意:要让队列实现优先级需要做的事情有如下事情:
- 队列需要设置为优先级队列,
- 消息需要设置消息的优先级,
- 消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序
7.3 使用案例
public class PriorityProducer {
private static final String QUEUE_NAME = "priority_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 给消息赋予一个 priority 属性
AMQP.BasicProperties properties =
new AMQP.BasicProperties().builder().priority(1).priority(10).build();
for (int i = 1; i < 11; i++) {
String message = "info" + i;
if (i==5) {
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes(StandardCharsets.UTF_8));
} else {
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
}
System.out.println("消息发送完成:" + message);
}
}
}
public class PriorityConsumer {
private static final String QUEUE_NAME = "priority_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//设置队列的最大优先级 最大可以设置到255 官网推荐1-10 如果设置太高比较吃内存和CPU
Map<String, Object> params = new HashMap<>();
params.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
// 推送消息如何进行消费的接口回调
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("消费的消息:" + message);
}
};
// 取消消费的一个回调接口(比如在消费的时候队列被删除掉了)
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("消息消费被中断了");
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
8、惰性队列
8.1 使用场景
RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。
当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。
8.2 两种模式
- default:默认模式,在 3.6.0 之前版本无需任何变更
- lazy:惰性队列的模式
- 可以通过调用
channel.queueDeclare
方法的时候在参数中设置 - 可以通过 Policy 的方式设置(优先级更高)
- 可以通过调用
如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的
在队列声明的时候可以通过 x-queue-mode
参数来设置队列的模式,取值为 default 和 lazy
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
在发送 1百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2G,而惰性队列仅占用 1.5MB
1 条评论