From 1799b5185abf6dbcb68d155c1f2ada795864ff71 Mon Sep 17 00:00:00 2001 From: GYJ <1157756119@qq.com> Date: Thu, 9 Jan 2025 12:32:48 +0800 Subject: [PATCH] nsq2 --- src/main/java/com/sqx/nsq/NsqConfig.java | 18 +++++++ src/main/java/com/sqx/nsq/PubExample.java | 20 -------- .../java/com/sqx/nsq/PublisherService.java | 29 +++++++++++ .../java/com/sqx/nsq/SubscriberService.java | 50 +++++++++++++++++++ .../com/sqx/nsq/subscriber/NsqSubscriber.java | 23 +++++++++ src/main/resources/application.yml | 5 +- 6 files changed, 122 insertions(+), 23 deletions(-) create mode 100644 src/main/java/com/sqx/nsq/NsqConfig.java delete mode 100644 src/main/java/com/sqx/nsq/PubExample.java create mode 100644 src/main/java/com/sqx/nsq/PublisherService.java create mode 100644 src/main/java/com/sqx/nsq/SubscriberService.java create mode 100644 src/main/java/com/sqx/nsq/subscriber/NsqSubscriber.java diff --git a/src/main/java/com/sqx/nsq/NsqConfig.java b/src/main/java/com/sqx/nsq/NsqConfig.java new file mode 100644 index 00000000..9df85427 --- /dev/null +++ b/src/main/java/com/sqx/nsq/NsqConfig.java @@ -0,0 +1,18 @@ +package com.sqx.nsq; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +/** + * @author GYJoker + */ +@Configuration +@Data +public class NsqConfig { + @Value("${nsq.host}") + private String nsqHost; + + @Value("${nsq.port}") + private String nsqPort; +} diff --git a/src/main/java/com/sqx/nsq/PubExample.java b/src/main/java/com/sqx/nsq/PubExample.java deleted file mode 100644 index 9ce6aca9..00000000 --- a/src/main/java/com/sqx/nsq/PubExample.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.sqx.nsq; - -import com.sproutsocial.nsq.Subscriber; - -/** - * @author GYJoker - */ -public class PubExample { - - public static void handleData(byte[] data) { - System.out.println("Received:" + new String(data)); - } - - public static void main(String[] args) { - Subscriber subscriber = new Subscriber("47.122.26.160"); - subscriber.subscribe("test", "channel1", PubExample::handleData); - - - } -} diff --git a/src/main/java/com/sqx/nsq/PublisherService.java b/src/main/java/com/sqx/nsq/PublisherService.java new file mode 100644 index 00000000..b4b6b6ea --- /dev/null +++ b/src/main/java/com/sqx/nsq/PublisherService.java @@ -0,0 +1,29 @@ +package com.sqx.nsq; + +import com.sproutsocial.nsq.Publisher; +import org.springframework.stereotype.Service; + +/** + * @author GYJoker + */ +@Service +public class PublisherService { + private final NsqConfig nsqConfig; + + private final Publisher publisher; + + public PublisherService(NsqConfig config) { + this.nsqConfig = config; + + this.publisher = new Publisher(String.format("%s:%s", nsqConfig.getNsqHost(), nsqConfig.getNsqPort())); + } + + public void publishData(String topic, byte[] data) { + publisher.publish(topic, data); + } + + public static void main(String[] args) { + Publisher publisher = new Publisher("47.122.26.160:4150"); + publisher.publish("test", new byte[]{'a', 'A', 'C'}); + } +} diff --git a/src/main/java/com/sqx/nsq/SubscriberService.java b/src/main/java/com/sqx/nsq/SubscriberService.java new file mode 100644 index 00000000..3b6dc81a --- /dev/null +++ b/src/main/java/com/sqx/nsq/SubscriberService.java @@ -0,0 +1,50 @@ +package com.sqx.nsq; + + +import com.alibaba.fastjson.JSONObject; +import com.sproutsocial.nsq.MessageHandler; +import com.sproutsocial.nsq.Subscriber; +import com.sqx.nsq.subscriber.NsqSubscriber; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author GYJoker + */ +@Component +@Slf4j +public class SubscriberService { + private final NsqConfig nsqConfig; + + private final Subscriber subscriber; + + public SubscriberService(NsqConfig config) { + this.nsqConfig = config; + + subscriber = new Subscriber(String.format("%s:%s", nsqConfig.getNsqHost(), nsqConfig.getNsqPort())); + } + + private void startSubscriber(NsqSubscriber nsqSubscriber) { + subscriber.subscribe(nsqSubscriber.getTopic(), nsqSubscriber.getChannel(), (MessageHandler) data -> { + try { + Object o = JSONObject.parseObject(new String(data.getData()), nsqSubscriber.getDataType()); + nsqSubscriber.handleMessage(o); + } catch (Exception e) { + log.error("处理消息失败", e); + } + }); + } + + public void receiveData(String topic, byte[] data) { + System.out.println("Received:" + new String(data)); + } + + public static void handleData(byte[] data) { + System.out.println("Received:" + new String(data)); + } + + public static void main(String[] args) { + Subscriber subscriber = new Subscriber("47.122.26.160"); + subscriber.subscribe("test", "channel1", SubscriberService::handleData); + } +} diff --git a/src/main/java/com/sqx/nsq/subscriber/NsqSubscriber.java b/src/main/java/com/sqx/nsq/subscriber/NsqSubscriber.java new file mode 100644 index 00000000..822cdacd --- /dev/null +++ b/src/main/java/com/sqx/nsq/subscriber/NsqSubscriber.java @@ -0,0 +1,23 @@ +package com.sqx.nsq.subscriber; + +/** + * @author GYJoker + */ +public abstract class NsqSubscriber { + /** + * 获取订阅的主题 + */ + public abstract String getTopic(); + + /** + * 获取订阅的频道 + */ + public abstract String getChannel(); + + public abstract Class getDataType(); + + /** + * 处理接收到的消息 + */ + public abstract void handleMessage(T message); +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 671c93c2..f0310e1b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -10,9 +10,8 @@ server: context-path: /czg nsq: - lookup-addresses: 127.0.0.1:4161 # 这里填写你的NSQ lookupd服务地址和端口,示例用本地地址 - topic: test_topic # 定义要使用的NSQ主题 - channel: test_channel # 定义NSQ通道 + host: 47.122.26.160 + port: 4150 spring: main: