爱玩科技网
您的当前位置:首页RabbitMQ死信队列

RabbitMQ死信队列

来源:爱玩科技网
RabbitMQ死信队列

⼀、死信队列

死信,顾名思义就是⽆法被消费的消息,⼀般来说 Producer 将消息投递到 broker 或者直接丢到 queue 中,Consumer 从 Queue 中取出消息进⾏消费,但是某些时候由于特定的原因导致 Queue 中的某些消息⽆法被消费,这样的消息如果没有后续的处理就变成了死信,有死信⾃然就有了死信队列

死信队列有其特殊的应⽤场景,例如⽤户在商城下单成功并点击去⽀付的时候,如果在指定的时间内未⽀付,那么就可以将该下单消息投递到死信队列中,⾄于后续怎么处理死信队列需要结合具体的应⽤场景

⼆、死信的来源

通常死信的来源有下⾯⼏种⽅式1、消息 TTL (Time To Live) 过期

2、队列达到了最⼤长度,⽆法再添加消息到 MQ 中了

3、消息被拒,并且没有重新⼊队(basic.reject || basic.Nack) && (requeue = false)

三、消息 TTL 过期

1、Consumer01

public class Consumer01 {

private static final String NORMAL_EXCHANGE = \"normal_exchange\"; private static final String NORMAL_QUEUE = \"normal_queue\"; private static final String NORMAL_ROUTING_KEY = \"normal\"; private static final String DEAD_EXCHANGE = \"dead_exchange\"; private static final String DEAD_ROUTING_KEY = \"dead\"; public static void main(String[] args) throws Exception { // ⾃定义⼯具类获取信道

Channel channel = RabbitmqUtils.getChannel();

// 声明正常消息的交换机(类型为 direct)

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

// 正常队列关联死信交换机(正常队列出现了故障之后,消息就会通过死信交换机传递到死信队列中) HashMap arguments = new HashMap<>(); arguments.put(\"x-dead-letter-exchange\

arguments.put(\"x-dead-letter-routing-key\

channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments); // 正常消息交换机绑定正常消息队列

channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments); // 消息成功之后的回调

DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { String msg = new String(message.getBody()); System.out.println(msg); };

// 取消消费者的回调

