添加了线程池和消息队列的交换机机制

yun-zuoyi
alwaysfrin 6 years ago
parent e2d315c161
commit 6f4f22dc1c

@ -0,0 +1,24 @@
package cn.estsh.i3plus.core.apiservice.configuration;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
* @Description :
* @Reference :
* @Author : alwaysfrin
* @CreateDate : 2019-01-10 16:53
* @Modify:
**/
@Configuration
public class MqConfig {
@Resource(name="rabbitConnectionFactory")
private ConnectionFactory connectionFactory;
@Autowired
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
}

@ -1,14 +1,20 @@
package cn.estsh.i3plus.core.apiservice.controller;
import cn.estsh.i3plus.core.api.iservice.busi.ISysRoleService;
import cn.estsh.i3plus.core.apiservice.mq.I3CoreQueueConfig;
import cn.estsh.i3plus.core.apiservice.thread.CoreDemoThread;
import cn.estsh.i3plus.platform.common.util.CommonConstWords;
import cn.estsh.i3plus.platform.common.util.PlatformConstWords;
import cn.estsh.i3plus.platform.common.util.QueueConstWords;
import cn.estsh.i3plus.pojo.base.common.Pager;
import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil;
import cn.estsh.i3plus.pojo.platform.bean.SessionUser;
import cn.estsh.i3plus.pojo.platform.bean.SysRole;
import cn.estsh.impp.framework.base.controller.CoreBaseController;
import cn.estsh.impp.framework.boot.auth.AuthUtil;
import cn.estsh.impp.framework.boot.exception.ImppExceptionBuilder;
import cn.estsh.impp.framework.boot.exception.ImppExceptionEnum;
import cn.estsh.impp.framework.boot.thread.ImppThreadPool;
import cn.estsh.impp.framework.boot.util.ImppRedis;
import cn.estsh.impp.framework.boot.util.ResultBean;
import com.rabbitmq.client.Channel;
@ -16,6 +22,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@ -28,6 +35,7 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeoutException;
@ -41,7 +49,7 @@ import java.util.concurrent.TimeoutException;
**/
@RestController
@Api(description="复杂对象服务demo")
@RequestMapping(PlatformConstWords.BASE_URL + "/demo-redis-mq")
@RequestMapping("/demo-redis-mq")
public class DemoRedisMqController extends CoreBaseController{
private static final Logger LOGGER = LoggerFactory.getLogger(DemoRedisMqController.class);
@ -60,6 +68,9 @@ public class DemoRedisMqController extends CoreBaseController{
private RabbitTemplate rabbitTemplate;
@Autowired
private ISysRoleService sysRoleService;
@Autowired
private RabbitListenerEndpointRegistry rabbitRegistry;
@Autowired
@ -99,14 +110,83 @@ public class DemoRedisMqController extends CoreBaseController{
@GetMapping(value="/get-cache")
@ApiOperation(value="缓存",notes="获取数据")
public ResultBean getCache(){
//MachineFactory mf = (MachineFactory) redisCore.getObject("machineFactory");
//System.out.println("1===== " + mf);
System.out.println("2===== " + redisCore.getObject("wms"));
return new ResultBean(true);
}
/*@GetMapping(value="/send-mq")
@GetMapping(value="/exception-demo")
@ApiOperation(value="异常demo",notes="异常demo")
public ResultBean exceptionDemp() throws IOException, TimeoutException {
String data = "true";
if("true".equals(data)) {
throw ImppExceptionBuilder.newInstance()
.setSystemID(CommonEnumUtil.SOFT_TYPE.WMS.getCode())
.setErrorCode(ImppExceptionEnum.BUSINESS_EXCEPTION_DATA_ERROR.getCode())
.setErrorDetail("aaa操作数据出错")
.setErrorSolution("检查数据完整性")
.build();
}
return new ResultBean(true,"操作成功");
}
@GetMapping(value="/international")
@ApiOperation(value="国际化处理",notes="资源配置文件及国际化")
public ResultBean international() {
Locale locale = LocaleContextHolder.getLocale();
System.out.println(locale.getLanguage() + " " + locale.getCountry());
return new ResultBean(true,"操作成功 " + locale.getLanguage() + " " + locale.getCountry());
}
@Resource(name="redisSession")
private ImppRedis redisSession;
@GetMapping(value="/set-redis")
@ApiOperation(value="setredis")
public ResultBean setRedis(String key,String value) {
//redisSession.putObject(key,value);
//redisCore.putObject(key,value);
ResultBean rs = new ResultBean(true,"yes");
rs.setPager(new Pager(100));
redisSession.putObject("rs",rs);
redisCore.putObject("rs",rs);
return new ResultBean(true,"操作成功 " + key + " : " + value);
}
@GetMapping(value="/get-redis")
@ApiOperation(value="getredis")
public ResultBean getRedis(String key) {
Object d1 = redisSession.getObject(key);
System.out.println("redisSession-d1==" + d1);
Object d2 = redisCore.getObject(key);
if(d1 != null) {
SessionUser sessionUser = AuthUtil.getSessionUser();
System.out.println(sessionUser.toString());
}
System.out.println("redisCore-d2==" + d2);
System.out.println(d1 == null?"null":d1.toString());
System.out.println(d2 == null?"null":d2.toString());
return new ResultBean(true,"操作成功 " + d1 + " : " + d2);
}
@GetMapping(value="/test-thread")
@ApiOperation(value="testThread")
public ResultBean testThread(String param) {
ImppThreadPool.getThreadExcutorService().execute(new CoreDemoThread(param));
return new ResultBean(true,"操作成功");
}
@GetMapping(value="/send-mq")
@ApiOperation(value="队列",notes="发送")
public ResultBean sendMQ(String data){
String context = "hello : " + data;
@ -121,9 +201,9 @@ public class DemoRedisMqController extends CoreBaseController{
//this.rabbitTemplate.convertAndSend(I3CoreQueueConfig.DEMO_OBJ_QUEUE, new MachineFactory("111mq","rabbit"));
return new ResultBean(true,"操作成功");
}*/
}
/*@GetMapping(value="/get-mq-handle")
@GetMapping(value="/get-mq-handle")
@ApiOperation(value="手动获取队列",notes="接收队列")
public ResultBean getHandleMQ(){
Object data = this.rabbitTemplate.receiveAndConvert(I3CoreQueueConfig.DEMO_HANDLE_QUEUE);
@ -142,7 +222,7 @@ public class DemoRedisMqController extends CoreBaseController{
}
return new ResultBean(true,"");
}*/
}
/**
*
@ -152,7 +232,7 @@ public class DemoRedisMqController extends CoreBaseController{
* @throws IOException
* @throws TimeoutException
*/
/*@GetMapping(value="/send-returnmsg")
@GetMapping(value="/send-returnmsg")
@ApiOperation(value="队列返回信息",notes="发送")
public ResultBean sendReturnMQ(int type,String data) throws IOException, TimeoutException {
if(type == 1) {
@ -175,7 +255,7 @@ public class DemoRedisMqController extends CoreBaseController{
String returnMsg = (String) rabbitTemplate.convertSendAndReceive(I3CoreQueueConfig.DEMO_RETURN_QUEUE, data);
System.out.println("===返回数据==="+returnMsg);
return new ResultBean(true,"操作成功");
}*/
}
/**
* ackRabbitListener
@ -183,7 +263,7 @@ public class DemoRedisMqController extends CoreBaseController{
* @throws IOException
* @throws TimeoutException
*/
/*@GetMapping(value="/send-ackmsg")
@GetMapping(value="/send-ackmsg")
@ApiOperation(value="ack队列",notes="发送")
public ResultBean sendAckMQ(String data) throws IOException, TimeoutException {
System.out.println("发送ack数据 : " + data);
@ -196,20 +276,20 @@ public class DemoRedisMqController extends CoreBaseController{
@ApiOperation(value="getack队列",notes="接收")
public ResultBean getAckMQ() throws IOException, TimeoutException {
Channel channel = this.rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
System.out.println("1====" + channel);
LOGGER.info("channel = {}",channel);
String str = (String) rabbitTemplate.receiveAndConvert(I3CoreQueueConfig.DEMO_ACK_QUEUE);
System.out.println("2=============="+str);
LOGGER.info("str = {}",str);
Message message = rabbitTemplate.receive(I3CoreQueueConfig.DEMO_ACK_QUEUE);
System.out.println("3=============="+message);
LOGGER.info("message = {}",message);
try {
*//*String data = (String) this.rabbitTemplate.receiveAndConvert(I3CoreQueueConfig.DEMO_ACK_QUEUE);
/*String data = (String) this.rabbitTemplate.receiveAndConvert(I3CoreQueueConfig.DEMO_ACK_QUEUE);
System.out.println("【client】数据接收成功" + data);
if("ack".equals(data)){
System.out.println("【client】数据抛出异常");
throw new RuntimeException("【队列抛出异常】" + data);
}*//*
}*/
String data = (String) this.rabbitTemplate.receiveAndConvert(I3CoreQueueConfig.DEMO_ACK_QUEUE);
System.out.println("【client】数据接收成功" + data);
@ -233,66 +313,44 @@ public class DemoRedisMqController extends CoreBaseController{
}
}
return new ResultBean(true,"操作成功");
}*/
@GetMapping(value="/exception-demo")
@ApiOperation(value="异常demo",notes="异常demo")
public ResultBean exceptionDemp() throws IOException, TimeoutException {
String data = "true";
if("true".equals(data)) {
throw ImppExceptionBuilder.newInstance()
.setSystemID(CommonEnumUtil.SOFT_TYPE.WMS.getCode())
.setErrorCode(ImppExceptionEnum.BUSINESS_EXCEPTION_DATA_ERROR.getCode())
.setErrorDetail("aaa操作数据出错")
.setErrorSolution("检查数据完整性")
.build();
}
return new ResultBean(true,"操作成功");
}
@GetMapping(value="/international")
@ApiOperation(value="国际化处理",notes="资源配置文件及国际化")
public ResultBean international() {
Locale locale = LocaleContextHolder.getLocale();
System.out.println(locale.getLanguage() + " " + locale.getCountry());
return new ResultBean(true,"操作成功 " + locale.getLanguage() + " " + locale.getCountry());
}
@Resource(name="redisSession")
private ImppRedis redisSession;
@GetMapping(value="/set-redis")
@ApiOperation(value="setredis")
public ResultBean setRedis(String key,String value) {
//redisSession.putObject(key,value);
//redisCore.putObject(key,value);
ResultBean rs = new ResultBean(true,"yes");
rs.setPager(new Pager(100));
redisSession.putObject("rs",rs);
redisCore.putObject("rs",rs);
/**
* mq
* @return
* @throws IOException
* @throws TimeoutException
*/
@GetMapping(value="/send-mq-role")
@ApiOperation(value="发送角色信息队列",notes="发送")
public ResultBean sendRoleMQ(int sendCount) throws IOException, TimeoutException {
List<SysRole> roleList = sysRoleService.findSysRoleAll();
LOGGER.info("共有角色数:{}",roleList.size());
//发送信息至通用交换机-》扇形交换
SysRole sysRole;
for(int i=0;i<sendCount;i++) {
sysRole = roleList.get(i);
LOGGER.info("{}次发送角色:{}",i,sysRole);
this.rabbitTemplate.convertAndSend(QueueConstWords.QUEUE_EXCHANGE_COMMON,"", sysRole);
}
return new ResultBean(true,"操作成功 " + key + " : " + value);
return new ResultBean(true,"操作成功");
}
@GetMapping(value="/get-redis")
@ApiOperation(value="getredis")
public ResultBean getRedis(String key) {
Object d1 = redisSession.getObject(key);
System.out.println("redisSession-d1==" + d1);
Object d2 = redisCore.getObject(key);
if(d1 != null) {
SessionUser sessionUser = AuthUtil.getSessionUser();
System.out.println(sessionUser.toString());
}
/**
* mq
* @return
* @throws IOException
* @throws TimeoutException
*/
@GetMapping(value="/send-mq-role-direct")
@ApiOperation(value="发送角色信息队列",notes="发送")
public ResultBean sendRoleMQDirect(String routeKey) throws IOException, TimeoutException {
//通过routekey来控制接收的队列
this.rabbitTemplate.convertAndSend(QueueConstWords.QUEUE_EXCHANGE_DIRECT,routeKey, "发送信息=key" + routeKey);
System.out.println("redisCore-d2==" + d2);
System.out.println(d1 == null?"null":d1.toString());
System.out.println(d2 == null?"null":d2.toString());
return new ResultBean(true,"操作成功 " + d1 + " : " + d2);
return new ResultBean(true,"操作成功");
}
}

@ -0,0 +1,87 @@
package cn.estsh.i3plus.core.apiservice.mq;
import cn.estsh.i3plus.platform.common.util.QueueConstWords;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @Description :
* @Reference :
* @Author : alwaysfrin
* @CreateDate : 2019-01-14 17:01
* @Modify:
**/
@Component
public class CommonQueueReceive {
private static final Logger LOGGER = LoggerFactory.getLogger(CommonQueueReceive.class);
/**
*
* @param channel
* @param message
*/
@RabbitListener(bindings ={
//绑定队列
@QueueBinding(value = @Queue(value = QueueConstWords.CORE_QUEUE_PREFIX + QueueConstWords.QUEUE_COMMON),
//绑定交换
exchange = @Exchange(type=ExchangeTypes.FANOUT,value = QueueConstWords.QUEUE_EXCHANGE_COMMON))
})
public void commonQueue(Object object, Channel channel, Message message){
try {
LOGGER.info("【{}】数据接收成功:{}",QueueConstWords.CORE_QUEUE_PREFIX + QueueConstWords.QUEUE_COMMON,object);
//信息已处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
LOGGER.error("【{}】处理出错,丢弃:{}",QueueConstWords.CORE_QUEUE_PREFIX + QueueConstWords.QUEUE_COMMON,e.getMessage(),e);
//丢弃这条消息
try {
// 未成功处理,重新发送
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
/**
*
* @param channel
* @param message
*/
@RabbitListener(bindings ={
//绑定队列
@QueueBinding(value = @Queue(value = QueueConstWords.CORE_QUEUE_PREFIX + QueueConstWords.DIRECT_EXCHANGE_KEY_ROLE),
//绑定交换
exchange = @Exchange(type=ExchangeTypes.DIRECT,value = QueueConstWords.QUEUE_EXCHANGE_DIRECT),
key = QueueConstWords.CORE_QUEUE_PREFIX + QueueConstWords.DIRECT_EXCHANGE_KEY_ROLE)
})
public void directQueue(Object object, Channel channel, Message message){
try {
LOGGER.info("【{}】数据接收成功:{}",QueueConstWords.CORE_QUEUE_PREFIX + QueueConstWords.QUEUE_COMMON,object);
//信息已处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
LOGGER.error("【{}】处理出错,丢弃:{}",QueueConstWords.CORE_QUEUE_PREFIX + QueueConstWords.QUEUE_COMMON,e.getMessage(),e);
//丢弃这条消息
try {
// 未成功处理,重新发送
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}

@ -1,8 +1,9 @@
package cn.estsh.i3plus.core.apiservice.mq;
import cn.estsh.i3plus.platform.common.util.CommonConstWords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.*;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -26,10 +27,9 @@ public class I3CoreQueueConfig {
return new Queue(IMPP_MESSAGE_QUEUE);
}
/*********** 队列demo ***********/
/*public static final String DEMO_STR_QUEUE = "demo_str_queue";
public static final String DEMO_STR_QUEUE = "demo_str_queue";
@Bean
public Queue getStrQueue() {
//LOGGER.info("【DEMO_STR_QUEUE队列】");
@ -62,5 +62,5 @@ public class I3CoreQueueConfig {
public Queue getReturnQueue() throws Exception {
//LOGGER.info("【DEMO_RETURN_QUEUE队列】");
return new Queue(DEMO_RETURN_QUEUE);
}*/
}
}

@ -0,0 +1,50 @@
package cn.estsh.i3plus.core.apiservice.thread;
import cn.estsh.i3plus.core.api.iservice.busi.IPersonnelService;
import cn.estsh.i3plus.core.api.iservice.busi.ISysMessageService;
import cn.estsh.i3plus.icloud.core.sdk.ICorePersonnelCloud;
import cn.estsh.i3plus.platform.common.util.CommonConstWords;
import cn.estsh.impp.framework.boot.thread.BaseThread;
import cn.estsh.impp.framework.boot.thread.ImppThreadPool;
import cn.estsh.impp.framework.boot.util.ServletContextUtil;
import cn.estsh.impp.framework.boot.util.SpringContextsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
/**
* @Description : 线
* @Reference :
* @Author : alwaysfrin
* @CreateDate : 2019-01-14 15:26
* @Modify:
**/
public class CoreDemoThread extends BaseThread {
private static final Logger LOGGER = LoggerFactory.getLogger(CoreDemoThread.class);
private String param;
public CoreDemoThread(String param){
this.param = param;
IPersonnelService personnelService = (IPersonnelService) SpringContextsUtil.getBean(IPersonnelService.class);
LOGGER.info("spring容器获取服务{}" + personnelService);
WebApplicationContext webAppContext = WebApplicationContextUtils.getWebApplicationContext(ServletContextUtil.getServletContext());
ISysMessageService sysMessageService = webAppContext.getBean(ISysMessageService.class);
LOGGER.info("servlet容器获取服务{}" + sysMessageService);
}
@Override
public boolean doRun() {
//处理业务逻辑
System.out.println("线程执行:获取参数" + param);
return true;
}
//使用方式
public static void main(String[] args){
ImppThreadPool.getThreadExcutorService().execute(new CoreDemoThread("thread param"));
}
}

@ -35,12 +35,12 @@ spring.resources.static-locations=/static/**,/**
#app基础包路径
impp.app.base-packages=cn.estsh.i3plus.core
#app对象路径
impp.app.pojo-packages=cn.estsh.i3plus.pojo
impp.app.pojo-packages=cn.estsh.i3plus.pojo.platform
#dao接口包
impp.app.pojo-dao=${impp.app.base-packages}.apiservice.dao
#mongo-dao接口包
impp.app.pojo-mongo-dao=${impp.app.base-packages}.**.apiservice.daomongo
#对象持久化包路径,可以多个包,逗号分隔
impp.app.pojo-repository=${impp.app.pojo-packages}.**.repository
#mongodb资源仓
impp.app.pojo-mongo-repository=${impp.app.pojo-packages}.**.repositorymongo
#mongodb资源仓go
impp.app.pojo-mongo-repository=${impp.app.pojo-packages}.**.repositorymon
Loading…
Cancel
Save