本文整理汇总了Python中zmq.zmq_version_info函数的典型用法代码示例。如果您正苦于以下问题:Python zmq_version_info函数的具体用法?Python zmq_version_info怎么用?Python zmq_version_info使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了zmq_version_info函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_frame_more
def test_frame_more(self):
"""test Frame.more attribute"""
frame = zmq.Frame(b"hello")
self.assertFalse(frame.more)
sa,sb = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
sa.send_multipart([b'hi', b'there'])
frame = self.recv(sb, copy=False)
self.assertTrue(frame.more)
if zmq.zmq_version_info()[0] >= 3 and not PYPY:
self.assertTrue(frame.get(zmq.MORE))
frame = self.recv(sb, copy=False)
self.assertFalse(frame.more)
if zmq.zmq_version_info()[0] >= 3 and not PYPY:
self.assertFalse(frame.get(zmq.MORE))
开发者ID:HunterChen,项目名称:pyzmq,代码行数:14,代码来源:test_message.py
示例2: test_monitor
def test_monitor(self):
"""Test monitoring interface for sockets."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6666")
# try monitoring the REP socket
s_rep.monitor("inproc://monitor.rep", zmq.EVENT_ALL)
# create listening socket for monitor
s_event = self.context.socket(zmq.PAIR)
self.sockets.append(s_event)
s_event.connect("inproc://monitor.rep")
s_event.linger = 0
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6666")
if zmq.zmq_version_info() < (3,3):
self.assertRaises(NotImplementedError, get_monitor_message, s_event)
return
m = get_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECT_DELAYED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")
# test receive event for connected event
m = get_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
开发者ID:WongTai,项目名称:pyzmq,代码行数:25,代码来源:test_monitor.py
示例3: _zmq_has_curve
def _zmq_has_curve():
"""
Return whether the current ZMQ has support for auth and CurveZMQ security.
:rtype: bool
Version notes:
`zmq.curve_keypair()` is new in version 14.0, new in version libzmq-4.0.
Requires libzmq (>= 4.0) to have been linked with libsodium.
`zmq.auth` module is new in version 14.1
`zmq.has()` is new in version 14.1, new in version libzmq-4.1.
"""
zmq_version = zmq.zmq_version_info()
pyzmq_version = zmq.pyzmq_version_info()
if pyzmq_version >= (14, 1, 0) and zmq_version >= (4, 1):
return zmq.has('curve')
if pyzmq_version < (14, 1, 0):
return False
if zmq_version < (4, 0):
# security is new in libzmq 4.0
return False
try:
zmq.curve_keypair()
except zmq.error.ZMQError:
# security requires libzmq to be linked against libsodium
return False
return True
开发者ID:PaixuAabuizia,项目名称:bitmask_client,代码行数:32,代码来源:utils.py
示例4: enable_monitor
def enable_monitor(self, events=None):
# The standard approach of binding and then connecting does not
# work in this specific case. The event loop does not properly
# detect messages on the inproc transport which means that event
# messages get missed.
# pyzmq's 'get_monitor_socket' method can't be used because this
# performs the actions in the wrong order for use with an event
# loop.
# For more information on this issue see:
# http://lists.zeromq.org/pipermail/zeromq-dev/2015-July/029181.html
if (zmq.zmq_version_info() < (4,) or
zmq.pyzmq_version_info() < (14, 4,)):
raise NotImplementedError(
"Socket monitor requires libzmq >= 4 and pyzmq >= 14.4, "
"have libzmq:{}, pyzmq:{}".format(
zmq.zmq_version(), zmq.pyzmq_version()))
if self._monitor is None:
addr = "inproc://monitor.s-{}".format(self._zmq_sock.FD)
events = events or zmq.EVENT_ALL
_, self._monitor = yield from create_zmq_connection(
lambda: _ZmqEventProtocol(self._loop, self._protocol),
zmq.PAIR, connect=addr, loop=self._loop)
# bind must come after connect
self._zmq_sock.monitor(addr, events)
yield from self._monitor.wait_ready
开发者ID:TadLeonard,项目名称:aiozmq,代码行数:28,代码来源:core.py
示例5: test_proxy_steerable
def test_proxy_steerable(self):
if zmq.zmq_version_info() < (4, 1):
raise SkipTest("Steerable Proxies only in libzmq >= 4.1")
dev = devices.ThreadProxySteerable(
zmq.PULL,
zmq.PUSH,
zmq.PUSH,
zmq.PAIR
)
iface = 'tcp://127.0.0.1'
port = dev.bind_in_to_random_port(iface)
port2 = dev.bind_out_to_random_port(iface)
port3 = dev.bind_mon_to_random_port(iface)
port4 = dev.bind_ctrl_to_random_port(iface)
dev.start()
time.sleep(0.25)
msg = b'hello'
push = self.context.socket(zmq.PUSH)
push.connect("%s:%i" % (iface, port))
pull = self.context.socket(zmq.PULL)
pull.connect("%s:%i" % (iface, port2))
mon = self.context.socket(zmq.PULL)
mon.connect("%s:%i" % (iface, port3))
ctrl = self.context.socket(zmq.PAIR)
ctrl.connect("%s:%i" % (iface, port4))
push.send(msg)
self.sockets.extend([push, pull, mon, ctrl])
self.assertEqual(msg, self.recv(pull))
self.assertEqual(msg, self.recv(mon))
ctrl.send(b'TERMINATE')
dev.join()
开发者ID:zeromq,项目名称:pyzmq,代码行数:31,代码来源:test_proxy_steerable.py
示例6: set_hwm
def set_hwm(self, value):
"""set the High Water Mark
On libzmq ≥ 3, this sets both SNDHWM and RCVHWM
.. warning::
New values only take effect for subsequent socket
bind/connects.
"""
major = zmq.zmq_version_info()[0]
if major >= 3:
raised = None
try:
self.sndhwm = value
except Exception as e:
raised = e
try:
self.rcvhwm = value
except Exception as e:
raised = e
if raised:
raise raised
else:
return self.setsockopt(zmq.HWM, value)
开发者ID:fleeting,项目名称:tide,代码行数:27,代码来源:socket.py
示例7: get_monitor_socket
def get_monitor_socket(self, events=None, addr=None):
"""Return a connected PAIR socket ready to receive the event notifications.
.. versionadded:: libzmq-4.0
.. versionadded:: 14.0
Parameters
----------
events : bitfield (int) [default: ZMQ_EVENTS_ALL]
The bitmask defining which events are wanted.
addr : string [default: None]
The optional endpoint for the monitoring sockets.
Returns
-------
socket : (PAIR)
The socket is already connected and ready to receive messages.
"""
# safe-guard, method only available on libzmq >= 4
if zmq.zmq_version_info() < (4,):
raise NotImplementedError("get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version())
if addr is None:
# create endpoint name from internal fd
addr = "inproc://monitor.s-%d" % self.FD
if events is None:
# use all events
events = zmq.EVENT_ALL
# attach monitoring socket
self.monitor(addr, events)
# create new PAIR socket and connect it
ret = self.context.socket(zmq.PAIR)
ret.connect(addr)
return ret
开发者ID:gregbanks,项目名称:pyzmq,代码行数:33,代码来源:socket.py
示例8: test_proxy
def test_proxy(self):
if zmq.zmq_version_info() < (3,2):
raise SkipTest("Proxies only in libzmq >= 3")
dev = devices.ThreadProxy(zmq.PULL, zmq.PUSH, zmq.PUSH)
binder = self.context.socket(zmq.REQ)
iface = 'tcp://127.0.0.1'
port = binder.bind_to_random_port(iface)
port2 = binder.bind_to_random_port(iface)
port3 = binder.bind_to_random_port(iface)
binder.close()
time.sleep(0.1)
dev.bind_in("%s:%i" % (iface, port))
dev.bind_out("%s:%i" % (iface, port2))
dev.bind_mon("%s:%i" % (iface, port3))
dev.start()
time.sleep(0.25)
msg = b'hello'
push = self.context.socket(zmq.PUSH)
push.connect("%s:%i" % (iface, port))
pull = self.context.socket(zmq.PULL)
pull.connect("%s:%i" % (iface, port2))
mon = self.context.socket(zmq.PULL)
mon.connect("%s:%i" % (iface, port3))
push.send(msg)
self.sockets.extend([push, pull, mon])
self.assertEqual(msg, self.recv(pull))
self.assertEqual(msg, self.recv(mon))
开发者ID:BenCJeffery,项目名称:pyzmq,代码行数:27,代码来源:test_device.py
示例9: setUp
def setUp(self):
if zmq.zmq_version_info() < (4, 0):
raise SkipTest("security is new in libzmq 4.0")
try:
zmq.curve_keypair()
except zmq.ZMQError:
raise SkipTest("security requires libzmq to be linked against libsodium")
super(TestSecurity, self).setUp()
开发者ID:hatbro,项目名称:pyzmq,代码行数:8,代码来源:test_security.py
示例10: setUp
def setUp(self):
if zmq.zmq_version_info() < (4,0):
raise SkipTest("security is new in libzmq 4.0")
try:
zmq.curve_keypair()
except zmq.ZMQError:
raise SkipTest("security requires libzmq to be built with CURVE support")
super(TestSecurity, self).setUp()
开发者ID:326029212,项目名称:pyzmq,代码行数:8,代码来源:test_security.py
示例11: test_ctx_opts
def test_ctx_opts(self):
if zmq.zmq_version_info() < (3,):
raise SkipTest("context options require libzmq 3")
ctx = self.Context()
ctx.set(zmq.MAX_SOCKETS, 2)
self.assertEqual(ctx.get(zmq.MAX_SOCKETS), 2)
ctx.max_sockets = 100
self.assertEqual(ctx.max_sockets, 100)
self.assertEqual(ctx.get(zmq.MAX_SOCKETS), 100)
开发者ID:felipecruz,项目名称:pyzmq,代码行数:9,代码来源:test_context.py
示例12: setUp
def setUp(self):
if zmq.zmq_version_info() < (4, 0):
raise SkipTest("security is new in libzmq 4.0")
super(TestThreadedAuthentication, self).setUp()
# silence auth module debug log output during test runs
logger = logging.getLogger()
self.original_log_level = logger.getEffectiveLevel()
logger.setLevel(logging.DEBUG)
self.auth = None
开发者ID:juliantaylor,项目名称:pyzmq,代码行数:9,代码来源:test_auth.py
示例13: test_int_sockopts
def test_int_sockopts(self):
"test integer sockopts"
v = zmq.zmq_version_info()
if v < (3,0):
default_hwm = 0
else:
default_hwm = 1000
p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
p.setsockopt(zmq.LINGER, 0)
self.assertEqual(p.getsockopt(zmq.LINGER), 0)
p.setsockopt(zmq.LINGER, -1)
self.assertEqual(p.getsockopt(zmq.LINGER), -1)
self.assertEqual(p.hwm, default_hwm)
p.hwm = 11
self.assertEqual(p.hwm, 11)
# p.setsockopt(zmq.EVENTS, zmq.POLLIN)
self.assertEqual(p.getsockopt(zmq.EVENTS), zmq.POLLOUT)
self.assertRaisesErrno(zmq.EINVAL, p.setsockopt,zmq.EVENTS, 2**7-1)
self.assertEqual(p.getsockopt(zmq.TYPE), p.socket_type)
self.assertEqual(p.getsockopt(zmq.TYPE), zmq.PUB)
self.assertEqual(s.getsockopt(zmq.TYPE), s.socket_type)
self.assertEqual(s.getsockopt(zmq.TYPE), zmq.SUB)
# check for overflow / wrong type:
errors = []
backref = {}
constants = zmq.constants
for name in constants.__all__:
value = getattr(constants, name)
if isinstance(value, int):
backref[value] = name
for opt in zmq.constants.int_sockopts.union(zmq.constants.int64_sockopts):
sopt = backref[opt]
if sopt.startswith((
'ROUTER', 'XPUB', 'TCP', 'FAIL',
'REQ_', 'CURVE_', 'PROBE_ROUTER',
'IPC_FILTER', 'GSSAPI', 'STREAM_',
'VMCI_BUFFER_SIZE', 'VMCI_BUFFER_MIN_SIZE',
'VMCI_BUFFER_MAX_SIZE', 'VMCI_CONNECT_TIMEOUT',
)):
# some sockopts are write-only
continue
try:
n = p.getsockopt(opt)
except zmq.ZMQError as e:
errors.append("getsockopt(zmq.%s) raised '%s'."%(sopt, e))
else:
if n > 2**31:
errors.append("getsockopt(zmq.%s) returned a ridiculous value."
" It is probably the wrong type."%sopt)
if errors:
self.fail('\n'.join([''] + errors))
开发者ID:thehesiod,项目名称:pyzmq,代码行数:52,代码来源:test_socket.py
示例14: test_removed
def test_removed(self):
zmq_version = zmq.zmq_version_info()
for version, new_names in constant_names.removed_in.items():
should_have = zmq_version < version
for name in new_names:
try:
value = getattr(zmq, name)
except AttributeError:
if should_have:
self.fail("AttributeError: zmq.%s" % name)
else:
if not should_have:
self.fail("Shouldn't have: zmq.%s=%s" % (name, value))
开发者ID:RojavaCrypto,项目名称:pyzmq,代码行数:13,代码来源:test_constants.py
示例15: test_int_sockopts
def test_int_sockopts(self):
"test non-uint64 sockopts"
v = zmq.zmq_version_info()
if not v >= (2, 1):
raise SkipTest("only on libzmq >= 2.1")
elif v < (3, 0):
hwm = zmq.HWM
default_hwm = 0
else:
hwm = zmq.SNDHWM
default_hwm = 1000
p, s = self.create_bound_pair(zmq.PUB, zmq.SUB)
p.setsockopt(zmq.LINGER, 0)
self.assertEquals(p.getsockopt(zmq.LINGER), 0)
p.setsockopt(zmq.LINGER, -1)
self.assertEquals(p.getsockopt(zmq.LINGER), -1)
self.assertEquals(p.getsockopt(hwm), default_hwm)
p.setsockopt(hwm, 11)
self.assertEquals(p.getsockopt(hwm), 11)
# p.setsockopt(zmq.EVENTS, zmq.POLLIN)
self.assertEquals(p.getsockopt(zmq.EVENTS), zmq.POLLOUT)
self.assertRaisesErrno(zmq.EINVAL, p.setsockopt, zmq.EVENTS, 2 ** 7 - 1)
self.assertEquals(p.getsockopt(zmq.TYPE), p.socket_type)
self.assertEquals(p.getsockopt(zmq.TYPE), zmq.PUB)
self.assertEquals(s.getsockopt(zmq.TYPE), s.socket_type)
self.assertEquals(s.getsockopt(zmq.TYPE), zmq.SUB)
# check for overflow / wrong type:
errors = []
backref = {}
constants = zmq.core.constants
for name in constants.__all__:
value = getattr(constants, name)
if isinstance(value, int):
backref[value] = name
for opt in zmq.core.constants.int_sockopts + zmq.core.constants.int64_sockopts:
sopt = backref[opt]
if sopt == "FAIL_UNROUTABLE":
# fail_unroutable is write-only
continue
try:
n = p.getsockopt(opt)
except zmq.ZMQError as e:
errors.append("getsockopt(zmq.%s) raised '%s'." % (sopt, e))
else:
if n > 2 ** 31:
errors.append(
"getsockopt(zmq.%s) returned a ridiculous value." " It is probably the wrong type." % sopt
)
if errors:
self.fail("\n".join(errors))
开发者ID:gotcha,项目名称:pyzmq,代码行数:51,代码来源:test_socket.py
示例16: _check_version
def _check_version(min_version_info, msg='Feature'):
"""Check for libzmq
raises ZMQVersionError if current zmq version is not at least min_version
min_version_info is a tuple of integers, and will be compared against zmq.zmq_version_info().
"""
global _zmq_version_info
if _zmq_version_info is None:
from zmq import zmq_version_info
_zmq_version_info = zmq_version_info()
if _zmq_version_info < min_version_info:
min_version = '.'.join(str(v) for v in min_version_info)
raise ZMQVersionError(min_version, msg)
开发者ID:326029212,项目名称:pyzmq,代码行数:14,代码来源:error.py
示例17: _select_recv
def _select_recv(self, multipart, socket, **kwargs):
"""call recv[_multipart] in a way that raises if there is nothing to receive"""
if zmq.zmq_version_info() >= (3,1,0):
# zmq 3.1 has a bug, where poll can return false positives,
# so we wait a little bit just in case
# See LIBZMQ-280 on JIRA
time.sleep(0.1)
r,w,x = zmq.select([socket], [], [], timeout=5)
assert len(r) > 0, "Should have received a message"
kwargs['flags'] = zmq.DONTWAIT | kwargs.get('flags', 0)
recv = socket.recv_multipart if multipart else socket.recv
return recv(**kwargs)
开发者ID:AndreaCrotti,项目名称:pyzmq,代码行数:14,代码来源:__init__.py
示例18: __init__
def __init__(self, context=None, encoding='utf-8'):
if zmq.zmq_version_info() < (4,0):
raise NotImplementedError("Security is only available in libzmq >= 4.0")
self.context = context or zmq.Context.instance()
self.encoding = encoding
self.allow_any = False
self.zap_socket = None
self.whitelist = []
self.blacklist = []
# passwords is a dict keyed by domain and contains values
# of dicts with username:password pairs.
self.passwords = {}
# certs is dict keyed by domain and contains values
# of dicts keyed by the public keys from the specified location.
self.certs = {}
开发者ID:felipecruz,项目名称:pyzmq,代码行数:15,代码来源:auth.py
示例19: _setup_socket
def _setup_socket(self):
if self.socket is None:
self.socket = self.context.socket(self.socket_type)
if self.routing_id:
self.socket.identity = self.routing_id
if self.socket_type == zmq.ROUTER:
self.socket.ROUTER_MANDATORY = True
if zmq.zmq_version_info() >= (4, 1, 0):
self.socket.ROUTER_HANDOVER = True
elif self.socket_type == zmq.REQ:
self.socket.RCVTIMEO = int(self.timeout * 1000)
self.socket.SNDTIMEO = int(self.timeout * 1000)
self.auth_backend.configure()
self.heartbeat_backend.configure()
self.initialized = True
开发者ID:aldefalco,项目名称:pseud,代码行数:15,代码来源:common.py
示例20: test_proxy_bind_to_random_with_args
def test_proxy_bind_to_random_with_args(self):
if zmq.zmq_version_info() < (3, 2):
raise SkipTest("Proxies only in libzmq >= 3")
dev = devices.ThreadProxy(zmq.PULL, zmq.PUSH, zmq.PUSH)
iface = 'tcp://127.0.0.1'
ports = []
min, max = 5000, 5050
ports.extend([
dev.bind_in_to_random_port(iface, min_port=min, max_port=max),
dev.bind_out_to_random_port(iface, min_port=min, max_port=max),
dev.bind_mon_to_random_port(iface, min_port=min, max_port=max)
])
for port in ports:
if port < min or port > max:
self.fail('Unexpected port number: %i' % port)
开发者ID:zeromq,项目名称:pyzmq,代码行数:15,代码来源:test_device.py
注:本文中的zmq.zmq_version_info函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论