From 8a6ed19045048743b7b29c8ede9134ea1291e70d Mon Sep 17 00:00:00 2001 From: GYJ <1157756119@qq.com> Date: Thu, 9 Jan 2025 14:43:16 +0800 Subject: [PATCH] nsq11 --- .../java/com/sqx/OneTimeTaskListener.java | 10 ++++++ src/main/java/com/sqx/config/ShiroConfig.java | 1 + .../java/com/sqx/nsq/PublisherService.java | 4 +++ .../java/com/sqx/nsq/SubscriberService.java | 21 ++++++------ src/main/java/com/sqx/nsq/TestController.java | 32 +++++++++++++++++++ .../java/com/sqx/nsq/message/NsqMessage.java | 11 +++++++ .../com/sqx/nsq/subscriber/NsqSubscriber.java | 13 +++----- .../sqx/nsq/subscriber/TestNsqSubscriber.java | 25 +++++++++++++++ 8 files changed, 98 insertions(+), 19 deletions(-) create mode 100644 src/main/java/com/sqx/nsq/TestController.java create mode 100644 src/main/java/com/sqx/nsq/message/NsqMessage.java create mode 100644 src/main/java/com/sqx/nsq/subscriber/TestNsqSubscriber.java diff --git a/src/main/java/com/sqx/OneTimeTaskListener.java b/src/main/java/com/sqx/OneTimeTaskListener.java index d6e7c1cf..dff367ff 100644 --- a/src/main/java/com/sqx/OneTimeTaskListener.java +++ b/src/main/java/com/sqx/OneTimeTaskListener.java @@ -23,6 +23,9 @@ import com.sqx.modules.pay.wuyou.WuyouPay; import com.sqx.modules.sys.dao.SysUserMoneyDao; import com.sqx.modules.sys.entity.SysUserMoney; import com.sqx.modules.sys.service.SysUserService; +import com.sqx.nsq.SubscriberService; +import com.sqx.nsq.subscriber.NsqSubscriber; +import com.sqx.nsq.subscriber.TestNsqSubscriber; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -31,6 +34,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.util.List; @@ -52,6 +56,10 @@ public class OneTimeTaskListener implements ApplicationListener cashOutList = cashOutDao.selectList(new LambdaQueryWrapper() // .eq(CashOut::getState, 2) // .isNull(CashOut::getOutAt) diff --git a/src/main/java/com/sqx/config/ShiroConfig.java b/src/main/java/com/sqx/config/ShiroConfig.java index 375ece0d..0a80d404 100644 --- a/src/main/java/com/sqx/config/ShiroConfig.java +++ b/src/main/java/com/sqx/config/ShiroConfig.java @@ -59,6 +59,7 @@ public class ShiroConfig { filterMap.put("/captcha.jpg", "anon"); filterMap.put("/aaa.txt", "anon"); filterMap.put("/search/**", "anon"); + filterMap.put("/nsq/**", "anon"); filterMap.put("/cashOutAudit/batchCashOutOrder", "anon"); filterMap.put("/**", "oauth2"); diff --git a/src/main/java/com/sqx/nsq/PublisherService.java b/src/main/java/com/sqx/nsq/PublisherService.java index b4b6b6ea..89b5afbc 100644 --- a/src/main/java/com/sqx/nsq/PublisherService.java +++ b/src/main/java/com/sqx/nsq/PublisherService.java @@ -1,11 +1,13 @@ package com.sqx.nsq; import com.sproutsocial.nsq.Publisher; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; /** * @author GYJoker */ +@Slf4j @Service public class PublisherService { private final NsqConfig nsqConfig; @@ -20,6 +22,8 @@ public class PublisherService { public void publishData(String topic, byte[] data) { publisher.publish(topic, data); + + log.info("Published message: {}", new String(data)); } public static void main(String[] args) { diff --git a/src/main/java/com/sqx/nsq/SubscriberService.java b/src/main/java/com/sqx/nsq/SubscriberService.java index 3b6dc81a..f4ea8170 100644 --- a/src/main/java/com/sqx/nsq/SubscriberService.java +++ b/src/main/java/com/sqx/nsq/SubscriberService.java @@ -16,26 +16,25 @@ import org.springframework.stereotype.Component; public class SubscriberService { private final NsqConfig nsqConfig; - private final Subscriber subscriber; + private Subscriber subscriber; public SubscriberService(NsqConfig config) { this.nsqConfig = config; + init(); + } + + public void init() { subscriber = new Subscriber(String.format("%s:%s", nsqConfig.getNsqHost(), nsqConfig.getNsqPort())); + + System.out.println("SubscriberService init"); } - private void startSubscriber(NsqSubscriber nsqSubscriber) { - subscriber.subscribe(nsqSubscriber.getTopic(), nsqSubscriber.getChannel(), (MessageHandler) data -> { - try { - Object o = JSONObject.parseObject(new String(data.getData()), nsqSubscriber.getDataType()); - nsqSubscriber.handleMessage(o); - } catch (Exception e) { - log.error("处理消息失败", e); - } - }); + public void startSubscriber(NsqSubscriber nsqSubscriber) { + subscriber.subscribe(nsqSubscriber.getTopic(), nsqSubscriber.getChannel(), nsqSubscriber); } - public void receiveData(String topic, byte[] data) { + public void handleData2(byte[] data) { System.out.println("Received:" + new String(data)); } diff --git a/src/main/java/com/sqx/nsq/TestController.java b/src/main/java/com/sqx/nsq/TestController.java new file mode 100644 index 00000000..04fa4541 --- /dev/null +++ b/src/main/java/com/sqx/nsq/TestController.java @@ -0,0 +1,32 @@ +package com.sqx.nsq; + +import com.sqx.nsq.subscriber.TestNsqSubscriber; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author GYJoker + */ +@RestController +@RequestMapping("/nsq") +public class TestController { + private final PublisherService publisherService; + private final SubscriberService subscriberService; + + public TestController(PublisherService publisherService, SubscriberService subscriberService) { + this.publisherService = publisherService; + this.subscriberService = subscriberService; + + init(); + } + + public void init() { + subscriberService.startSubscriber(new TestNsqSubscriber()); + } + + @RequestMapping("/sender") + public String test(String msg) { + publisherService.publishData("test", msg.getBytes()); + return "test"; + } +} diff --git a/src/main/java/com/sqx/nsq/message/NsqMessage.java b/src/main/java/com/sqx/nsq/message/NsqMessage.java new file mode 100644 index 00000000..f1c97474 --- /dev/null +++ b/src/main/java/com/sqx/nsq/message/NsqMessage.java @@ -0,0 +1,11 @@ +package com.sqx.nsq.message; + +import lombok.Data; + +/** + * @author GYJoker + */ +@Data +public class NsqMessage { + private Long id; +} diff --git a/src/main/java/com/sqx/nsq/subscriber/NsqSubscriber.java b/src/main/java/com/sqx/nsq/subscriber/NsqSubscriber.java index 822cdacd..e17e8632 100644 --- a/src/main/java/com/sqx/nsq/subscriber/NsqSubscriber.java +++ b/src/main/java/com/sqx/nsq/subscriber/NsqSubscriber.java @@ -1,9 +1,13 @@ package com.sqx.nsq.subscriber; +import com.sproutsocial.nsq.MessageDataHandler; +import org.apache.logging.log4j.message.Message; + /** * @author GYJoker */ -public abstract class NsqSubscriber { +public abstract class NsqSubscriber implements MessageDataHandler { + /** * 获取订阅的主题 */ @@ -13,11 +17,4 @@ public abstract class NsqSubscriber { * 获取订阅的频道 */ public abstract String getChannel(); - - public abstract Class getDataType(); - - /** - * 处理接收到的消息 - */ - public abstract void handleMessage(T message); } diff --git a/src/main/java/com/sqx/nsq/subscriber/TestNsqSubscriber.java b/src/main/java/com/sqx/nsq/subscriber/TestNsqSubscriber.java new file mode 100644 index 00000000..17a2dee0 --- /dev/null +++ b/src/main/java/com/sqx/nsq/subscriber/TestNsqSubscriber.java @@ -0,0 +1,25 @@ +package com.sqx.nsq.subscriber; + +import lombok.extern.slf4j.Slf4j; +import org.apache.logging.log4j.message.Message; + +/** + * @author GYJoker + */ +@Slf4j +public class TestNsqSubscriber extends NsqSubscriber { + @Override + public String getTopic() { + return "test"; + } + + @Override + public String getChannel() { + return "channel"; + } + + @Override + public void accept(byte[] data) { + System.out.println("Received111:" + new String(data)); + } +}