mqtt 数据

tags/yfai-pcn-ext-v1.0
微笑着面对明天 10 months ago
parent 41d8aae13f
commit e998b14511

@ -17,10 +17,30 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public class PcnMqttCallback implements MqttCallbackExtended { public class PcnMqttCallback implements MqttCallbackExtended {
private static ExecutorService executorService = new ThreadPoolExecutor(1, 20,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(200), r -> {
Thread thread = new Thread(r);
thread.setName("executorService--"+r.hashCode());
return thread;
},new ThreadPoolExecutor.DiscardPolicy());
private static ExecutorService executorServiceTwo = new ThreadPoolExecutor(1, 20,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(200), r -> {
Thread thread = new Thread(r);
thread.setName("executorServiceTwo--"+r.hashCode());
return thread;
},new ThreadPoolExecutor.DiscardPolicy());
//手动注入 //手动注入
private MqttConfig mqttConfig = SpringUtils.getBean(MqttConfig.class); private MqttConfig mqttConfig = SpringUtils.getBean(MqttConfig.class);
@ -94,6 +114,12 @@ public class PcnMqttCallback implements MqttCallbackExtended {
@Override @Override
public void messageArrived(String topic, MqttMessage mqttMessage) { public void messageArrived(String topic, MqttMessage mqttMessage) {
log.info("== pcnMqttCallback ==> messageArrived 接收消息主题: {},接收消息内容: {}", topic, new String(mqttMessage.getPayload())); log.info("== pcnMqttCallback ==> messageArrived 接收消息主题: {},接收消息内容: {}", topic, new String(mqttMessage.getPayload()));
executorService.execute(() -> saveData(topic, mqttMessage));
}
private void saveData(String topic, MqttMessage mqttMessage) {
log.info("== pcnMqttCallback ==> messageArrived 接收消息主题: {},异步处理开始 消息内容: {}", topic, new String(mqttMessage.getPayload()));
try{ try{
String resStr = new String(mqttMessage.getPayload(), "UTF-8").trim(); String resStr = new String(mqttMessage.getPayload(), "UTF-8").trim();
/** /**
@ -102,7 +128,7 @@ public class PcnMqttCallback implements MqttCallbackExtended {
//topic1主题 //topic1主题
List<EquipLogMqttMsg> equipLogMqttMsgList = JSONObject.parseArray(resStr, EquipLogMqttMsg.class); List<EquipLogMqttMsg> equipLogMqttMsgList = JSONObject.parseArray(resStr, EquipLogMqttMsg.class);
for (EquipLogMqttMsg equipLogMqttMsg : equipLogMqttMsgList) { for (EquipLogMqttMsg equipLogMqttMsg : equipLogMqttMsgList) {
equipmentLogService.updateValue(equipLogMqttMsg); executorServiceTwo.execute(()-> equipmentLogService.updateValue(equipLogMqttMsg));
} }
//业务处理 //业务处理
//doSomething1(maps); //doSomething1(maps);

@ -175,14 +175,12 @@ public class MesEquipmentLogService implements IMesEquipmentLogService {
DdlPreparedPack.getNumEqualPack(equipmentLog.getEquipId(), MesPcnExtConstWords.EQUIP_ID, packBean); DdlPreparedPack.getNumEqualPack(equipmentLog.getEquipId(), MesPcnExtConstWords.EQUIP_ID, packBean);
DdlPreparedPack.getNumEqualPack(equipmentLog.getEquipVariableId(), MesPcnExtConstWords.EQUIP_VARIABLE_ID, packBean); DdlPreparedPack.getNumEqualPack(equipmentLog.getEquipVariableId(), MesPcnExtConstWords.EQUIP_VARIABLE_ID, packBean);
mesEquipmentLogRepository.updateByProperties(new String[]{MesPcnExtConstWords.EQUIP_VARIABLE_STATUS,"equipVariableValue"}, new Object[]{MesExtEnumUtil.EQUIP_VARIABLE_NEED_NEW_VALUE.TRUE.getValue(),equipLogMqttMsg.getValue() }, packBean); mesEquipmentLogRepository.updateByProperties(new String[]{MesPcnExtConstWords.EQUIP_VARIABLE_STATUS,"equipVariableValue"}, new Object[]{MesExtEnumUtil.EQUIP_VARIABLE_NEED_NEW_VALUE.TRUE.getValue(),equipLogMqttMsg.getValue() }, packBean);
LOGGER.info("messageArrived -> value修改成功");
MesEquipmentLogDetail mesEquipmentLogDetail = new MesEquipmentLogDetail(); MesEquipmentLogDetail mesEquipmentLogDetail = new MesEquipmentLogDetail();
BeanUtils.copyProperties(equipmentLog, mesEquipmentLogDetail); BeanUtils.copyProperties(equipmentLog, mesEquipmentLogDetail);
ConvertBean.serviceModelInitialize(mesEquipmentLogDetail, "mqtt"); ConvertBean.serviceModelInitialize(mesEquipmentLogDetail, "mqtt");
mesEquipmentLogDetailRepository.saveAll(Arrays.asList(mesEquipmentLogDetail)); mesEquipmentLogDetailRepository.saveAll(Arrays.asList(mesEquipmentLogDetail));
LOGGER.info("messageArrived -> detail插入成功");
} }
} }

Loading…
Cancel
Save