在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称:windmq
开源软件地址:https://gitee.com/sense7/windmq
开源软件介绍:windmq - MQTT快速开发脚手架
前言
快速开发处理MQTT topic,一个方法注解就搞定。本项目是原样从实际项目里搬出来的:产线使用阿里云,测试使用EMQ,因此需要统一支持两者。有些config和bean不太合理,过年有空再整理。此项目整合springboot的部分和topic规则搬运自项目 https://gitee.com/yezhihao/mqtt-sample ,刚接触这方面,十分感谢前辈的经验。关于共享订阅的高可用兼容,如果有方案还望各位不吝赐教。
功能
默认规则
springboot支持版本
项目仓库<dependency> <groupId>com.stanwind</groupId> <artifactId>spring-boot-windmq</artifactId> <version>1.0.0-RELEASE</version></dependency> 样例工程https://gitee.com/sense7/windmq-demo.git 参考依赖<!-- windmq dependency --><dependency> <groupId>com.stanwind</groupId> <artifactId>spring-boot-windmq</artifactId> <version>1.0.0-RELEASE</version></dependency><!-- MQTT --><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId></dependency><dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <exclusions> <exclusion> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <groupId>org.eclipse.paho</groupId> </exclusion> </exclusions></dependency><!-- 1.2.0 版本有bug --><dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.1</version></dependency> 启用windmq@EnableWindMQ@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); }} 使用样例
/**
 * Adds one or more topics.
 * NOTE(review): presumably subscribes to the given topics at runtime, as the
 * counterpart of {@code removeTopic} — confirm against the implementation.
 *
 * @param topic the topic names to add
 */
void addTopic(String... topic);

/**
 * Adds a single topic together with a QoS value.
 * NOTE(review): {@code qos} is presumably the MQTT quality-of-service level — confirm.
 *
 * @param topic the topic name to add
 * @param qos   the QoS value associated with the topic
 */
void addTopic(String topic, int qos);

/**
 * Adds several topics, each with its own QoS value.
 * NOTE(review): the two arrays are presumably index-aligned and of equal length — confirm.
 *
 * @param topic the topic names to add
 * @param qos   the QoS values for the topics
 */
void addTopics(String[] topic, int[] qos);

/**
 * Removes one or more topics.
 * NOTE(review): presumably unsubscribes from them — confirm against the implementation.
 *
 * @param topic the topic names to remove
 */
void removeTopic(String... topic);
/**
 * Sends a message to a device. The payload's class must carry the {@code @Topic} annotation.
 *
 * @param deviceId id of the target device
 * @param payload  the message payload
 */
void notify(String deviceId, Object payload);

/**
 * Sends a message to a topic.
 *
 * @param topic   the target topic
 * @param payload the message payload
 */
void notifyToTopic(String topic, Object payload);

/**
 * Sends a message to a device with synchronous confirmation.
 *
 * @param deviceId id of the target device
 * @param payload  the request payload
 * @return the response to the request
 */
MqttResponse request(String deviceId, MqttRequest payload);

/**
 * Sends a message to a device with synchronous confirmation.
 *
 * @param deviceId id of the target device
 * @param payload  the request payload
 * @param timeout  wait timeout in milliseconds
 * @return the response to the request
 */
MqttResponse request(String deviceId, MqttRequest payload, long timeout);

/**
 * Responds to a synchronous message.
 *
 * @param message the message carrying the response
 * @return NOTE(review): meaning of the boolean result is not documented in the original — confirm
 */
boolean response(Message<MqttResponse> message);

/**
 * Sends the common reply to a client request.
 *
 * @param messageId id of the message being replied to
 * @param deviceId  id of the target device
 * @param result    the result value to send back
 */
void sendCommonResponse(Long messageId, String deviceId, Integer result);

/**
 * Sends the common reply to a client request.
 *
 * @param messageId id of the message being replied to
 * @param deviceId  id of the target device
 */
void sendCommonResponse(Long messageId, String deviceId);
/**
 * Handles messages arriving on the
 * {@code $SYS/brokers/{node}/clients/{deviceId}/connected} topic.
 *
 * <p>The message payload is rendered as a string, parsed as JSON into a
 * {@code ClientReqVO}, and handed to {@code process}.
 *
 * @param msg the incoming MQTT message
 */
