From e998b145114cf82e44c68b340ffd864da0da96ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=AE=E7=AC=91=E7=9D=80=E9=9D=A2=E5=AF=B9=E6=98=8E?= =?UTF-8?q?=E5=A4=A9?= <752558143@qq.com> Date: Wed, 17 Jul 2024 22:55:06 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=20=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mes/pcn/apiservice/mqtt/PcnMqttCallback.java | 28 +++++++++++++++++++++- .../serviceimpl/base/MesEquipmentLogService.java | 2 -- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/mqtt/PcnMqttCallback.java b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/mqtt/PcnMqttCallback.java index 1edcaaa..850ea70 100644 --- a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/mqtt/PcnMqttCallback.java +++ b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/mqtt/PcnMqttCallback.java @@ -17,10 +17,30 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; @Slf4j public class PcnMqttCallback implements MqttCallbackExtended { + private static ExecutorService executorService = new ThreadPoolExecutor(1, 20, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(200), r -> { + Thread thread = new Thread(r); + thread.setName("executorService--"+r.hashCode()); + return thread; + },new ThreadPoolExecutor.DiscardPolicy()); + + private static ExecutorService executorServiceTwo = new ThreadPoolExecutor(1, 20, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(200), r -> { + Thread thread = new Thread(r); + thread.setName("executorServiceTwo--"+r.hashCode()); + return thread; + },new ThreadPoolExecutor.DiscardPolicy()); + //手动注入 private MqttConfig mqttConfig = SpringUtils.getBean(MqttConfig.class); @@ -94,6 +114,12 @@ public class PcnMqttCallback implements MqttCallbackExtended { @Override public void messageArrived(String topic, MqttMessage mqttMessage) { log.info("== pcnMqttCallback ==> messageArrived 接收消息主题: {},接收消息内容: {}", topic, new String(mqttMessage.getPayload())); + executorService.execute(() -> saveData(topic, mqttMessage)); + } + + private void saveData(String topic, MqttMessage mqttMessage) { + log.info("== pcnMqttCallback ==> messageArrived 接收消息主题: {},异步处理开始 消息内容: {}", topic, new String(mqttMessage.getPayload())); + try{ String resStr = new String(mqttMessage.getPayload(), "UTF-8").trim(); /** @@ -102,7 +128,7 @@ public class PcnMqttCallback implements MqttCallbackExtended { //topic1主题 List equipLogMqttMsgList = JSONObject.parseArray(resStr, EquipLogMqttMsg.class); for (EquipLogMqttMsg equipLogMqttMsg : equipLogMqttMsgList) { - equipmentLogService.updateValue(equipLogMqttMsg); + executorServiceTwo.execute(()-> equipmentLogService.updateValue(equipLogMqttMsg)); } //业务处理 //doSomething1(maps); diff --git a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/base/MesEquipmentLogService.java b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/base/MesEquipmentLogService.java index e00fce8..d81e29d 100644 --- a/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/base/MesEquipmentLogService.java +++ b/modules/i3plus-ext-mes-pcn-apiservice/src/main/java/cn/estsh/i3plus/ext/mes/pcn/apiservice/serviceimpl/base/MesEquipmentLogService.java @@ -175,14 +175,12 @@ public class MesEquipmentLogService implements IMesEquipmentLogService { DdlPreparedPack.getNumEqualPack(equipmentLog.getEquipId(), MesPcnExtConstWords.EQUIP_ID, packBean); DdlPreparedPack.getNumEqualPack(equipmentLog.getEquipVariableId(), MesPcnExtConstWords.EQUIP_VARIABLE_ID, packBean); mesEquipmentLogRepository.updateByProperties(new String[]{MesPcnExtConstWords.EQUIP_VARIABLE_STATUS,"equipVariableValue"}, new Object[]{MesExtEnumUtil.EQUIP_VARIABLE_NEED_NEW_VALUE.TRUE.getValue(),equipLogMqttMsg.getValue() }, packBean); - LOGGER.info("messageArrived -> value修改成功"); MesEquipmentLogDetail mesEquipmentLogDetail = new MesEquipmentLogDetail(); BeanUtils.copyProperties(equipmentLog, mesEquipmentLogDetail); ConvertBean.serviceModelInitialize(mesEquipmentLogDetail, "mqtt"); mesEquipmentLogDetailRepository.saveAll(Arrays.asList(mesEquipmentLogDetail)); - LOGGER.info("messageArrived -> detail插入成功"); } }