diff --git a/src/main/java/com/sqx/nsqChannel/NsqConfig.java b/src/main/java/com/sqx/nsqChannel/NsqConfig.java index cfcea39e..96f84694 100644 --- a/src/main/java/com/sqx/nsqChannel/NsqConfig.java +++ b/src/main/java/com/sqx/nsqChannel/NsqConfig.java @@ -9,21 +9,21 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; -@Service +//@Service @Slf4j public class NsqConfig { -// /** -// * 端口号 -// */ -// @Value("${nsq.port}") -// private int port; -// -// @Value("${nsq.address}") -// private String address; -// + /** + * 端口号 + */ + @Value("${nsq.port.consumer}") + private int port; + + @Value("${nsq.address}") + private String address; + // @PostConstruct -// public void startNSQChannel() { -// new NSQConsumers(address, port, "test", "ch1", new Print2MessageHandlerAdapter()); -// new NSQConsumers(address, port, "test", "ch2", new PrintMessageHandlerAdapter()); -// } + public void startNSQChannel() { + new NSQConsumers(address, port, "test", "ch1", new Print2MessageHandlerAdapter()); + new NSQConsumers(address, port, "test", "ch2", new PrintMessageHandlerAdapter()); + } } diff --git a/src/main/java/com/sqx/nsqChannel/config/NsqProduce.java b/src/main/java/com/sqx/nsqChannel/config/NsqProduce.java new file mode 100644 index 00000000..fbd68b8a --- /dev/null +++ b/src/main/java/com/sqx/nsqChannel/config/NsqProduce.java @@ -0,0 +1,53 @@ +package com.sqx.nsqChannel.config; + +import com.github.brainlag.nsq.NSQProducer; +import com.github.brainlag.nsq.exceptions.NSQException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeoutException; + +@Slf4j +//@Component +public class NsqProduce { + @Value("${nsq.port.produce}") + private int port; + + @Value("${nsq.address}") + private String address; + + + private NSQProducer producer; + private static boolean is_start = false; + private static final byte[] LOCK = new byte[0]; + + public NsqProduce() { + producer = new NSQProducer(); + producer.addAddress(address, port).start(); + is_start = true; + } + + public NSQProducer getProducer() { + if (!is_start) { + log.info("========================NSQProduce no start===================="); + } + if (producer == null) { + synchronized (LOCK) { + if (producer == null) { + producer = new NSQProducer(); + producer.addAddress(address, port).start(); + } + } + } + return producer; + } + + public void sendMsgToTestTopic(String topic,String msg) { + try { + this.getProducer().produce(topic, msg.getBytes()); + } catch (NSQException | TimeoutException e) { + log.error(e.getMessage()); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/sqx/nsqChannel/enums/TopicEnums.java b/src/main/java/com/sqx/nsqChannel/enums/TopicEnums.java new file mode 100644 index 00000000..177ebb9a --- /dev/null +++ b/src/main/java/com/sqx/nsqChannel/enums/TopicEnums.java @@ -0,0 +1,4 @@ +package com.sqx.nsqChannel.enums; + +public enum TopicEnums { +} diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index a98e79b4..c595b9b9 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -10,7 +10,9 @@ server: nsq: address: 47.122.26.160 - port: 4161 + port: + produce: 4150 + consumer: 4161 # 数据源的一些配置 driver-class-name: com.mysql.cj.jdbc.Driver diff --git a/src/main/resources/application-pay.yml b/src/main/resources/application-pay.yml index f1896cb2..2dc7a0d6 100644 --- a/src/main/resources/application-pay.yml +++ b/src/main/resources/application-pay.yml @@ -11,7 +11,9 @@ server: nsq: address: 47.122.26.160 - port: 4161 + port: + produce: 4150 + consumer: 4161 # 数据源的一些配置 driver-class-name: com.mysql.cj.jdbc.Driver diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index e45f1af2..43a1131b 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -10,7 +10,9 @@ server: nsq: address: 127.0.0.1 - port: 4161 + port: + produce: 4150 + consumer: 4161 # 数据源的一些配置 driver-class-name: com.mysql.cj.jdbc.Driver