nsq消息队列 初始2

This commit is contained in:
wangw 2025-01-10 10:55:03 +08:00
parent 28112104c4
commit 95fd316f4d
6 changed files with 80 additions and 17 deletions

View File

@ -9,21 +9,21 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service
//@Service
@Slf4j
public class NsqConfig {
// /**
// * 端口号
// */
// @Value("${nsq.port}")
// private int port;
//
// @Value("${nsq.address}")
// private String address;
//
/**
* 端口号
*/
@Value("${nsq.port.consumer}")
private int port;
@Value("${nsq.address}")
private String address;
// @PostConstruct
// public void startNSQChannel() {
// new NSQConsumers(address, port, "test", "ch1", new Print2MessageHandlerAdapter());
// new NSQConsumers(address, port, "test", "ch2", new PrintMessageHandlerAdapter());
// }
public void startNSQChannel() {
new NSQConsumers(address, port, "test", "ch1", new Print2MessageHandlerAdapter());
new NSQConsumers(address, port, "test", "ch2", new PrintMessageHandlerAdapter());
}
}

View File

@ -0,0 +1,53 @@
package com.sqx.nsqChannel.config;
import com.github.brainlag.nsq.NSQProducer;
import com.github.brainlag.nsq.exceptions.NSQException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeoutException;
@Slf4j
//@Component
public class NsqProduce {
@Value("${nsq.port.produce}")
private int port;
@Value("${nsq.address}")
private String address;
private NSQProducer producer;
private static boolean is_start = false;
private static final byte[] LOCK = new byte[0];
public NsqProduce() {
producer = new NSQProducer();
producer.addAddress(address, port).start();
is_start = true;
}
public NSQProducer getProducer() {
if (!is_start) {
log.info("========================NSQProduce no start====================");
}
if (producer == null) {
synchronized (LOCK) {
if (producer == null) {
producer = new NSQProducer();
producer.addAddress(address, port).start();
}
}
}
return producer;
}
public void sendMsgToTestTopic(String topic,String msg) {
try {
this.getProducer().produce(topic, msg.getBytes());
} catch (NSQException | TimeoutException e) {
log.error(e.getMessage());
}
}
}

View File

@ -0,0 +1,4 @@
package com.sqx.nsqChannel.enums;
public enum TopicEnums {
}

View File

@ -10,7 +10,9 @@ server:
nsq:
address: 47.122.26.160
port: 4161
port:
produce: 4150
consumer: 4161
# 数据源的一些配置
driver-class-name: com.mysql.cj.jdbc.Driver

View File

@ -11,7 +11,9 @@ server:
nsq:
address: 47.122.26.160
port: 4161
port:
produce: 4150
consumer: 4161
# 数据源的一些配置
driver-class-name: com.mysql.cj.jdbc.Driver

View File

@ -10,7 +10,9 @@ server:
nsq:
address: 127.0.0.1
port: 4161
port:
produce: 4150
consumer: 4161
# 数据源的一些配置
driver-class-name: com.mysql.cj.jdbc.Driver