mq队列 关闭监听端

队列存活时间
This commit is contained in:
wangw 2025-02-25 17:55:12 +08:00
parent 6b4d9d6758
commit 978f7f1bdb
2 changed files with 46 additions and 46 deletions

View File

@ -1,44 +1,44 @@
package com.czg.mq;
import com.czg.config.RabbitConstants;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author GYJoker
*/
@Slf4j
@Component
public class RabbitmqReceiver {
/**
* 消费者监听绑定队列
* Queue RabbitConfig类的 orderPrintQueue
*/
@RabbitListener(
bindings = @QueueBinding(value = @Queue(value = "#{orderPrintQueue.name}", durable = "true",
arguments = {@Argument(name = "x-message-ttl", value = "180000", type = "java.lang.Long")}),
exchange = @Exchange(value = RabbitConstants.Exchange.CASH_EXCHANGE)),
concurrency = "10"
)
@RabbitHandler
public void receiveOrderPrintQueue(Channel channel, String orderId, Message message) throws IOException {
try {
log.info("订单监听 消息体:{},消息内容:{}", message, orderId);
// 手动确认消息multiple 参数表示是否批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error(e.getMessage(), e);
// 判断是否需要重新入队
boolean requeue = false;
// 拒绝消息requeue true 表示将消息重新放回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, requeue);
}
}
}
//package com.czg.mq;
//
//import com.czg.config.RabbitConstants;
//import com.rabbitmq.client.Channel;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.amqp.core.Message;
//import org.springframework.amqp.rabbit.annotation.*;
//import org.springframework.stereotype.Component;
//
//import java.io.IOException;
//
///**
// * @author GYJoker
// */
//@Slf4j
//@Component
//public class RabbitmqReceiver {
//
// /**
// * 消费者监听绑定队列
// * Queue RabbitConfig类的 orderPrintQueue
// */
// @RabbitListener(
// bindings = @QueueBinding(value = @Queue(value = "#{orderPrintQueue.name}", durable = "true",
// arguments = {@Argument(name = "x-message-ttl", value = "180000", type = "java.lang.Long")}),
// exchange = @Exchange(value = RabbitConstants.Exchange.CASH_EXCHANGE)),
// concurrency = "10"
// )
// @RabbitHandler
// public void receiveOrderPrintQueue(Channel channel, String orderId, Message message) throws IOException {
// try {
// log.info("订单监听 消息体:{},消息内容:{}", message, orderId);
// // 手动确认消息multiple 参数表示是否批量确认
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// } catch (Exception e) {
// log.error(e.getMessage(), e);
// // 判断是否需要重新入队
// boolean requeue = false;
// // 拒绝消息requeue true 表示将消息重新放回队列
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, requeue);
// }
//
// }
//}

View File

@ -25,8 +25,8 @@ public class RabbitConfig {
Map<String, Object> args = new HashMap<>();
// 设置消息过期时间为 180000 毫秒 180
args.put("x-message-ttl", 180000);
// return new Queue(activeProfile + "-" + RabbitConstants.Queue.ORDER_PRINT_QUEUE, true, false, false, args);
return new Queue(activeProfile + "-" + RabbitConstants.Queue.ORDER_PRINT_QUEUE, true, false, false, null);
return new Queue(activeProfile + "-" + RabbitConstants.Queue.ORDER_PRINT_QUEUE, true, false, false, args);
// return new Queue(activeProfile + "-" + RabbitConstants.Queue.ORDER_PRINT_QUEUE, true, false, false, null);
}
@Bean