This commit is contained in:
GYJ 2025-01-09 12:32:48 +08:00
parent 239e593c61
commit 1799b5185a
6 changed files with 122 additions and 23 deletions

View File

@ -0,0 +1,18 @@
package com.sqx.nsq;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
* @author GYJoker
*/
@Configuration
@Data
public class NsqConfig {
@Value("${nsq.host}")
private String nsqHost;
@Value("${nsq.port}")
private String nsqPort;
}

View File

@ -1,20 +0,0 @@
package com.sqx.nsq;
import com.sproutsocial.nsq.Subscriber;
/**
* @author GYJoker
*/
public class PubExample {
public static void handleData(byte[] data) {
System.out.println("Received:" + new String(data));
}
public static void main(String[] args) {
Subscriber subscriber = new Subscriber("47.122.26.160");
subscriber.subscribe("test", "channel1", PubExample::handleData);
}
}

View File

@ -0,0 +1,29 @@
package com.sqx.nsq;
import com.sproutsocial.nsq.Publisher;
import org.springframework.stereotype.Service;
/**
* @author GYJoker
*/
@Service
public class PublisherService {
private final NsqConfig nsqConfig;
private final Publisher publisher;
public PublisherService(NsqConfig config) {
this.nsqConfig = config;
this.publisher = new Publisher(String.format("%s:%s", nsqConfig.getNsqHost(), nsqConfig.getNsqPort()));
}
public void publishData(String topic, byte[] data) {
publisher.publish(topic, data);
}
public static void main(String[] args) {
Publisher publisher = new Publisher("47.122.26.160:4150");
publisher.publish("test", new byte[]{'a', 'A', 'C'});
}
}

View File

@ -0,0 +1,50 @@
package com.sqx.nsq;
import com.alibaba.fastjson.JSONObject;
import com.sproutsocial.nsq.MessageHandler;
import com.sproutsocial.nsq.Subscriber;
import com.sqx.nsq.subscriber.NsqSubscriber;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author GYJoker
*/
@Component
@Slf4j
public class SubscriberService {
private final NsqConfig nsqConfig;
private final Subscriber subscriber;
public SubscriberService(NsqConfig config) {
this.nsqConfig = config;
subscriber = new Subscriber(String.format("%s:%s", nsqConfig.getNsqHost(), nsqConfig.getNsqPort()));
}
private void startSubscriber(NsqSubscriber nsqSubscriber) {
subscriber.subscribe(nsqSubscriber.getTopic(), nsqSubscriber.getChannel(), (MessageHandler) data -> {
try {
Object o = JSONObject.parseObject(new String(data.getData()), nsqSubscriber.getDataType());
nsqSubscriber.handleMessage(o);
} catch (Exception e) {
log.error("处理消息失败", e);
}
});
}
public void receiveData(String topic, byte[] data) {
System.out.println("Received:" + new String(data));
}
public static void handleData(byte[] data) {
System.out.println("Received:" + new String(data));
}
public static void main(String[] args) {
Subscriber subscriber = new Subscriber("47.122.26.160");
subscriber.subscribe("test", "channel1", SubscriberService::handleData);
}
}

View File

@ -0,0 +1,23 @@
package com.sqx.nsq.subscriber;
/**
* @author GYJoker
*/
public abstract class NsqSubscriber<T> {
/**
* 获取订阅的主题
*/
public abstract String getTopic();
/**
* 获取订阅的频道
*/
public abstract String getChannel();
public abstract Class<T> getDataType();
/**
* 处理接收到的消息
*/
public abstract void handleMessage(T message);
}

View File

@ -10,9 +10,8 @@ server:
context-path: /czg
nsq:
lookup-addresses: 127.0.0.1:4161 # 这里填写你的NSQ lookupd服务地址和端口示例用本地地址
topic: test_topic # 定义要使用的NSQ主题
channel: test_channel # 定义NSQ通道
host: 47.122.26.160
port: 4150
spring:
main: