channel 配置 初版

This commit is contained in:
wangw 2025-01-09 17:08:26 +08:00
parent fc01fdca2b
commit e5ebc3798f
9 changed files with 125 additions and 0 deletions

View File

@ -53,6 +53,11 @@
</properties>
<dependencies>
<dependency>
<groupId>com.github.brainlag</groupId>
<artifactId>nsq-client</artifactId>
<version>1.0.0.RC4</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>dytnsapi20200217</artifactId>

View File

@ -0,0 +1,29 @@
package com.sqx.nsqChannel;
import com.sqx.nsqChannel.channels.Print2MessageHandlerAdapter;
import com.sqx.nsqChannel.channels.PrintMessageHandlerAdapter;
import com.sqx.nsqChannel.config.NSQConsumers;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service
@Slf4j
public class NsqConfig {
/**
* 端口号
*/
@Value("${nsq.port}")
private int port;
@Value("${nsq.address}")
private String address;
@PostConstruct
public void startNSQChannel() {
new NSQConsumers(address, port, "test", "ch1", new Print2MessageHandlerAdapter());
new NSQConsumers(address, port, "test", "ch2", new PrintMessageHandlerAdapter());
}
}

View File

@ -0,0 +1,13 @@
package com.sqx.nsqChannel.channels;
import com.sqx.nsqChannel.config.NSQMessageHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Print2MessageHandlerAdapter extends NSQMessageHandlerAdapter {
@Override
public void handleMessage(byte[] message) {
System.out.println("Print2MessageHandlerAdapter message: " + new String(message));
}
}

View File

@ -0,0 +1,15 @@
package com.sqx.nsqChannel.channels;
import com.sqx.nsqChannel.config.NSQMessageHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class PrintMessageHandlerAdapter extends NSQMessageHandlerAdapter {
@Override
public void handleMessage(byte[] message) {
System.out.println("PrintMessageHandlerAdapter message: " + new String(message));
}
}

View File

@ -0,0 +1,45 @@
package com.sqx.nsqChannel.config;
import com.github.brainlag.nsq.NSQConsumer;
import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
import com.github.brainlag.nsq.lookup.NSQLookup;
public class NSQConsumers {
private final String nsqLookupAddress;
private final int nsqLookupPort;
private final String topics;
private final String channelName;
private final NSQMessageHandlerAdapter handlerAdapter;
private NSQConsumer consumer;
public NSQConsumers(String nsqLookupAddress, int nsqLookupPort,String topics, String channelName, NSQMessageHandlerAdapter handlerAdapter) {
this.nsqLookupAddress = nsqLookupAddress;
this.nsqLookupPort = nsqLookupPort;
this.topics = topics;
this.channelName = channelName;
this.handlerAdapter = handlerAdapter;
start();
}
public void start() {
try {
NSQLookup lookup = new DefaultNSQLookup();
lookup.addLookupAddress(nsqLookupAddress, nsqLookupPort);
consumer = new NSQConsumer(lookup, topics, channelName, (message) -> {
handlerAdapter.handleMessage(message.getMessage());
message.finished();
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public void stop() {
if (consumer != null) {
consumer.shutdown();
}
}
}

View File

@ -0,0 +1,6 @@
package com.sqx.nsqChannel.config;
public abstract class NSQMessageHandlerAdapter {
public abstract void handleMessage(byte[] message);
}

View File

@ -8,6 +8,10 @@ pay:
server:
port: 8100
nsq:
address: 47.122.26.160
port: 4161
# 数据源的一些配置
driver-class-name: com.mysql.cj.jdbc.Driver
# 最小空闲连接默认值10小于0或大于maximum-pool-size都会重置为maximum-pool-size

View File

@ -9,6 +9,10 @@ pay:
server:
port: 8200
nsq:
address: 47.122.26.160
port: 4161
# 数据源的一些配置
driver-class-name: com.mysql.cj.jdbc.Driver
# 最小空闲连接默认值10小于0或大于maximum-pool-size都会重置为maximum-pool-size

View File

@ -8,6 +8,10 @@ pay:
server:
port: 8100
nsq:
address: 127.0.0.1
port: 4161
# 数据源的一些配置
driver-class-name: com.mysql.cj.jdbc.Driver
# 最小空闲连接默认值10小于0或大于maximum-pool-size都会重置为maximum-pool-size