本文整理汇总了Python中zmq.zmq_version函数的典型用法代码示例。如果您正苦于以下问题:Python zmq_version函数的具体用法?Python zmq_version怎么用?Python zmq_version使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了zmq_version函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_single_socket_forwarder_bind
def test_single_socket_forwarder_bind(self):
if zmq.zmq_version() in ('4.1.1', '4.0.6'):
raise SkipTest("libzmq-%s broke single-socket devices" % zmq.zmq_version())
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
# select random port:
binder = self.context.socket(zmq.REQ)
port = binder.bind_to_random_port('tcp://127.0.0.1')
binder.close()
time.sleep(0.1)
req = self.context.socket(zmq.REQ)
req.connect('tcp://127.0.0.1:%i'%port)
dev.bind_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
# select random port:
binder = self.context.socket(zmq.REQ)
port = binder.bind_to_random_port('tcp://127.0.0.1')
binder.close()
time.sleep(0.1)
req = self.context.socket(zmq.REQ)
req.connect('tcp://127.0.0.1:%i'%port)
dev.bind_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
开发者ID:326029212,项目名称:pyzmq,代码行数:35,代码来源:test_device.py
示例2: test_unicode_sockopts
def test_unicode_sockopts(self):
"""test setting/getting sockopts with unicode strings"""
topic = "tést"
if str is not unicode:
topic = topic.decode('utf8')
p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
self.assertEquals(s.send_unicode, s.send_unicode)
self.assertEquals(p.recv_unicode, p.recv_unicode)
self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
if zmq.zmq_version() < '4.0.0':
self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
self.assertRaises(TypeError, s.getsockopt_unicode, zmq.SUBSCRIBE)
if zmq.zmq_version() >= '4.0.0':
# skip the rest on 4.0, because IDENTITY was removed
return
st = s.getsockopt(zmq.IDENTITY)
self.assertEquals(st.decode('utf16'), s.getsockopt_unicode(zmq.IDENTITY, 'utf16'))
time.sleep(0.1) # wait for connection/subscription
p.send_unicode(topic,zmq.SNDMORE)
p.send_unicode(topic*2, encoding='latin-1')
self.assertEquals(topic, s.recv_unicode())
self.assertEquals(topic*2, s.recv_unicode(encoding='latin-1'))
开发者ID:hoov,项目名称:pyzmq,代码行数:26,代码来源:test_socket.py
示例3: run
def run(self):
"""Run the test suite, with nose, or unittest if nose is unavailable"""
# crude check for inplace build:
try:
import zmq
except ImportError:
print_exc()
fatal(
"\n ".join(
[
"Could not import zmq!",
"You must build pyzmq with 'python setup.py build_ext --inplace' for 'python setup.py test' to work.",
"If you did build pyzmq in-place, then this is a real error.",
]
)
)
sys.exit(1)
info("Testing pyzmq-%s with libzmq-%s" % (zmq.pyzmq_version(), zmq.zmq_version()))
if nose is None:
warn("nose unavailable, falling back on unittest. Skipped tests will appear as ERRORs.")
return self.run_unittest()
else:
return self.run_nose()
开发者ID:underrun,项目名称:pyzmq,代码行数:25,代码来源:setup.py
示例4: _printheader
def _printheader(self, log):
"""Prints header to log file; inspired by that in GPAW."""
log(logo)
log('Amp: Atomistic Machine-learning Package')
log('Developed by Andrew Peterson, Alireza Khorshidi, and others,')
log('Brown University.')
log(' PI Website: http://brown.edu/go/catalyst')
log(' Official repository: http://bitbucket.org/andrewpeterson/amp')
log(' Official documentation: http://amp.readthedocs.org/')
log(' Citation:')
log(' Khorshidi & Peterson, Computer Physics Communications')
log(' doi:10.1016/j.cpc.2016.05.010 (2016)')
log('=' * 70)
log('User: %s' % getuser())
log('Hostname: %s' % gethostname())
log('Date: %s' % now(with_utc=True))
uname = platform.uname()
log('Architecture: %s' % uname[4])
log('PID: %s' % os.getpid())
log('Amp version: %s' % 'NOT NUMBERED YET.') # FIXME/ap. Look at GPAW
ampdirectory = os.path.dirname(os.path.abspath(__file__))
log('Amp directory: %s' % ampdirectory)
commithash, commitdate = get_git_commit(ampdirectory)
log(' Last commit: %s' % commithash)
log(' Last commit date: %s' % commitdate)
log('Python: v{0}.{1}.{2}: %s'.format(*sys.version_info[:3]) %
sys.executable)
log('ASE v%s: %s' % (aseversion, os.path.dirname(ase.__file__)))
log('NumPy v%s: %s' %
(np.version.version, os.path.dirname(np.__file__)))
# SciPy is not a strict dependency.
try:
import scipy
log('SciPy v%s: %s' %
(scipy.version.version, os.path.dirname(scipy.__file__)))
except ImportError:
log('SciPy: not available')
# ZMQ an pxssh are only necessary for parallel calculations.
try:
import zmq
log('ZMQ/PyZMQ v%s/v%s: %s' %
(zmq.zmq_version(), zmq.pyzmq_version(),
os.path.dirname(zmq.__file__)))
except ImportError:
log('ZMQ: not available')
try:
import pxssh
log('pxssh: %s' % os.path.dirname(pxssh.__file__))
except ImportError:
log('pxssh: Not available from pxssh.')
try:
from pexpect import pxssh
except ImportError:
log('pxssh: Not available from pexpect.')
else:
import pexpect
log('pxssh (via pexpect v%s): %s' %
(pexpect.__version__, pxssh.__file__))
log('=' * 70)
开发者ID:AkshayTharval,项目名称:Atomistic-Machine-Learning-Potentials,代码行数:60,代码来源:__init__.py
示例5: enable_monitor
def enable_monitor(self, events=None):
# The standard approach of binding and then connecting does not
# work in this specific case. The event loop does not properly
# detect messages on the inproc transport which means that event
# messages get missed.
# pyzmq's 'get_monitor_socket' method can't be used because this
# performs the actions in the wrong order for use with an event
# loop.
# For more information on this issue see:
# http://lists.zeromq.org/pipermail/zeromq-dev/2015-July/029181.html
if (zmq.zmq_version_info() < (4,) or
zmq.pyzmq_version_info() < (14, 4,)):
raise NotImplementedError(
"Socket monitor requires libzmq >= 4 and pyzmq >= 14.4, "
"have libzmq:{}, pyzmq:{}".format(
zmq.zmq_version(), zmq.pyzmq_version()))
if self._monitor is None:
addr = "inproc://monitor.s-{}".format(self._zmq_sock.FD)
events = events or zmq.EVENT_ALL
_, self._monitor = yield from create_zmq_connection(
lambda: _ZmqEventProtocol(self._loop, self._protocol),
zmq.PAIR, connect=addr, loop=self._loop)
# bind must come after connect
self._zmq_sock.monitor(addr, events)
yield from self._monitor.wait_ready
开发者ID:TadLeonard,项目名称:aiozmq,代码行数:28,代码来源:core.py
示例6: test_router_router
def test_router_router(self):
"""test router-router MQ devices"""
if zmq.zmq_version() >= '4.0.0':
raise SkipTest("Only for libzmq < 4")
dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.ROUTER, zmq.PUB, asbytes('in'), asbytes('out'))
dev.setsockopt_in(zmq.LINGER, 0)
dev.setsockopt_out(zmq.LINGER, 0)
dev.setsockopt_mon(zmq.LINGER, 0)
binder = self.context.socket(zmq.DEALER)
porta = binder.bind_to_random_port('tcp://127.0.0.1')
portb = binder.bind_to_random_port('tcp://127.0.0.1')
binder.close()
time.sleep(0.1)
a = self.context.socket(zmq.DEALER)
a.identity = asbytes('a')
b = self.context.socket(zmq.DEALER)
b.identity = asbytes('b')
a.connect('tcp://127.0.0.1:%i'%porta)
dev.bind_in('tcp://127.0.0.1:%i'%porta)
b.connect('tcp://127.0.0.1:%i'%portb)
dev.bind_out('tcp://127.0.0.1:%i'%portb)
dev.start()
time.sleep(0.2)
msg = [ asbytes(m) for m in ('hello', 'there')]
a.send_multipart([asbytes('b')]+msg)
bmsg = self.recv_multipart(b)
self.assertEquals(bmsg, [asbytes('a')]+msg)
b.send_multipart(bmsg)
amsg = self.recv_multipart(a)
self.assertEquals(amsg, [asbytes('b')]+msg)
开发者ID:actank,项目名称:zmon,代码行数:32,代码来源:test_monqueue.py
示例7: __init__
def __init__(self):
Tool.__init__(self)
# * Initialize ZMQ context and open subscriber, publisher sockets
self.logger.debug("ZMQ version: {}, PyZMQ version: {}".format(zmq.zmq_version(), zmq.pyzmq_version()))
# ** Context
self.zmqContext = zmq.Context()
# ** Subscriber
self.subSocket = self.zmqContext.socket(zmq.SUB)
self.subSocket.connect(self.sub_address)
time.sleep(0.005) # mandatory sleep for ZMQ backend
self.logger.debug("[sub] Connected to {}".format(self.sub_address))
# ** Subscriber topics for input messages
self.subSocket.setsockopt(zmq.SUBSCRIBE, self.sub_topic)
self.subSocket.setsockopt(zmq.LINGER, self.sub_socket_linger)
self.logger.debug("[sub]Subscribed to topic \"{}\"".format(self.sub_topic))
time.sleep(0.005) # mandatory sleep for ZMQ backend
# ** Publisher
self.pubSocket = self.zmqContext.socket(zmq.PUB)
self.pubSocket.bind(self.pub_address)
time.sleep(0.005) # mandatory sleep for ZMQ backend
self.logger.debug("[pub] Bound to {}".format(self.pub_address))
# * Initialize other members
self.valid = False
self.buttons = [0, 0] # primary, secondary
self.transform = hm.translation(hm.identity(), self.position_offset)
#self.position = self.position_offset
self.loop = True # TODO ensure this is properly shared across threads
# * Start sensing loop
self.senseThread = Thread(target=self.senseLoop)
self.senseThread.daemon = True # to prevent indefinite wait on recv()
self.senseThread.start()
time.sleep(0.005) # sleep to allow child thread to run
开发者ID:Pallavistar,项目名称:pyTANG,代码行数:35,代码来源:HapticPointer.py
示例8: test_prefix
def test_prefix(self):
if zmq.zmq_version() < '3.0.0':
raise SkipTest("Only applies to libzmq >= 3.0")
xrep, xreq = self.create_bound_pair(zmq.XREP, zmq.XREQ)
msg = [ asbytes(p) for p in 'hi there'.split() ]
xreq.send_multipart(msg)
recvd = xrep.recv_multipart()
self.assertTrue(isinstance(recvd, tuple))
self.assertEquals(len(recvd), 2)
prefix, real = recvd
self.assertTrue(isinstance(prefix, list))
self.assertEquals(len(prefix), 1)
self.assertEquals(real, msg)
xrep.send_multipart(real, prefix=prefix)
echo = xreq.recv_multipart()
self.assertTrue(isinstance(echo, list))
self.assertEquals(echo, real)
extra = [asbytes('pre')]
xrep.send_multipart(msg, prefix=prefix+extra)
recvd = xreq.recv_multipart()
self.assertTrue(isinstance(recvd, tuple))
self.assertEquals(len(recvd), 2)
prefix, real = recvd
self.assertTrue(isinstance(prefix, list))
self.assertEquals(len(prefix), 1)
self.assertEquals(prefix, extra)
self.assertEquals(real, msg)
开发者ID:thurday,项目名称:pyzmq,代码行数:27,代码来源:test_socket.py
示例9: test_labels
def test_labels(self):
"""test device support for SNDLABEL"""
if zmq.zmq_version() < '3.0.0':
raise SkipTest("Only for libzmq 3")
dev = devices.ThreadDevice(zmq.QUEUE, zmq.XREP, -1)
# select random port:
binder = self.context.socket(zmq.XREQ)
port = binder.bind_to_random_port('tcp://127.0.0.1')
binder.close()
time.sleep(0.1)
req = self.context.socket(zmq.REQ)
req.connect('tcp://127.0.0.1:%i'%port)
dev.bind_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = asbytes('hello')
req.send(msg, zmq.SNDLABEL)
req.send(msg, zmq.SNDMORE)
req.send(msg)
self.assertEquals(msg, self.recv(req))
self.assertTrue(req.rcvlabel)
self.assertEquals(msg, self.recv(req))
self.assertTrue(req.rcvmore)
self.assertEquals(msg, self.recv(req))
del dev
req.close()
开发者ID:actank,项目名称:zmon,代码行数:27,代码来源:test_device.py
示例10: test_router_router
def test_router_router(self):
"""test router-router MQ devices"""
if zmq.zmq_version() >= "4.0.0":
raise SkipTest("Only for libzmq < 4")
dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.ROUTER, zmq.PUB, "in", "out")
dev.setsockopt_in(zmq.LINGER, 0)
dev.setsockopt_out(zmq.LINGER, 0)
dev.setsockopt_mon(zmq.LINGER, 0)
binder = self.context.socket(zmq.DEALER)
porta = binder.bind_to_random_port("tcp://127.0.0.1")
portb = binder.bind_to_random_port("tcp://127.0.0.1")
binder.close()
time.sleep(0.1)
a = self.context.socket(zmq.DEALER)
a.identity = asbytes("a")
b = self.context.socket(zmq.DEALER)
b.identity = asbytes("b")
a.connect("tcp://127.0.0.1:%i" % porta)
dev.bind_in("tcp://127.0.0.1:%i" % porta)
b.connect("tcp://127.0.0.1:%i" % portb)
dev.bind_out("tcp://127.0.0.1:%i" % portb)
dev.start()
time.sleep(0.2)
msg = [asbytes(m) for m in ("hello", "there")]
a.send_multipart(["b"] + msg)
bmsg = self.recv_multipart(b)
self.assertEquals(bmsg, ["a"] + msg)
b.send_multipart(bmsg)
amsg = self.recv_multipart(a)
self.assertEquals(amsg, ["b"] + msg)
开发者ID:thurday,项目名称:pyzmq,代码行数:32,代码来源:test_monqueue.py
示例11: check_for_zmq
def check_for_zmq(minimum_version, module='IPython.zmq'):
min_vlist = [int(n) for n in minimum_version.split('.')]
try:
import zmq
except ImportError:
raise ImportError("%s requires pyzmq >= %s"%(module, minimum_version))
pyzmq_version = zmq.__version__
vlist = [int(n) for n in re.findall(r'\d+', pyzmq_version)]
if 'dev' not in pyzmq_version and vlist < min_vlist:
raise ImportError("%s requires pyzmq >= %s, but you have %s"%(
module, minimum_version, pyzmq_version))
# fix missing DEALER/ROUTER aliases in pyzmq < 2.1.9
if not hasattr(zmq, 'DEALER'):
zmq.DEALER = zmq.XREQ
if not hasattr(zmq, 'ROUTER'):
zmq.ROUTER = zmq.XREP
if zmq.zmq_version() >= '4.0.0':
warnings.warn("""libzmq 4 detected.
It is unlikely that IPython's zmq code will work properly.
Please install libzmq stable, which is 2.1.x or 2.2.x""",
RuntimeWarning)
开发者ID:adgaudio,项目名称:ipython,代码行数:26,代码来源:__init__.py
示例12: zmqversion
def zmqversion():
'''
Return the zeromq version
'''
# Provides:
# zmqversion
import zmq
return {'zmqversion': zmq.zmq_version()}
开发者ID:Anbcorp,项目名称:salt,代码行数:8,代码来源:core.py
示例13: __init__
def __init__(self, min_version, msg='Feature'):
global _zmq_version
if _zmq_version is None:
from zmq import zmq_version
_zmq_version = zmq_version()
self.msg = msg
self.min_version = min_version
self.version = _zmq_version
开发者ID:326029212,项目名称:pyzmq,代码行数:8,代码来源:error.py
示例14: print_info
def print_info():
log.info("Python version: %s, %s",
'.'.join((str(e) for e in sys.version_info)),
sys.executable)
log.info("zeromq version: %s", zmq.zmq_version())
log.info("pyzmq version: %s", zmq.pyzmq_version())
log.info("track version: %s", str(track_base.version_info))
开发者ID:onlyone0001,项目名称:track,代码行数:8,代码来源:track_server.py
示例15: log_versions
def log_versions(logger):
logger.info(
'Versions:\n'
' PyZMQ: %s\n'
' libzmq: %s\n'
' Tornado: %s\n'
' SockJS-Tornado: Not Available\n' # TODO find the verison
' SockJS-ZMQProxy: %s\n',
zmq.pyzmq_version(),
zmq.zmq_version(),
tornado_version,
version
)
开发者ID:njoyce,项目名称:sockjs-zmqproxy,代码行数:13,代码来源:zmqproxy.py
示例16: test_bad_send_recv
def test_bad_send_recv(self):
s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
if zmq.zmq_version() != "2.1.8":
# this doesn't work on 2.1.8
for copy in (True, False):
self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy)
self.assertRaisesErrno(zmq.EFSM, s2.send, b"asdf", copy=copy)
# I have to have this or we die on an Abort trap.
msg1 = b"asdf"
msg2 = self.ping_pong(s1, s2, msg1)
self.assertEqual(msg1, msg2)
开发者ID:hitej,项目名称:meta-core,代码行数:13,代码来源:test_reqrep.py
示例17: test_router_dealer
def test_router_dealer(self):
if zmq.zmq_version() >= '4.0.0':
raise SkipTest("ROUTER/DEALER change in 4.0")
router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
msg1 = asbytes('message1')
dealer.send(msg1)
ident = self.recv(router)
more = router.rcvmore
self.assertEquals(more, True)
msg2 = self.recv(router)
self.assertEquals(msg1, msg2)
more = router.rcvmore
self.assertEquals(more, False)
开发者ID:actank,项目名称:zmon,代码行数:14,代码来源:test_multipart.py
示例18: test_router_dealer
def test_router_dealer(self):
if zmq.zmq_version() >= '3.0.0':
raise SkipTest("Known bug in libzmq 3.0.0, see https://zeromq.jira.com/browse/LIBZMQ-232")
router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
msg1 = asbytes('message1')
dealer.send(msg1)
ident = router.recv()
more = router.rcvmore()
self.assertEquals(more, True)
msg2 = router.recv()
self.assertEquals(msg1, msg2)
more = router.rcvmore()
self.assertEquals(more, False)
开发者ID:hoov,项目名称:pyzmq,代码行数:14,代码来源:test_multipart.py
示例19: test_int_sockopts
def test_int_sockopts(self):
"test non-uint64 sockopts"
v = zmq.zmq_version()
if not v >= '2.1':
raise SkipTest("only on libzmq >= 2.1")
elif v < '3.0':
hwm = zmq.HWM
default_hwm = 0
else:
hwm = zmq.SNDHWM
default_hwm = 1000
p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
p.setsockopt(zmq.LINGER, 0)
self.assertEquals(p.getsockopt(zmq.LINGER), 0)
p.setsockopt(zmq.LINGER, -1)
self.assertEquals(p.getsockopt(zmq.LINGER), -1)
self.assertEquals(p.getsockopt(hwm), default_hwm)
p.setsockopt(hwm, 11)
self.assertEquals(p.getsockopt(hwm), 11)
# p.setsockopt(zmq.EVENTS, zmq.POLLIN)
self.assertEquals(p.getsockopt(zmq.EVENTS), zmq.POLLOUT)
self.assertRaisesErrno(zmq.EINVAL, p.setsockopt,zmq.EVENTS, 2**7-1)
self.assertEquals(p.getsockopt(zmq.TYPE), p.socket_type)
self.assertEquals(p.getsockopt(zmq.TYPE), zmq.PUB)
self.assertEquals(s.getsockopt(zmq.TYPE), s.socket_type)
self.assertEquals(s.getsockopt(zmq.TYPE), zmq.SUB)
# check for overflow / wrong type:
errors = []
backref = {}
constants = zmq.core.constants
for name in constants.__all__:
value = getattr(constants, name)
if isinstance(value, int):
backref[value] = name
for opt in zmq.core.constants.int_sockopts+zmq.core.constants.int64_sockopts:
sopt = backref[opt]
try:
n = p.getsockopt(opt)
except zmq.ZMQError:
e = sys.exc_info()[1]
errors.append("getsockopt(zmq.%s) raised '%s'."%(sopt, e))
else:
if n > 2**31:
errors.append("getsockopt(zmq.%s) returned a ridiculous value."
" It is probably the wrong type."%sopt)
if errors:
self.fail('\n'.join(errors))
开发者ID:actank,项目名称:zmon,代码行数:48,代码来源:test_socket.py
示例20: test_closing_full
def test_closing_full():
ctx = zmq.Context()
ver = zmq.zmq_version()[:3]
global port
out = ctx.socket( zmq.XREQ )
out.bind( "tcp://*:%d" % port )
inc = ctx.socket( zmq.XREP )
inc.connect( "tcp://localhost:%d" % port )
port += 1
try:
# Test that ..._multipart and zmq.RCVMORE behave as we think
out.send_multipart( ['', 'TEST1'] )
msg = inc.recv_multipart()
if ver in ("2.1", "3.1"):
assert len( msg ) == 3
elif ver in ("3.0"):
assert isinstance( msg, tuple )
assert len( msg[0] ) == 1
assert len( msg[1] ) == 2
else:
assert False and "Unknown zmq version"
out.send_multipart( ['', 'TEST2'] )
msg = []
poller = zmq.Poller()
poller.register(inc, zmq.POLLIN)
while poller.poll( timeout=100 ):
msg.append( inc.recv() )
if not inc.getsockopt( zmq.RCVMORE ):
break
assert len( msg ) == 3
## Add another stage; see if we can intercept and re-route between
## inc/nxt
#nxt = ctx.socket( zmq.XREQ )
#nxt.connect( "tcp://localhost:%d" % port )
#
#lst = ctx.socket( zme.XREP )
#nxt.bind( "tcp://*:%d" % port )
#port += 1
finally:
out.close()
inc.close()
ctx.term()
开发者ID:dyim42,项目名称:pyzmq-demos,代码行数:48,代码来源:assigner_test.py
注:本文中的zmq.zmq_version函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论