@TopicHandler(topic = "$SYS/brokers/{node}/clients/{deviceId}/connected")
public void connected(MQTTMsg msg) {
    final String payloadJson = msg.getPayload().toString();
    process(JSONObject.parseObject(payloadJson, ClientReqVO.class));
}
@Servicepublic class DemoHandler extends BaseTopicHandler { @TopicHandler(topic = "IOT_SERVER/ping/{instanceId}/{taskId}/{param1}") public void uploadPingData(MQTTMsg msg) { String taskId = getParam("taskId"); String param1 = getParam("param1"); //或 MqttContext.getContext().getParams().getOrDefault("taskId", null); }}
/**
 * Handles messages on {@code IOT_SERVER/ping/{instanceId}/{taskId}}.
 *
 * <p>When {@code currentHandle()} returns false the message is only logged at
 * debug level (as a task that does not belong to the current instance) and
 * the method returns. Otherwise, once the placeholder condition holds, the
 * message's topic is removed through {@code producerHolder.removeTopic}.
 *
 * @param msg the incoming MQTT message
 */
@TopicHandler(topic = "IOT_SERVER/ping/{instanceId}/{taskId}")
public void uploadPingData(MQTTMsg msg) {
    // Skip messages whose task is not handled by the current instance.
    if (!currentHandle()) {
        log.debug("非当前实例任务: [{}]", msg);
        return;
    }
    // NOTE(review): the condition below is a pseudo-code placeholder ("receiving finished");
    // replace it with the application's own completion check.
    if (接收完毕) {
        // Unsubscribe
        producerHolder.removeTopic(msg.getTopic());
    }
}
@Autowired
private MqttConfig mqttConfig;

@Autowired
private ClientApi clientApi;

/**
 * Builds the MQTT connection settings for the device identified by {@code sn}.
 *
 * <p>The read and write ACL templates from {@code mqttConfig} have every match of
 * {@code DEVICE_ID} replaced by the client id obtained from {@code stcUtil.sn2cli(sn)}.
 * The resulting topic lists are passed to {@code clientApi.getTokenConn}, and the
 * returned connection data is copied onto the result with {@code BeanUtils.copyProperties}
 * after the URIs, topics and encoding settings have been set.
 *
 * @param sn identifier of the device (NOTE(review): presumably its serial number — confirm)
 * @return the populated connection settings
 * @throws Exception propagated from the calls made while building the settings
 */
public MqttConnVO generateMqttConnConfig(String sn) throws Exception {
    // String.replaceAll interprets '$' and '\' in the replacement string (group
    // references / escapes). Quote the client id so it is always inserted literally.
    String clientId = java.util.regex.Matcher.quoteReplacement(stcUtil.sn2cli(sn));
    String r = mqttConfig.getAclRead().replaceAll(DEVICE_ID, clientId);
    String w = mqttConfig.getAclWrite().replaceAll(DEVICE_ID, clientId);
    ConnData connData = clientApi.getTokenConn(Utils.splitToList(r), Utils.splitToList(w));

    MqttConnVO vo = new MqttConnVO();
    // If no public (external) server URIs are configured, fall back to the common
    // server URIs; otherwise return the public ones.
    vo.setUris(ArrayUtils.isEmpty(mqttConfig.getPubServerURIs())
            ? mqttConfig.getServerURIs()
            : mqttConfig.getPubServerURIs());
    vo.setReadTopics(r);
    vo.setWriteTopics(w);
    vo.setEnc(mqttConfig.getEncTable());
    vo.setEncSize(mqttConfig.getEncCount());
    BeanUtils.copyProperties(connData, vo);
    // NOTE(review): vo has a password field; if its toString() includes it, this line
    // writes credentials to the log — consider excluding or masking it.
    log.info("{} 获取mqtt: {}", sn, vo);
    return vo;
}

/**
 * Connection settings returned by {@code generateMqttConnConfig}.
 */
@Data
public class MqttConnVO implements Serializable {
    private static final long serialVersionUID = 1L;
    private String[] uris;
    private Long expire;
    private String username;
    private String password;
    private String readTopics;
    private String writeTopics;
    private String enc;
    private Integer encSize;
}
使用建议
|
请发表评论