This commit is contained in:
GYJ 2025-01-09 14:43:16 +08:00
parent 1799b5185a
commit 8a6ed19045
8 changed files with 98 additions and 19 deletions

View File

@ -23,6 +23,9 @@ import com.sqx.modules.pay.wuyou.WuyouPay;
import com.sqx.modules.sys.dao.SysUserMoneyDao;
import com.sqx.modules.sys.entity.SysUserMoney;
import com.sqx.modules.sys.service.SysUserService;
import com.sqx.nsq.SubscriberService;
import com.sqx.nsq.subscriber.NsqSubscriber;
import com.sqx.nsq.subscriber.TestNsqSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -31,6 +34,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
@ -52,6 +56,10 @@ public class OneTimeTaskListener implements ApplicationListener<ApplicationReady
private Logger logger = LoggerFactory.getLogger(getClass());
private final CashOutDao cashOutDao;
@Resource
private SubscriberService subscriberService;
public OneTimeTaskListener(ApplicationContext applicationContext, UserService userService, TempOrdersTask tempOrdersTask, InviteAchievementService inviteAchievementService, PayDetailsService payDetailsService, PayDetailsDao payDetailsDao, UserMoneyDetailsDao userMoneyDetailsDao, UserMoneyDao userMoneyDao, OrdersService ordersService, WuyouPay wuyouPay, CashOutDao cashOutDao, SysUserMoneyDao sysUserMoneyDao) {
this.applicationContext = applicationContext;
this.userService = userService;
@ -71,6 +79,8 @@ public class OneTimeTaskListener implements ApplicationListener<ApplicationReady
public void onApplicationEvent(ApplicationReadyEvent event) {
logger.info("应用启动完成后执行一次性方法");
subscriberService.startSubscriber(new TestNsqSubscriber());
// List<CashOut> cashOutList = cashOutDao.selectList(new LambdaQueryWrapper<CashOut>()
// .eq(CashOut::getState, 2)
// .isNull(CashOut::getOutAt)

View File

@ -59,6 +59,7 @@ public class ShiroConfig {
filterMap.put("/captcha.jpg", "anon");
filterMap.put("/aaa.txt", "anon");
filterMap.put("/search/**", "anon");
filterMap.put("/nsq/**", "anon");
filterMap.put("/cashOutAudit/batchCashOutOrder", "anon");
filterMap.put("/**", "oauth2");

View File

@ -1,11 +1,13 @@
package com.sqx.nsq;
import com.sproutsocial.nsq.Publisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author GYJoker
*/
@Slf4j
@Service
public class PublisherService {
private final NsqConfig nsqConfig;
@ -20,6 +22,8 @@ public class PublisherService {
public void publishData(String topic, byte[] data) {
publisher.publish(topic, data);
log.info("Published message: {}", new String(data));
}
public static void main(String[] args) {

View File

@ -16,26 +16,25 @@ import org.springframework.stereotype.Component;
public class SubscriberService {
private final NsqConfig nsqConfig;
private final Subscriber subscriber;
private Subscriber subscriber;
public SubscriberService(NsqConfig config) {
this.nsqConfig = config;
init();
}
public void init() {
subscriber = new Subscriber(String.format("%s:%s", nsqConfig.getNsqHost(), nsqConfig.getNsqPort()));
System.out.println("SubscriberService init");
}
private void startSubscriber(NsqSubscriber nsqSubscriber) {
subscriber.subscribe(nsqSubscriber.getTopic(), nsqSubscriber.getChannel(), (MessageHandler) data -> {
try {
Object o = JSONObject.parseObject(new String(data.getData()), nsqSubscriber.getDataType());
nsqSubscriber.handleMessage(o);
} catch (Exception e) {
log.error("处理消息失败", e);
}
});
public void startSubscriber(NsqSubscriber nsqSubscriber) {
subscriber.subscribe(nsqSubscriber.getTopic(), nsqSubscriber.getChannel(), nsqSubscriber);
}
public void receiveData(String topic, byte[] data) {
public void handleData2(byte[] data) {
System.out.println("Received:" + new String(data));
}

View File

@ -0,0 +1,32 @@
package com.sqx.nsq;
import com.sqx.nsq.subscriber.TestNsqSubscriber;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author GYJoker
*/
@RestController
@RequestMapping("/nsq")
public class TestController {
private final PublisherService publisherService;
private final SubscriberService subscriberService;
public TestController(PublisherService publisherService, SubscriberService subscriberService) {
this.publisherService = publisherService;
this.subscriberService = subscriberService;
init();
}
public void init() {
subscriberService.startSubscriber(new TestNsqSubscriber());
}
@RequestMapping("/sender")
public String test(String msg) {
publisherService.publishData("test", msg.getBytes());
return "test";
}
}

View File

@ -0,0 +1,11 @@
package com.sqx.nsq.message;
import lombok.Data;
/**
* @author GYJoker
*/
@Data
public class NsqMessage {
private Long id;
}

View File

@ -1,9 +1,13 @@
package com.sqx.nsq.subscriber;
import com.sproutsocial.nsq.MessageDataHandler;
import org.apache.logging.log4j.message.Message;
/**
* @author GYJoker
*/
public abstract class NsqSubscriber<T> {
public abstract class NsqSubscriber implements MessageDataHandler {
/**
* 获取订阅的主题
*/
@ -13,11 +17,4 @@ public abstract class NsqSubscriber<T> {
* 获取订阅的频道
*/
public abstract String getChannel();
public abstract Class<T> getDataType();
/**
* 处理接收到的消息
*/
public abstract void handleMessage(T message);
}

View File

@ -0,0 +1,25 @@
package com.sqx.nsq.subscriber;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.message.Message;
/**
* @author GYJoker
*/
@Slf4j
public class TestNsqSubscriber extends NsqSubscriber {
@Override
public String getTopic() {
return "test";
}
@Override
public String getChannel() {
return "channel";
}
@Override
public void accept(byte[] data) {
System.out.println("Received111:" + new String(data));
}
}