本文整理汇总了Python中paho_test.gen_connect函数的典型用法代码示例。如果您正苦于以下问题:Python gen_connect函数的具体用法?Python gen_connect怎么用?Python gen_connect使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了gen_connect函数的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_01_con_failure_rc
def test_01_con_failure_rc(self, proto_ver, fake_broker):
mqttc = client.Client(
"01-con-failure-rc", protocol=proto_ver)
def on_connect(mqttc, obj, flags, rc):
assert rc == 1
mqttc.on_connect = on_connect
mqttc.connect_async("localhost", 1888)
mqttc.loop_start()
try:
fake_broker.start()
connect_packet = paho_test.gen_connect(
"01-con-failure-rc", keepalive=60,
proto_ver=proto_ver)
packet_in = fake_broker.receive_packet(1000)
assert packet_in # Check connection was not closed
assert packet_in == connect_packet
connack_packet = paho_test.gen_connack(rc=1)
count = fake_broker.send_packet(connack_packet)
assert count # Check connection was not closed
assert count == len(connack_packet)
packet_in = fake_broker.receive_packet(1)
assert not packet_in # Check connection is closed
finally:
mqttc.loop_stop()
开发者ID:Nzbuu,项目名称:paho.mqtt.python,代码行数:32,代码来源:test_client.py
示例2: test_valid_utf8_topic_recv
def test_valid_utf8_topic_recv(self, fake_broker):
mqttc = client.Client("client-id")
# It should be non-ascii multi-bytes character
topic = unicodedata.lookup('SNOWMAN')
def on_message(client, userdata, msg):
assert msg.topic == topic
client.disconnect()
mqttc.on_message = on_message
mqttc.connect_async("localhost", 1888)
mqttc.loop_start()
try:
fake_broker.start()
connect_packet = paho_test.gen_connect("client-id")
packet_in = fake_broker.receive_packet(len(connect_packet))
assert packet_in # Check connection was not closed
assert packet_in == connect_packet
connack_packet = paho_test.gen_connack(rc=0)
count = fake_broker.send_packet(connack_packet)
assert count # Check connection was not closed
assert count == len(connack_packet)
publish_packet = paho_test.gen_publish(
topic.encode('utf-8'), qos=0
)
count = fake_broker.send_packet(publish_packet)
assert count # Check connection was not closed
assert count == len(publish_packet)
disconnect_packet = paho_test.gen_disconnect()
packet_in = fake_broker.receive_packet(len(disconnect_packet))
assert packet_in # Check connection was not closed
assert packet_in == disconnect_packet
finally:
mqttc.loop_stop()
packet_in = fake_broker.receive_packet(1)
assert not packet_in # Check connection is closed
开发者ID:Nzbuu,项目名称:paho.mqtt.python,代码行数:45,代码来源:test_client.py
示例3: test_invalid_utf8_topic
def test_invalid_utf8_topic(self, fake_broker):
mqttc = client.Client("client-id")
def on_message(client, userdata, msg):
with pytest.raises(UnicodeDecodeError):
msg.topic
client.disconnect()
mqttc.on_message = on_message
mqttc.connect_async("localhost", 1888)
mqttc.loop_start()
try:
fake_broker.start()
connect_packet = paho_test.gen_connect("client-id")
packet_in = fake_broker.receive_packet(len(connect_packet))
assert packet_in # Check connection was not closed
assert packet_in == connect_packet
connack_packet = paho_test.gen_connack(rc=0)
count = fake_broker.send_packet(connack_packet)
assert count # Check connection was not closed
assert count == len(connack_packet)
publish_packet = paho_test.gen_publish(b"\xff", qos=0)
count = fake_broker.send_packet(publish_packet)
assert count # Check connection was not closed
assert count == len(publish_packet)
disconnect_packet = paho_test.gen_disconnect()
packet_in = fake_broker.receive_packet(len(disconnect_packet))
assert packet_in # Check connection was not closed
assert packet_in == disconnect_packet
finally:
mqttc.loop_stop()
packet_in = fake_broker.receive_packet(1)
assert not packet_in # Check connection is closed
开发者ID:Nzbuu,项目名称:paho.mqtt.python,代码行数:41,代码来源:test_client.py
示例4: dict
import subprocess
import socket
import sys
import time
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import paho_test
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect("01-will-unpwd-set",
keepalive=keepalive, username="oibvvwqw", password="#'^2hg9a&nm38*us",
will_topic="will-topic", will_qos=2, will_payload="will message")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.bind(('', 1888))
sock.listen(5)
client_args = sys.argv[1:]
env = dict(os.environ)
try:
pp = env['PYTHONPATH']
except KeyError:
pp = ''
env['PYTHONPATH'] = '../../src:'+pp
开发者ID:AlexisBRENON,项目名称:paho.mqtt.python,代码行数:31,代码来源:01-will-unpwd-set.py
示例5:
#!/usr/bin/env python
# Test whether a client produces a correct connect with clean session not set.
# The client should connect to port 1888 with keepalive=60, clean session not
# set, and client id 01-no-clean-session.
import context
import paho_test
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect("01-no-clean-session", clean_session=False, keepalive=keepalive)
sock = paho_test.create_server_socket()
client = context.start_client()
try:
(conn, address) = sock.accept()
conn.settimeout(10)
if paho_test.expect_packet(conn, "connect", connect_packet):
rc = 0
conn.close()
finally:
client.terminate()
client.wait()
sock.close()
开发者ID:Nzbuu,项目名称:paho.mqtt.python,代码行数:30,代码来源:01-no-clean-session.py
示例6: test_message_callback
def test_message_callback(self, fake_broker):
mqttc = client.Client("client-id")
userdata = {
'on_message': 0,
'callback1': 0,
'callback2': 0,
}
mqttc.user_data_set(userdata)
def on_message(client, userdata, msg):
assert msg.topic == 'topic/value'
userdata['on_message'] += 1
def callback1(client, userdata, msg):
assert msg.topic == 'topic/callback/1'
userdata['callback1'] += 1
def callback2(client, userdata, msg):
assert msg.topic in ('topic/callback/3', 'topic/callback/1')
userdata['callback2'] += 1
mqttc.on_message = on_message
mqttc.message_callback_add('topic/callback/1', callback1)
mqttc.message_callback_add('topic/callback/+', callback2)
mqttc.connect_async("localhost", 1888)
mqttc.loop_start()
try:
fake_broker.start()
connect_packet = paho_test.gen_connect("client-id")
packet_in = fake_broker.receive_packet(len(connect_packet))
assert packet_in # Check connection was not closed
assert packet_in == connect_packet
connack_packet = paho_test.gen_connack(rc=0)
count = fake_broker.send_packet(connack_packet)
assert count # Check connection was not closed
assert count == len(connack_packet)
publish_packet = paho_test.gen_publish(b"topic/value", qos=1, mid=1)
count = fake_broker.send_packet(publish_packet)
assert count # Check connection was not closed
assert count == len(publish_packet)
publish_packet = paho_test.gen_publish(b"topic/callback/1", qos=1, mid=2)
count = fake_broker.send_packet(publish_packet)
assert count # Check connection was not closed
assert count == len(publish_packet)
publish_packet = paho_test.gen_publish(b"topic/callback/3", qos=1, mid=3)
count = fake_broker.send_packet(publish_packet)
assert count # Check connection was not closed
assert count == len(publish_packet)
puback_packet = paho_test.gen_puback(mid=1)
packet_in = fake_broker.receive_packet(len(puback_packet))
assert packet_in # Check connection was not closed
assert packet_in == puback_packet
puback_packet = paho_test.gen_puback(mid=2)
packet_in = fake_broker.receive_packet(len(puback_packet))
assert packet_in # Check connection was not closed
assert packet_in == puback_packet
puback_packet = paho_test.gen_puback(mid=3)
packet_in = fake_broker.receive_packet(len(puback_packet))
assert packet_in # Check connection was not closed
assert packet_in == puback_packet
mqttc.disconnect()
disconnect_packet = paho_test.gen_disconnect()
packet_in = fake_broker.receive_packet(len(disconnect_packet))
assert packet_in # Check connection was not closed
assert packet_in == disconnect_packet
finally:
mqttc.loop_stop()
packet_in = fake_broker.receive_packet(1)
assert not packet_in # Check connection is closed
assert userdata['on_message'] == 1
assert userdata['callback1'] == 1
assert userdata['callback2'] == 2
开发者ID:Nzbuu,项目名称:paho.mqtt.python,代码行数:88,代码来源:test_client.py
示例7: dict
import subprocess
import socket
import sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(
os.path.abspath(os.path.join(os.path.split(inspect.getfile(inspect.currentframe()))[0], ".."))
)
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import paho_test
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect("01-unpwd-set", keepalive=keepalive, username="uname", password=";'[08gn=#")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.bind(("", 1888))
sock.listen(5)
client_args = sys.argv[1:]
env = dict(os.environ)
try:
pp = env["PYTHONPATH"]
except KeyError:
pp = ""
env["PYTHONPATH"] = "../../src:" + pp
client = subprocess.Popen(client_args, env=env)
开发者ID:RickyVaughn2,项目名称:paho.mqtt.python,代码行数:31,代码来源:01-unpwd-set.py
示例8:
#!/usr/bin/env python
# Test whether a client sends a correct retained PUBLISH to a topic with QoS 0.
import context
import paho_test
rc = 1
keepalive = 60
mid = 16
connect_packet = paho_test.gen_connect("retain-qos0-test", keepalive=keepalive)
connack_packet = paho_test.gen_connack(rc=0)
publish_packet = paho_test.gen_publish(
u"retain/qos0/test", qos=0, payload="retained message".encode('utf-8'), retain=True)
sock = paho_test.create_server_socket()
client = context.start_client()
try:
(conn, address) = sock.accept()
conn.settimeout(10)
if paho_test.expect_packet(conn, "connect", connect_packet):
conn.send(connack_packet)
if paho_test.expect_packet(conn, "publish", publish_packet):
rc = 0
conn.close()
开发者ID:Nzbuu,项目名称:paho.mqtt.python,代码行数:31,代码来源:04-retain-qos0.py
示例9: print
import time
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import paho_test
if sys.version < '2.7':
print("WARNING: SSL not supported on Python 2.6")
exit(0)
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect("08-ssl-connect-no-auth", keepalive=keepalive)
connack_packet = paho_test.gen_connack(rc=0)
disconnect_packet = paho_test.gen_disconnect()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ssock = ssl.wrap_socket(sock, ca_certs="../ssl/all-ca.crt", keyfile="../ssl/server.key", certfile="../ssl/server.crt", server_side=True, ssl_version=ssl.PROTOCOL_TLSv1)
ssock.settimeout(10)
ssock.bind(('', 1888))
ssock.listen(5)
client_args = sys.argv[1:]
env = dict(os.environ)
try:
pp = env['PYTHONPATH']
except KeyError:
开发者ID:AlexisBRENON,项目名称:paho.mqtt.python,代码行数:31,代码来源:08-ssl-connect-no-auth.py
示例10:
#!/usr/bin/env python
# Test whether a client connects correctly with a zero length clientid.
import context
import paho_test
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect("", keepalive=keepalive, proto_name="MQTT", proto_ver=4)
connack_packet = paho_test.gen_connack(rc=0)
disconnect_packet = paho_test.gen_disconnect()
sock = paho_test.create_server_socket()
client = context.start_client()
try:
(conn, address) = sock.accept()
conn.settimeout(10)
if paho_test.expect_packet(conn, "connect", connect_packet):
conn.send(connack_packet)
if paho_test.expect_packet(conn, "disconnect", disconnect_packet):
rc = 0
conn.close()
finally:
client.terminate()
开发者ID:Nzbuu,项目名称:paho.mqtt.python,代码行数:31,代码来源:01-zero-length-clientid.py
示例11:
#!/usr/bin/env python
# Test whether a client produces a correct connect with a will.
# Will QoS=1, will retain=1.
# The client should connect to port 1888 with keepalive=60, clean session set,
# client id 01-will-set will topic set to topic/on/unexpected/disconnect , will
# payload set to "will message", will qos set to 1 and will retain set.
import context
import paho_test
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect(
"01-will-set", keepalive=keepalive, will_topic="topic/on/unexpected/disconnect",
will_qos=1, will_retain=True, will_payload="will message".encode('utf-8'))
sock = paho_test.create_server_socket()
client = context.start_client()
try:
(conn, address) = sock.accept()
conn.settimeout(10)
if paho_test.expect_packet(conn, "connect", connect_packet):
rc = 0
conn.close()
finally:
开发者ID:Nzbuu,项目名称:paho.mqtt.python,代码行数:31,代码来源:01-will-set.py
示例12:
# Test whether a client produces a correct connect with a unicode username and password.
# The client should connect to port 1888 with keepalive=60, clean session set,
# client id 01-unpwd-unicode-set, username and password from corresponding variables
from __future__ import unicode_literals
import context
import paho_test
rc = 1
keepalive = 60
username = "\u00fas\u00e9rn\u00e1m\u00e9-h\u00e9ll\u00f3"
password = "h\u00e9ll\u00f3"
connect_packet = paho_test.gen_connect(
"01-unpwd-unicode-set", keepalive=keepalive, username=username, password=password)
sock = paho_test.create_server_socket()
client = context.start_client()
try:
(conn, address) = sock.accept()
conn.settimeout(10)
if paho_test.expect_packet(conn, "connect", connect_packet):
rc = 0
conn.close()
finally:
client.terminate()
开发者ID:Nzbuu,项目名称:paho.mqtt.python,代码行数:31,代码来源:01-unpwd-unicode-set.py
示例13:
# The client should connect to port 1888 with keepalive=60, clean session set,
# and client id publish-helper-qos0-test
# The test will send a CONNACK message to the client with rc=0. Upon receiving
# the CONNACK and verifying that rc=0, the client should send a PUBLISH message
# to topic "pub/qos0/test" with payload "message" and QoS=0. If rc!=0, the
# client should exit with an error.
# After sending the PUBLISH message, the client should send a
# DISCONNECT message.
import context
import paho_test
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect(
"publish-helper-qos0-test", keepalive=keepalive,
)
connack_packet = paho_test.gen_connack(rc=0)
publish_packet = paho_test.gen_publish(
u"pub/qos0/test", qos=0, payload="message".encode('utf-8'),
)
disconnect_packet = paho_test.gen_disconnect()
sock = paho_test.create_server_socket()
client = context.start_client()
try:
(conn, address) = sock.accept()
开发者ID:Nzbuu,项目名称:paho.mqtt.python,代码行数:31,代码来源:03-publish-helper-qos0.py
示例14: dict
import os
import subprocess
import socket
import sys
import time
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import paho_test
rc = 1
keepalive = 4
connect_packet = paho_test.gen_connect("01-keepalive-pingreq", keepalive=keepalive)
connack_packet = paho_test.gen_connack(rc=0)
pingreq_packet = paho_test.gen_pingreq()
pingresp_packet = paho_test.gen_pingresp()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.bind(('', 1888))
sock.listen(5)
client_args = sys.argv[1:]
env = dict(os.environ)
try:
pp = env['PYTHONPATH']
开发者ID:AlexisBRENON,项目名称:paho.mqtt.python,代码行数:31,代码来源:01-keepalive-pingreq.py
示例15:
import inspect
import os
import subprocess
import socket
import sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import paho_test
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect("subscribe-qos2-test", keepalive=keepalive)
connack_packet = paho_test.gen_connack(rc=0)
disconnect_packet = paho_test.gen_disconnect()
mid = 1
subscribe_packet = paho_test.gen_subscribe(mid, "qos2/test", 2)
suback_packet = paho_test.gen_suback(mid, 2)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.bind(('', 1888))
sock.listen(5)
client_args = sys.argv[1:]
开发者ID:IceRage,项目名称:paho.mqtt.python,代码行数:31,代码来源:02-subscribe-qos2.py
示例16: dict
import inspect
import os
import subprocess
import socket
import sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import paho_test
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect("01-con-discon-success", keepalive=keepalive, proto_name="MQIsdp", proto_ver=3)
connack_packet = paho_test.gen_connack(rc=0)
disconnect_packet = paho_test.gen_disconnect()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.bind(('', 1888))
sock.listen(5)
client_args = sys.argv[1:]
env = dict(os.environ)
try:
pp = env['PYTHONPATH']
except KeyError:
开发者ID:IceRage,项目名称:paho.mqtt.python,代码行数:31,代码来源:01-con-discon-success.py
示例17: dict
import os
import subprocess
import socket
import sys
import time
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import paho_test
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect("publish-qos0-test-np", keepalive=keepalive)
connack_packet = paho_test.gen_connack(rc=0)
publish_packet = paho_test.gen_publish("pub/qos0/no-payload/test", qos=0)
disconnect_packet = paho_test.gen_disconnect()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.bind(('', 1888))
sock.listen(5)
client_args = sys.argv[1:]
env = dict(os.environ)
try:
开发者ID:AlexisBRENON,项目名称:paho.mqtt.python,代码行数:31,代码来源:03-publish-qos0-no-payload.py
示例18: dict
import os
import subprocess
import socket
import sys
import time
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import paho_test
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect("01-con-discon-success", keepalive=keepalive)
connack_packet = paho_test.gen_connack(rc=0)
disconnect_packet = paho_test.gen_disconnect()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.bind(('', 1888))
sock.listen(5)
client_args = sys.argv[1:]
env = dict(os.environ)
try:
pp = env['PYTHONPATH']
except KeyError:
开发者ID:AlexisBRENON,项目名称:paho.mqtt.python,代码行数:31,代码来源:01-con-discon-success.py
注:本文中的paho_test.gen_connect函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论