• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

windmq:

原作者: [db:作者] 来自: 网络 收藏 邀请

开源软件名称:

windmq

开源软件地址:

https://gitee.com/sense7/windmq

开源软件介绍:

windmq - MQTT快速开发脚手架

starfork

前言

快速开发处理MQTT topic,一个方法注解就搞定

原样从项目里搬出来的,产线阿里云,测试EMQ,需要统一支持下

有些config和bean不太合理,过年有空整理下

此项目整合springboot部分和topic规则搬运了一个项目,刚接触这个,十分感谢前辈的经验https://gitee.com/yezhihao/mqtt-sample

关于共享订阅的高可用兼容,如果有方案还望各位不吝赐教

功能

  • MQTT客户端登录凭证分配(ACL支持阿里云\EMQ目前只支持账号密码,可自定义实现)
  • 适合低端设备的查表加密协议(详情见: com.stanwind.wmqtt.security.TableMsgEncrypt)
  • 高可用部署(多实例不同clientID上线,EMQ有提供共享订阅,但是阿里云只能靠规则引擎转发MQ,我们线上使用全盘负责机制,谁发命令谁处理)
  • 消息处理池(CPU核心数*2 + 1, )
  • Topic注解匹配消息处理,支持模糊匹配(正则实现,可取topic路径参数)和精确匹配

默认规则

  • 对客户端发送的TOPIC均以 IOT_CLIENT/xxx形式 (配置可修改)
  • 对服务端发送的TOPIC均以 IOT_SERVER/xxx形式 (配置可修改)
  • 加密行需在payload开头2 byte表示采用哪一行数据进行加密 (若启用加密则IOT开头的topic均会加密,详见:com.stanwind.wmqtt.security.IotDeviceMessageEncrypt)
  • 为兼容阿里云 clientId均以GID_DEVICE@@@开头 (配置可修改)
  • Server采用签名登录,阿里云环境下Client分配的账号密码使用token登录,鉴权信息有效时长12小时
  • topic中{instanceId}表示匹配当前实例ID,{deviceId}表示匹配当前设备序列号(详情: com.stanwind.wmqtt.MqttConfig)

springboot支持版本

  • 2.0.X.RELEASE

项目仓库

<dependency>  <groupId>com.stanwind</groupId>  <artifactId>spring-boot-windmq</artifactId>  <version>1.0.0-RELEASE</version></dependency>

样例工程

https://gitee.com/sense7/windmq-demo.git

参考依赖

<!-- windmq dependency --><dependency>  <groupId>com.stanwind</groupId>  <artifactId>spring-boot-windmq</artifactId>  <version>1.0.0-RELEASE</version></dependency><!-- MQTT --><dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-integration</artifactId><dependency>  <groupId>org.springframework.integration</groupId>  <artifactId>spring-integration-mqtt</artifactId>  <exclusions>    <exclusion>      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>      <groupId>org.eclipse.paho</groupId>    </exclusion>  </exclusions></dependency><!-- 1.2.0 版本有bug --><dependency>  <groupId>org.eclipse.paho</groupId>  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>  <version>1.2.1</version></dependency>

启用windmq

@EnableWindMQ@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)public class DemoApplication {    public static void main(String[] args) {        SpringApplication.run(DemoApplication.class, args);    }}

使用样例

  • 临时订阅/取消(注入ProducerHolder)
void addTopic(String... topic);void addTopic(String topic, int qos);void addTopics(String[] topic, int[] qos);void removeTopic(String... topic);
  • 消息发送 IMessageService
/** * 发送消息给device 需要payload Class上有@Topic * @param deviceId * @param payload */void notify(String deviceId, Object payload);/** * 发送消息到topic * @param topic * @param payload */void notifyToTopic(String topic, Object payload);/** * 同步确认发送消息给设备 * @param deviceId * @param payload * @return */MqttResponse request(String deviceId, MqttRequest payload);/** * 同步确认发送消息给设备 * @param deviceId * @param payload * @param timeout 等待超时 ms * @return */MqttResponse request(String deviceId, MqttRequest payload, long timeout);/** * 响应同步消息 * @param message * @return */boolean response(Message<MqttResponse> message);/*** 客户端请求通用回复* @param messageId* @param deviceId* @param result*/void sendCommonResponse(Long messageId, String deviceId, Integer result);/*** 客户端请求通用回复* @param messageId* @param deviceId*/void sendCommonResponse(Long messageId, String deviceId);
  • 消息处理
@TopicHandler(topic = "$SYS/brokers/{node}/clients/{deviceId}/connected")public void connected(MQTTMsg msg) {    ClientReqVO clientReqVO = JSONObject.parseObject(msg.getPayload().toString(), ClientReqVO.class);    process(clientReqVO);}
  • 获取路径参数
@Servicepublic class DemoHandler extends BaseTopicHandler {    @TopicHandler(topic = "IOT_SERVER/ping/{instanceId}/{taskId}/{param1}")    public void uploadPingData(MQTTMsg msg) {        String taskId = getParam("taskId");        String param1 = getParam("param1");        //或        MqttContext.getContext().getParams().getOrDefault("taskId", null);    }}
  • 高可用方案(临时订阅处理完取消 适用于服务端发送控制指令,携带临时随机topic,客户端往服务端指定topic写)
@TopicHandler(topic = "IOT_SERVER/ping/{instanceId}/{taskId}")public void uploadPingData(MQTTMsg msg) {    if (!currentHandle()) {        log.debug("非当前实例任务: [{}]", msg);        return;    }    if (接收完毕) {        //取消订阅        producerHolder.removeTopic(msg.getTopic());    }}
  • 生成客户端链接数据
@Autowiredprivate MqttConfig mqttConfig;@Autowiredprivate ClientApi clientApi;public MqttConnVO generateMqttConnConfig(String sn) throws Exception {    String r = mqttConfig.getAclRead().replaceAll(DEVICE_ID, stcUtil.sn2cli(sn));    String w = mqttConfig.getAclWrite().replaceAll(DEVICE_ID, stcUtil.sn2cli(sn));    ConnData connData = clientApi.getTokenConn(Utils.splitToList(r), Utils.splitToList(w));    MqttConnVO vo = new MqttConnVO();    //缺省外网地址则返回统一地址 否则返回外网地址    vo.setUris(ArrayUtils.isEmpty(mqttConfig.getPubServerURIs()) ? mqttConfig.getServerURIs() : mqttConfig.getPubServerURIs());    vo.setReadTopics(r);    vo.setWriteTopics(w);    vo.setEnc(mqttConfig.getEncTable());    vo.setEncSize(mqttConfig.getEncCount());    BeanUtils.copyProperties(connData, vo);    log.info("{} 获取mqtt: {}", sn, vo);    return vo;}@Datapublic class MqttConnVO implements Serializable {    private static final long serialVersionUID = 1L;    private String[] uris;    private Long expire;    private String username;    private String password;    private String readTopics;    private String writeTopics;    private String enc;    private Integer encSize;}

使用建议

  • 配置参见bootstrap.yml
  • 测试环境使用EMQ,产线使用阿里云

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap