
mqtt-client


Open-source software name:

mqtt-client

Open-source software address:

https://gitee.com/iots/mqtt-client

Open-source software description:

Overview

MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.

mqtt-client provides an ASL 2.0 licensed API to MQTT. It takes care of automatically reconnecting to your MQTT server and restoring your client session if any network failures occur. Applications can use a blocking API style, a futures-based API, or a callback/continuation-passing API style.

Using from Maven

Add the following to your Maven pom.xml file.

<dependency>
  <groupId>org.fusesource.mqtt-client</groupId>
  <artifactId>mqtt-client</artifactId>
  <version>1.12</version>
</dependency>

Using from Gradle

Add the following to your Gradle build file.

compile 'org.fusesource.mqtt-client:mqtt-client:1.12'

Using from any Other Build System

Download the uber jar file and add it to your build. The uber jar contains all the stripped-down dependencies that mqtt-client needs from other projects.

Using on Java 1.4

We also provide a Java 1.4 uber jar file, which is compatible with Java 1.4 JVMs. This version of the jar does not support SSL connections, since the SSLEngine class used to implement SSL on NIO was not introduced until Java 1.5.

Configuring the MQTT Connection

The blocking, future, and callback APIs all share the same connection setup. You create a new instance of the MQTT class and configure it with connection and socket related options. At a minimum, the setHost method must be called before attempting to connect.

MQTT mqtt = new MQTT();
mqtt.setHost("localhost", 1883);
// or
mqtt.setHost("tcp://localhost:1883");

Controlling MQTT Options

  • setClientId : Sets the client ID of the session. This is what an MQTT server uses to identify a session where setCleanSession(false); is being used. The ID must be 23 characters or less. Defaults to an auto-generated ID (based on your socket address, port and timestamp).

  • setCleanSession : Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client sessions. Defaults to true.

  • setKeepAlive : Configures the Keep Alive timer in seconds. Defines the maximum time interval between messages received from a client. It enables the server to detect that the network connection to a client has dropped, without having to wait for the long TCP/IP timeout.

  • setUserName : Sets the user name used to authenticate against the server.

  • setPassword : Sets the password used to authenticate against the server.

  • setWillTopic : If set, the server will publish the client's Will message to the specified topic if the client has an unexpected disconnection.

  • setWillMessage : The Will message to send. Defaults to a zero length message.

  • setWillQos : Sets the quality of service to use for the Will message. Defaults to QoS.AT_MOST_ONCE.

  • setWillRetain : Set to true if you want the Will to be published with the retain option.

  • setVersion : Set to "3.1.1" to use MQTT version 3.1.1. Otherwise defaults to the 3.1 protocol version.
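The 23-character limit on client IDs is easy to exceed when IDs are derived from host names or device labels. The following is a minimal sketch of a pre-flight check. The class ClientIdSketch and its methods are hypothetical helpers written for illustration and are not part of mqtt-client; only the 23-character limit comes from the option list above.

```java
final class ClientIdSketch {
    // Limit stated for setClientId in the option list above.
    static final int MAX_CLIENT_ID_LENGTH = 23;

    /** Returns true when the id is non-empty and within the 23-character limit. */
    static boolean isValid(String clientId) {
        return clientId != null
                && !clientId.isEmpty()
                && clientId.length() <= MAX_CLIENT_ID_LENGTH;
    }

    /** Builds "<prefix>-<suffix>", trimming the prefix so the result fits the limit. */
    static String build(String prefix, String suffix) {
        int room = MAX_CLIENT_ID_LENGTH - suffix.length() - 1; // 1 for the '-'
        if (room < 1) {
            throw new IllegalArgumentException("suffix leaves no room for a prefix");
        }
        String head = prefix.length() > room ? prefix.substring(0, room) : prefix;
        return head + "-" + suffix;
    }

    public static void main(String[] args) {
        String id = build("temperature-sensor-building-7", "0042");
        System.out.println(id + " (" + id.length() + " chars, valid=" + isValid(id) + ")");
        // A checked id like this would then be passed to mqtt.setClientId(id).
    }
}
```

A stable, explicitly chosen ID matters mostly when setCleanSession(false) is used, because the server looks up the persisted session by that ID.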

Controlling Connection Reconnects

The connection will automatically reconnect and re-establish the messaging session if any network error occurs. You can control how often a reconnect is attempted and define the maximum number of reconnect attempts using the following methods:

  • setConnectAttemptsMax : The maximum number of connect attempts made during the client's initial connection to a server before an error is reported back to the client. Set to -1 to use unlimited attempts. Defaults to -1.
  • setReconnectAttemptsMax : The maximum number of reconnect attempts made after a server connection had previously been established before an error is reported back to the client. Set to -1 to use unlimited attempts. Defaults to -1.
  • setReconnectDelay : How long to wait in ms before the first reconnect attempt. Defaults to 10.
  • setReconnectDelayMax : The maximum amount of time in ms to wait between reconnect attempts. Defaults to 30,000.
  • setReconnectBackOffMultiplier : The exponential backoff multiplier to be used between reconnect attempts. Set to 1 to disable exponential backoff. Defaults to 2.
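To get a feel for how the three delay settings interact, here is a sketch of a conventional exponential backoff schedule using the default values listed above. It assumes the textbook formula delay = min(initialDelay * multiplier^attempt, maxDelay). The class ReconnectBackoffSketch is hypothetical, and the library's internal calculation may differ from this formula.

```java
import java.util.ArrayList;
import java.util.List;

final class ReconnectBackoffSketch {
    /**
     * Conventional exponential backoff: the delay grows by `multiplier` per
     * attempt (attempt 0 is the first retry) and is capped at maxDelayMs.
     */
    static long delayForAttempt(int attempt, long initialDelayMs, long maxDelayMs, double multiplier) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        double delay = initialDelayMs * Math.pow(multiplier, attempt);
        return (long) Math.min(delay, (double) maxDelayMs);
    }

    /** Returns the delays for the first `attempts` retries. */
    static List<Long> schedule(int attempts, long initialDelayMs, long maxDelayMs, double multiplier) {
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            delays.add(delayForAttempt(i, initialDelayMs, maxDelayMs, multiplier));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Defaults from the list above: 10 ms initial delay, 30,000 ms cap, multiplier 2.
        System.out.println(schedule(14, 10, 30_000, 2.0));
    }
}
```

Under this formula the cap is reached after about a dozen attempts with the defaults, and a multiplier of 1 keeps the delay constant at the initial value.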

Configuring Socket Options

You can adjust some socket options by using the following methods:

  • setReceiveBufferSize : Sets the size of the internal socket receive buffer. Defaults to 65536 (64k).

  • setSendBufferSize : Sets the size of the internal socket send buffer. Defaults to 65536 (64k).

  • setTrafficClass : Sets the traffic class or type-of-service octet in the IP header for packets sent from the transport. Defaults to 8, which means the traffic should be optimized for throughput.
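The default traffic class of 8 corresponds to the throughput bit among the type-of-service flags documented for java.net.Socket#setTrafficClass. The sketch below decodes a traffic class value into those flags. TrafficClassSketch is a hypothetical helper for illustration, and it assumes that setTrafficClass on the MQTT class uses the same bit layout as the JDK socket option.

```java
import java.util.ArrayList;
import java.util.List;

final class TrafficClassSketch {
    // Type-of-service bits as documented for java.net.Socket#setTrafficClass.
    static final int IPTOS_LOWCOST = 0x02;
    static final int IPTOS_RELIABILITY = 0x04;
    static final int IPTOS_THROUGHPUT = 0x08;
    static final int IPTOS_LOWDELAY = 0x10;

    /** Lists the type-of-service flags set in the given traffic class value. */
    static List<String> describe(int trafficClass) {
        List<String> flags = new ArrayList<>();
        if ((trafficClass & IPTOS_LOWCOST) != 0) flags.add("low cost");
        if ((trafficClass & IPTOS_RELIABILITY) != 0) flags.add("reliability");
        if ((trafficClass & IPTOS_THROUGHPUT) != 0) flags.add("throughput");
        if ((trafficClass & IPTOS_LOWDELAY) != 0) flags.add("low delay");
        return flags;
    }

    public static void main(String[] args) {
        System.out.println("default (8): " + describe(8));
        System.out.println("0x10:        " + describe(IPTOS_LOWDELAY));
    }
}
```

Whether the network honors these bits depends on the operating system and the routers along the path, so the setting is best treated as a hint.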

Throttling Connections

If you want to slow down the read or write rate of your connections, use the following methods:

  • setMaxReadRate : Sets the maximum bytes per second that this transport will receive data at. This setting throttles reads so that the rate is not exceeded. Defaults to 0, which disables throttling.

  • setMaxWriteRate : Sets the maximum bytes per second that this transport will send data at. This setting throttles writes so that the rate is not exceeded. Defaults to 0, which disables throttling.
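As a rough mental model of what a bytes-per-second limit means, the sketch below tracks a per-second byte budget, with a rate of 0 disabling the limit as in the settings above. RateBudgetSketch is a hypothetical class written for illustration. It is not how mqtt-client implements throttling internally, which this document does not describe.

```java
final class RateBudgetSketch {
    private final long maxBytesPerSecond; // 0 (or less) disables throttling
    private long windowStartMs;
    private long usedInWindow;

    RateBudgetSketch(long maxBytesPerSecond, long nowMs) {
        this.maxBytesPerSecond = maxBytesPerSecond;
        this.windowStartMs = nowMs;
    }

    /** Returns how many of the requested bytes may be transferred at time nowMs. */
    long acquire(long requested, long nowMs) {
        if (maxBytesPerSecond <= 0) {
            return requested; // throttling disabled
        }
        if (nowMs - windowStartMs >= 1000) {
            // A new one-second window starts and the budget is reset.
            windowStartMs = nowMs;
            usedInWindow = 0;
        }
        long allowed = Math.min(requested, maxBytesPerSecond - usedInWindow);
        usedInWindow += allowed;
        return allowed;
    }

    public static void main(String[] args) {
        RateBudgetSketch budget = new RateBudgetSketch(100, 0);
        System.out.println(budget.acquire(60, 0));    // first request in the window
        System.out.println(budget.acquire(60, 500));  // only the remainder fits
        System.out.println(budget.acquire(60, 1000)); // next window, budget reset
    }
}
```

Time is passed in explicitly so the behavior is deterministic; a real limiter would read a clock and delay the transfer rather than return a shortened count.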

Using SSL connections

If you want to connect over SSL/TLS instead of TCP, use an "ssl://" or "tls://" URI prefix instead of "tcp://" for the host field. For finer-grained control over which algorithm is used, specify a versioned prefix. Supported protocol values are:

  • ssl:// - Use the JVM default version of the SSL algorithm.
  • sslv*:// - Use a specific SSL version where * is a version supported by your JVM. Example: sslv3
  • tls:// - Use the JVM default version of the TLS algorithm.
  • tlsv*:// - Use a specific TLS version where * is a version supported by your JVM. Example: tlsv1.1

The client will use the default JVM SSLContext, which is configured via JVM system properties, unless you configure the MQTT instance using the setSslContext method.

SSL connections perform blocking operations against an internal thread pool unless you call the setBlockingExecutor method to configure the executor they will use instead.
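The URI prefixes above line up naturally with JSSE protocol names such as "TLS" or "TLSv1.2". The sketch below shows one plausible mapping and builds an SSLContext that could be handed to setSslContext. SslSchemeSketch and its methods are hypothetical helpers; the mapping is an assumption for illustration and is not taken from the library's source.

```java
import java.security.GeneralSecurityException;
import java.util.Locale;
import javax.net.ssl.SSLContext;

final class SslSchemeSketch {
    /** Maps a URI such as "tlsv1.2://host:8883" to a JSSE protocol name such as "TLSv1.2". */
    static String protocolFor(String uri) {
        int sep = uri.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("not a URI with a scheme: " + uri);
        }
        String scheme = uri.substring(0, sep).toLowerCase(Locale.ROOT);
        if (scheme.equals("tls")) return "TLS";
        if (scheme.startsWith("tlsv")) return "TLSv" + scheme.substring(4);
        if (scheme.equals("ssl")) return "SSL";
        if (scheme.startsWith("sslv")) return "SSLv" + scheme.substring(4);
        throw new IllegalArgumentException("not an SSL/TLS scheme: " + scheme);
    }

    /** Creates an SSLContext for the URI's protocol using the JVM default key and trust managers. */
    static SSLContext contextFor(String uri) {
        try {
            SSLContext context = SSLContext.getInstance(protocolFor(uri));
            context.init(null, null, null); // nulls select the JVM defaults
            return context;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("protocol not usable on this JVM: " + uri, e);
        }
    }

    public static void main(String[] args) {
        SSLContext context = contextFor("tlsv1.2://localhost:8883");
        System.out.println(context.getProtocol());
        // The context would then be passed to mqtt.setSslContext(context).
    }
}
```

Older protocol versions such as SSLv3 are disabled by default on current JVMs, so an explicit version is only useful when both the JVM and the server still support it.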

Selecting the Dispatch Queue

A HawtDispatch dispatch queue is used to synchronize access to the connection. If an explicit queue is not configured via the setDispatchQueue method, then a new queue will be created for the connection. Setting an explicit queue might be handy if you want multiple connections to share the same queue for synchronization.

Using the Blocking API

The MQTT.blockingConnection method creates a connection with a blocking API. Call connect on it to establish the connection.

BlockingConnection connection = mqtt.blockingConnection();
connection.connect();

Publish messages to a topic using the publish method:

connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);

You can subscribe to multiple topics using the subscribe method:

Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
byte[] qoses = connection.subscribe(topics);

Then receive and acknowledge consumption of messages using the receive and ack methods:

Message message = connection.receive();
System.out.println(message.getTopic());
byte[] payload = message.getPayload();
// process the message then:
message.ack();

Finally to disconnect:

connection.disconnect();

Using the Future based API

The MQTT.futureConnection method creates a connection with a futures-style API. All operations against the connection are non-blocking and return the result via a Future.

FutureConnection connection = mqtt.futureConnection();
Future<Void> f1 = connection.connect();
f1.await();

Future<byte[]> f2 = connection.subscribe(new Topic[]{new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)});
byte[] qoses = f2.await();

// We can start future receive..
Future<Message> receive = connection.receive();

// send the message..
Future<Void> f3 = connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);

// Then the receive will get the message.
Message message = receive.await();
message.ack();

Future<Void> f4 = connection.disconnect();
f4.await();

Using the Callback/Continuation Passing based API

The MQTT.callbackConnection method creates a connection with a callback-style API. This is the most complex API style to use, but it can provide the best performance. The future and blocking APIs use the callback API under the covers. All operations on the connection are non-blocking, and the results of an operation are passed to callback interfaces you implement.

Example:

final CallbackConnection connection = mqtt.callbackConnection();
connection.listener(new Listener() {

    public void onDisconnected() {
    }

    public void onConnected() {
    }

    public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
        // You can now process a received message from a topic.
        // Once processed, execute the ack runnable.
        ack.run();
    }

    public void onFailure(Throwable value) {
        connection.close(null); // a connection failure occurred.
    }
});
connection.connect(new Callback<Void>() {
    public void onFailure(Throwable value) {
        // `result` is assumed to be a callback supplied by the surrounding code.
        result.failure(value); // If we could not connect to the server.
    }

    // Once we connect..
    public void onSuccess(Void v) {

        // Subscribe to a topic
        Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
        connection.subscribe(topics, new Callback<byte[]>() {
            public void onSuccess(byte[] qoses) {
                // The result of the subscribe request.
            }
            public void onFailure(Throwable value) {
                connection.close(null); // subscribe failed.
            }
        });

        // Send a message to a topic
        connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
            public void onSuccess(Void v) {
                // the publish operation completed successfully.
            }
            public void onFailure(Throwable value) {
                connection.close(null); // publish failed.
            }
        });

        // To disconnect..
        connection.disconnect(new Callback<Void>() {
            public void onSuccess(Void v) {
                // called once the connection is disconnected.
            }
            public void onFailure(Throwable value) {
                // Disconnects never fail.
            }
        });
    }
});

Every connection has a HawtDispatch dispatch queue which it uses to process IO events for the socket. The dispatch queue is an Executor that provides serial execution of IO and processing events and is used to ensure synchronized access to the connection.

The callbacks are executed on the dispatch queue associated with the connection, so it is safe to use the connection from a callback, but you MUST NOT perform any blocking operations within the callback. If you need to perform some processing which MAY block, you must send it to another thread pool for processing. Furthermore, if another thread needs to interact with the connection, it can only do so by using a Runnable submitted to the connection's dispatch queue.

Example of executing a Runnable on the connection's dispatch queue:

connection.getDispatchQueue().execute(new Runnable(){
    public void run() {
      connection.publish( ..... );
    }
});
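The rule about not blocking inside callbacks leads to a common hand-off pattern: the callback returns quickly, blocking work runs on a separate pool, and the result is posted back to the serial queue. The sketch below illustrates this with plain java.util.concurrent executors standing in for the HawtDispatch queue. SerialQueueSketch, handleMessage and slowTransform are hypothetical names, and no mqtt-client or HawtDispatch API is used.

```java
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SerialQueueSketch {
    /** Simulates blocking work (for example a database call) on the payload. */
    static String slowTransform(String payload) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return payload.toUpperCase(Locale.ROOT);
    }

    /** Processes one payload using the queue -> worker pool -> queue hand-off. */
    static String handleMessage(final String payload) {
        // A single-threaded executor stands in for the connection's dispatch queue.
        final ExecutorService dispatchQueue = Executors.newSingleThreadExecutor();
        // A separate pool takes the work that may block.
        final ExecutorService workers = Executors.newFixedThreadPool(2);
        final CompletableFuture<String> done = new CompletableFuture<>();
        try {
            // 1. The "callback" runs on the serial queue and returns immediately.
            dispatchQueue.execute(() ->
                // 2. Blocking work is handed to the worker pool.
                workers.execute(() -> {
                    String processed = slowTransform(payload);
                    // 3. The result is posted back to the serial queue, the only
                    //    place where the connection would be touched.
                    dispatchQueue.execute(() -> done.complete(processed));
                }));
            return done.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("processing failed", e);
        } finally {
            workers.shutdown();
            dispatchQueue.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(handleMessage("hello"));
    }
}
```

With the real library, step 3 would correspond to submitting a Runnable to connection.getDispatchQueue(), as in the example above.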
