在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称:mqttx开源软件地址:https://gitee.com/amazingJun/mqttx开源软件介绍:MQTTX Project
中文 | English 1 介绍
注意:分支 关联项目: Mqttx-Client 实现&使用极为简单的 mqttv3.1.1 客户端. 1.1 快速开始
快速开始-测试模式 图例:
所谓测试模式、开发模式只是方便同学们快速启动项目,方便测试功能测试。熟悉项目后,同学们可通过修改 6.1 配置项 开启或关闭
1.2 项目依赖
其它说明:
|
Docker Pull Command | 说明 |
---|---|
docker pull fantasywujun/mqttx:1.2.0 | 基于 jdk17.0.1 的 mqttx:1.2.0 版本 |
docker-compose 文件内容:
version: "2"services: redis: container_name: redis-for-mqttx image: redis mqttx: container_name: mqttx image: fantasywujun/mqttx:1.2.0 ports: - 1883:1883 - 8083:8083
qos0 | qos1 | qos2 |
---|---|---|
支持 | 支持 | 支持 |
为支持 qos1、qos2
,引入 redis
作为持久层,这部分已经封装成接口,可自行替换实现(比如采用 mysql
)。
#
与单级通配符 +
/
结尾的topic,比如 a/b/
,请改为 a/b
。mqttx 仅对订阅 topicFilter 进行校验,publish 的 topic 是没有做合法性检查的,可通过开启 4.5 topic 安全支持 限制客户端可发布的 topic。
举例:
topicFilter | match topics |
---|---|
/a/b/+ | /a/b/abc ,/a/b/test |
a/b/# | a/b , a/b/abc , a/b/c/def |
a/+/b/# | a/nani/b/abc |
/+/a/b/+/c | /aaa/a/b/test/c |
校验工具类为:com.jun.mqttx.utils.TopicUtils
mqttx
依赖消息中间件分发消息实现集群功能,目前支持的中间件:
Kafka
:可选配置Redis
:默认配置实现原理如下图:
mqttx.cluster.enable
:功能开关,默认 false
mqttx.cluster.type
: 消息中间件类型,默认 redis
注意事项:
v1.0.5.RELEASE
之前的版本集群功能存在 bug,无法使用。
如需使用 kafka
实现集群消息,需要手动修改配置 application-*.yml
, 可参考 application-dev.yml
中的配置示例 3. kafka 集群。
测试模式开启后,集群功能 强制 关闭
开启 ssl 你首先应该有了 ca(自签名或购买),然后修改 application.yml
文件中几个配置:
mqttx.ssl.enable
:功能开关,默认 false
,同时控制 websocket
与 socket
mqttx.ssl.key-store-location
:keystore 地址,基于 classpath
mqttx.ssl.key-store-password
:keystore 密码mqttx.ssl.key-store-type
:keystore 类别,如 PKCS12
mqttx.ssl.client-auth
:服务端是否需要校验客户端证书,默认 NONE
resources/tls
目录中的mqttx.keystore
仅供测试使用, 密码:123456
证书加载工具类:
com/jun/mqttx/utils/SslUtils.java
为了对 client 订阅 topic 进行限制,加入topic 订阅&发布鉴权机制:
mqttx.enable-topic-sub-pub-secure
: 功能开关,默认 false
broker 收到 conn 报文后,会抓取 {clientId, username, password}
发起请求给 mqttx.auth.url
, 该接口返回对象中含有 authorizedSub,authorizedPub
存储 client 被授权订阅及发布的 topic
列表。
详见 4.12 基础认证支持
broker 在消息订阅及发布都会校验客户端权限
支持的主题类型:
共享订阅是 mqtt5
协议规定的内容,很多 mq(例如 kafka
) 都有实现。
mqttx.share-topic.enable
: 功能开关,默认 true
格式: $share/{ShareName}/{filter}
, $share
为前缀, ShareName
为共享订阅名, filter
就是非共享订阅主题过滤器。
目前支持 hash
, random
, round
三种规则
hash
选出的 client 会随着订阅客户端数量及发送消息客户端clientId
变化而变化
下图展示了共享主题与常规主题之间的差异:
msg-a
消息分发策略取决于配置项 mqttx.share-topic.share-sub-strategy
可以配合 cleanSession = 1
的会话,共享主题的客户端断开连接后会被服务端移除订阅,这样共享主题的消息只会分发给在线的客户端。
CleanSession 介绍:mqtt3.1.1
协议规定当 cleanSession = 1
时,连接断开后与会话相关联的所有状态(不含 retained
消息)都会被删除(mqtt5
增加了会话超时设置,感兴趣的同学可以了解一下)。mqttx v1.0.5.BETA
版本后(含),cleanSession = 1
的会话消息保存在内存中,具备极高的性能.
If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one. This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be reused in any subsequent Session [MQTT-3.1.2-6].
The Session state in the Client consists of:
- QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged.
- QoS 2 messages which have been received from the Server, but have not been completely acknowledged.
The Session state in the Server consists of:
- The existence of a Session, even if the rest of the Session state is empty.
- The Client’s subscriptions.
- QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely acknowledged.
- QoS 1 and QoS 2 messages pending transmission to the Client.
- QoS 2 messages which have been received from the Client, but have not been completely acknowledged.
- Optionally, QoS 0 messages pending transmission to the Client.
支持
mqttx broker 内置部分系统主题,用户可酌情使用。
系统主题不支持如下特性:
注意:topic 安全机制 同样会影响客户端订阅系统主题, 未授权客户端将无法订阅系统主题
系统主题可分两种:
客户端可通过订阅系统主题获取 broker 状态,目前系统支持如下状态主题:
主题 | 描述 |
---|---|
$SYS/broker/{brokerId}/status | 触发方式:订阅此主题的客户端会定期(mqttx.sys-topic.interval )收到 broker 的状态,该状态涵盖下面所有主题的状态值. 注意:客户端连接断开后,订阅取消 |
$SYS/broker/activeConnectCount | 立即返回当前的活动连接数量 触发:订阅一次触发一次 |
$SYS/broker/time | 立即返回当前时间戳 触发:订阅一次触发一次 |
$SYS/broker/version | 立即返回 broker 版本触发:订阅一次触发一次 |
$SYS/broker/receivedMsg | 立即返回 broker 启动到现在收到的 MqttMessage , 不含 ping 触发:订阅一次触发一次 |
$SYS/broker/sendMsg | 立即返回 broker 启动到现在发送的 MqttMessage , 不含 pingAck 触发:订阅一次触发一次 |
$SYS/broker/uptime | 立即返回 broker 运行时长,单位秒触发:订阅一次触发一次 |
$SYS/broker/maxActiveConnectCount | 立即返回 broker 运行至今的最大 tcp 连接数触发:订阅一次触发一次 |
系统主题 $SYS/broker/{brokerId}/status
中的 brokerId 为配置项参数(见 6.1 配置项),可通过携带通配符的主题 $SYS/broker/+/status
订阅。
响应对象格式为 json
字符串:
{ "activeConnectCount": 1, "maxActiveConnectCount": 2, "receivedMsg": 6, "sendMsg": 77, "timestamp": "2021-03-23T23:05:37.035", "uptime": 149, "version": "1.0.7.RELEASE"}
field | 说明 |
---|---|
activeConnectCount | 当前活跃连接数量 |
maxActiveConnectCount | 最大活跃连接数量 |
receiveMsg | 收到消息数量,不含 ping |
sendMsg | 发送消息数量,不含 pingAck |
timestamp | 时间戳;(yyyy-MM-dd HH:mm:ss ) |
uptime | broker 上线时长,单位秒 |
version | mqttx 版本 |
此功能需求源自 issue: 监听MQTT客户端状态(在线、离线) · Issue #8 · Amazingwujun/mqttx (github.com)
主题 | 描述 |
---|---|
$SYS/broker/{borkerId}/clients/{clientId}/connected | 客户端上线通知主题 触发:当某个客户端上线后,broker 会发送消息给该主题 |
$SYS/broker/{borkerId}/clients/{clientId}/disconnected | 客户端下线通知主题 触发:当某个客户端掉线后,broker 会发送消息给该主题 |
这两个系统主题支持通配符,举例:
$SYS/broker/+/clients/#
: 匹配客户端上下线通知主题$SYS/broker/+/clients/+/connected
: 匹配客户端上线通知主题$SYS/broker/+/clients/+/disconnected
: 匹配客户端下线通知主题支持消息中间件:
消息桥接功能可方便的对接消息队列中间。
mqttx.message-bridge.enable
:开启消息桥接功能mqttx.bridge-topics
:需要桥接消息的主题,主题必须符合 kafka 对 topic 的要求mqttx
收到客户端 发布 的消息后,先判断桥接功能是否开启,然后再判断主题是否是需要桥接消息的主题,最后发布消息到 MQ。
仅支持单向桥接:device(client) => mqttx => MQ
使用基于令牌桶算法的 com.jun.mqttx.utils.RateLimiter
对指定主题进行流量限制。
令牌桶算法参见:https://stripe.com/blog/rate-limiters
简单解释一下令牌桶概念:有一个最大容量为
capacity
的令牌桶,该桶以一定的速率补充令牌(replenish-rate
),每次调用接口时消耗一定量(token-consumed-per-acquire
)的令牌,令牌数目足够则请求通过。
主题限流仅适用于 qos
等于 0 的消息。
配置举例:
mqttx: rate-limiter: enable: true topic-rate-limits: # 例一 - topic: "/test/a" capacity: 9 replenish-rate: 4 token-consumed-per-acquire: 3 # 例二 - topic: "/test/b" capacity: 5 replenish-rate: 5 token-consumed-per-acquire: 2
capacity
: 桶容量replenish-rate
: 令牌填充速率token-consumed-per-acquire
: 每次请求消耗令牌数量QPS
计算公式:
QPS = capacity ÷ token-consumed-per-acquire
9 ÷ 3 = 3
5 ÷ 2 = 2.5
QPS = replenish-rate ÷ token-consumed-per-acquire
4 ÷ 3 ≈ 1.3
5 ÷ 2 = 2.5
mqttx
的持久化依赖 redis
, mqttx
会持久化 cleanSession = false & qos > 0
的消息, 消息被 Serializer
序列化为字节数组后存储在 redis
。
目前 mqttx
提供了两种序列化实现:
JsonSerializer
KryoSerializer
默认使用 JsonSerializer
, 这是为了和之前的项目兼容;v1.0.6.release
版本后 KryoSerializer
将成为默认序列化实现。
可通过配置 mqttx.serialize-strategy
修改序列化实现。
mqttx
提供基础客户端认证服务。
配置项:
mqttx.auth.url
: 提供认证服务的接口地址。mqttx.auth.readTimeout
: OkHttpClient
readTimeoutmqttx.auth.connectTimeout
: OkHttpClient
connectTimeout用户在配置文件中声明 mqtt.auth.url
后,对象 com.jun.mqttx.service.impl.DefaultAuthenticationServiceImpl
使用 OkHttpClient
发出 POST
请求给 mqttx.auth.url
。
请求内容为 mqtt conn
报文中的 username, password
.
POST / HTTP/1.1Host: mqttx.auth.urlContent-Type: application/jsonContent-Length: 91{ "clientId": "device_id_test", "username": "mqttx", "password": "123456"}
认证成功后响应对象为 json
格式字符串:
{ "authorizedSub": [ "subTopic1", "subTopic2" ], "authorizedPub": [ "pubTopic1", "pubTopic2" ]}
认证成功返回响应可配合 4.5 topic 安全支持 使用。
注意:
http status = 200
即表明认证成功, 其它状态值一律为认证失败v1.0
版本分支将作为支持 mqttv3.1.1 协议版本持续迭代
一旦 OpenJDK: Loom (java.net) release,v1.0
版本将不再维护,替代版本为 v1.2
为使 mqttx 项目变得更好,请使用及学习该项目的同学主动反馈使用情况给我(提 issue 或加群反馈)
后续工作
v1.0.7.RELEASE
版本 Benchmarkv1.0.8.RELEASE
版本开发v1.1.0.RELEASE
版本开发v1.2.0.RELEASE
版本开发v2.0.0.RELEASE
版本开发v1.2
版本由 JDK8 升级至 JDK17
v2.0
版本分支将作为 mqttv5 协议版本开始迭代
这段时间工作任务繁重,功能迭代暂时停止,当然 bug 我还是会优先处理
请发表评论