CancelCallback cancelCallback = consumerTag -> { System.out.println(\"取消消费者时的回调接⼝\"); };

// 消费者消费消息

channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback); System.out.println(\"Consumer01 开始消费消息\"); }}

2、Consumer02

public class Consumer02 {

private static final String DEAD_EXCHANGE = \"dead_exchange\"; private static final String DEAD_QUEUE = \"dead_queue\"; private static final String DEAD_ROUTING_KEY = \"dead\"; public static void main(String[] args) throws Exception { // ⾃定义⼯具类获取信道对象

Channel channel = RabbitmqUtils.getChannel();

// 声明死信交换机(topic 类型)

channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 声明死信队列

channel.queueDeclare(DEAD_QUEUE, true, false, false, null); // 死信交换机绑定死信队列

channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY); // 消息成功之后的回调

DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { String msg = new String(message.getBody()); System.out.println(msg); };

// 取消消费者的回调

CancelCallback cancelCallback = consumerTag -> { System.out.println(\"取消消费者时的回调接⼝\"); };

// 消费者消费消息

channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback); System.out.println(\"Consumer02 开始消费消息\"); }}

3、Producer

public class Producer {

private static final String NORMAL_EXCHANGE = \"normal_exchange\"; private static final String NORMAL_ROUTING_KEY = \"normal\"; public static void main(String[] args) throws Exception { // ⾃定义⼯具类获取信道

Channel channel = RabbitmqUtils.getChannel();

// 声明⼀个 direct 类型的交换机

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

// 消息发送 10 s 之后,如果没有消费者进⾏消费,那么该消息就称为死信,它就会进⼊死信队列中

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration(\"10000\").build();

// 待发送的消息

String message = \"我是⼀只机智的⼩⽑⽑,很可爱,很机智\"; for (int i = 1; i < 11; i++) {

channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, properties, (message + i).getBytes(StandardCharsets.UTF_8)); }

System.out.println(\"Producer send message successfully...\"); }}

4、测试过程及结果

启动 Consumer01 将普通交换机、普通队列注册到 RabbitMQ 上,启动 Consumer02 将死信交换机、死信队列注册到 RabbitMQ 上

然后为了演⽰消息超时之后可以进⼊死信队列,我们关闭 Consumer01,模拟其接收不到消息,为了不让死信消息被消费者消费掉,我们关闭 Consumer02,然后启动⽣产者 Producer

10 s 之后普通队列⾥的消息进⼊死信队列中

接着启动消费者 Consumer02 消费掉死信队列中的消息

四、队列达到最⼤长度

1、Consumer01

public class Consumer01 {

private static final String NORMAL_EXCHANGE = \"normal_exchange\"; private static final String NORMAL_QUEUE = \"normal_queue\"; private static final String NORMAL_ROUTING_KEY = \"normal\"; private static final String DEAD_EXCHANGE = \"dead_exchange\"; private static final String DEAD_ROUTING_KEY = \"dead\"; public static void main(String[] args) throws Exception { // ⾃定义⼯具类获取信道

Channel channel = RabbitmqUtils.getChannel();

// 声明正常消息的交换机(类型为 direct)

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

// 正常队列关联死信交换机(正常队列出现了故障之后,消息就会通过死信交换机传递到死信队列中) HashMap arguments = new HashMap<>(); arguments.put(\"x-dead-letter-exchange\

arguments.put(\"x-dead-letter-routing-key\ // 设置正常队列的最⼤长度

arguments.put(\"x-max-length\

channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments); // 正常消息交换机绑定正常消息队列

channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments); // 消息成功之后的回调

DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { String msg = new String(message.getBody());

System.out.println(msg); };

// 取消消费者的回调

CancelCallback cancelCallback = consumerTag -> { System.out.println(\"取消消费者时的回调接⼝\"); };

// 消费者消费消息

channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback); System.out.println(\"Consumer01 开始消费消息\"); }}

2、Consumer02 代码不变3、Producer

public class Producer {

private static final String NORMAL_EXCHANGE = \"normal_exchange\"; private static final String NORMAL_ROUTING_KEY = \"normal\"; public static void main(String[] args) throws Exception { // ⾃定义⼯具类获取信道

Channel channel = RabbitmqUtils.getChannel();

// 声明⼀个 direct 类型的交换机

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

// 待发送的消息

String message = \"我是⼀只机智的⼩⽑⽑,很可爱,很机智\"; for (int i = 1; i < 11; i++) {

channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes(StandardCharsets.UTF_8)); }

System.out.println(\"Producer send message successfully...\"); }}

4、测试过程及结果

删除掉原先的正常交换机、正常队列、死信交换机、死信队列,然后按照上⾯的⽅式启动 Consumer01、Consumer02 重新注册正常交换机、正常队列、死信交换机、死信队列,接着关闭 Consumer01、Consumer02,最后启动 Producer 发送消息(如果 Consumer01 是⼀直打开的情况下,正常队列的消息就不会堆积到 6 条)

启动 Consumer01、Consumer02,发现 Consumer01 消费了 6 条消息,Consumer02 消费了四条消息

五、消息被拒

1、Consumer01

public class Consumer01 {

private static final String NORMAL_EXCHANGE = \"normal_exchange\"; private static final String NORMAL_QUEUE = \"normal_queue\"; private static final String NORMAL_ROUTING_KEY = \"normal\"; private static final String DEAD_EXCHANGE = \"dead_exchange\"; private static final String DEAD_ROUTING_KEY = \"dead\"; public static void main(String[] args) throws Exception { // ⾃定义⼯具类获取信道

Channel channel = RabbitmqUtils.getChannel();

// 声明正常消息的交换机(类型为 direct)

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

// 正常队列关联死信交换机(正常队列出现了故障之后,消息就会通过死信交换机传递到死信队列中) HashMap arguments = new HashMap<>(); arguments.put(\"x-dead-letter-exchange\

arguments.put(\"x-dead-letter-routing-key\

channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments); // 正常消息交换机绑定正常消息队列

channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments); // 消息成功之后的回调

DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { String msg = new String(message.getBody()); if (msg.contains(\"很机智4\")) {

System.out.println(\"Consumer01 接收到消息\" + msg + \"并拒绝签收该消息\");

//requeue 设置为 false 代表拒绝重新⼊队 该队列如果配置了死信交换机将发送到死信队列中 channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } else {

System.out.println(\"Consumer01 接收到消息\" + msg);

channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } };

// 取消消费者的回调

CancelCallback cancelCallback = consumerTag -> {

System.out.println(\"取消消费者时的回调接⼝\"); };

// 消费者消费消息(⼀定要开启⼿动应答,如果你开启了⾃动应答,根本不存在拒绝消息的情况) channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback); System.out.println(\"Consumer01 开始消费消息\"); }}

2、Consumer02

public class Consumer02 {

private static final String DEAD_EXCHANGE = \"dead_exchange\"; private static final String DEAD_QUEUE = \"dead_queue\"; private static final String DEAD_ROUTING_KEY = \"dead\"; public static void main(String[] args) throws Exception { // ⾃定义⼯具类获取信道对象

Channel channel = RabbitmqUtils.getChannel();

// 声明死信交换机(topic 类型)

channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 声明死信队列

channel.queueDeclare(DEAD_QUEUE, true, false, false, null); // 死信交换机绑定死信队列

channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY); // 消息成功之后的回调

DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { String msg = new String(message.getBody());

channel.basicAck(message.getEnvelope().getDeliveryTag(),false); System.out.println(msg); };

// 取消消费者的回调

CancelCallback cancelCallback = consumerTag -> { System.out.println(\"取消消费者时的回调接⼝\"); };

// 消费者消费消息

channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback); System.out.println(\"Consumer02 开始消费消息\"); }}

3、Producer

public class Producer {

private static final String NORMAL_EXCHANGE = \"normal_exchange\"; private static final String NORMAL_ROUTING_KEY = \"normal\"; public static void main(String[] args) throws Exception { // ⾃定义⼯具类获取信道

Channel channel = RabbitmqUtils.getChannel();

// 声明⼀个 direct 类型的交换机

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

// 待发送的消息

String message = \"我是⼀只机智的⼩⽑⽑,很可爱,很机智\"; for (int i = 1; i < 11; i++) {

channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes(StandardCharsets.UTF_8)); }

System.out.println(\"Producer send message successfully...\"); }}

4、测试过程及结果

删除掉原先的正常交换机、正常队列、死信交换机、死信队列,然后重新启动 Consumer01、Consumer02 注册正常交换机、正常队列、死信交换机、死信队列,接着关闭 Consumer02,启动 Producer 发送消息这⾥有⼏点需要注意⼀下

1、因为只有被拒绝的消息才能进⼊死信队列中,所以 Consumer01 不能关闭,为了能看到死信队列⾥的消息,不让它被消费掉,所以需要关闭 Consumer022、Consumer01 ⼀定要开启⼿动确认,因为⾃动确认的场景下根本不存在消息被拒绝的情况打开死信队列查看被拒绝的消息启动 Consumer02 消费死信消息

因篇幅问题不能全部显示,请点此查看更多更全内容