• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python zmq.zmq_version_info函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 zmq.zmq_version_info 函数的典型用法代码示例。如果您正苦于以下问题：Python zmq_version_info 函数的具体用法？Python zmq_version_info 怎么用？Python zmq_version_info 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了zmq_version_info函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_frame_more

 def test_frame_more(self):
     """Check that ``Frame.more`` reflects the multipart "more" flag.

     A frame built directly from bytes must not have ``more`` set. For a
     two-part message received with ``copy=False``, the first part must
     have ``more`` set and the second must not. When libzmq's major
     version is >= 3 and ``PYPY`` is false, ``frame.get(zmq.MORE)`` is
     checked against the same expectation.
     """
     standalone = zmq.Frame(b"hello")
     self.assertFalse(standalone.more)

     sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
     sender.send_multipart([b'hi', b'there'])

     # First received part is followed by another one; the second is last.
     for expect_more in (True, False):
         check = self.assertTrue if expect_more else self.assertFalse
         part = self.recv(receiver, copy=False)
         check(part.more)
         if zmq.zmq_version_info()[0] >= 3 and not PYPY:
             check(part.get(zmq.MORE))
开发者ID:HunterChen,项目名称:pyzmq,代码行数:14,代码来源:test_message.py


示例2: test_monitor

 def test_monitor(self):
     """Test the socket monitoring interface on a REP socket.

     A REQ socket is bound on a TCP endpoint, the REP socket is monitored
     over an inproc endpoint, and a PAIR socket is connected to receive
     the monitor events. On libzmq < 3.3 reading a monitor message is
     expected to raise ``NotImplementedError``; otherwise the first two
     events must be CONNECT_DELAYED (with the TCP endpoint) and CONNECTED.
     """
     tcp_endpoint = "tcp://127.0.0.1:6666"
     monitor_endpoint = "inproc://monitor.rep"

     rep = self.context.socket(zmq.REP)
     req = self.context.socket(zmq.REQ)
     self.sockets.extend([rep, req])
     req.bind(tcp_endpoint)

     # Attach the monitor to the REP socket before it connects.
     rep.monitor(monitor_endpoint, zmq.EVENT_ALL)

     # PAIR socket that listens for the monitor's event messages.
     listener = self.context.socket(zmq.PAIR)
     self.sockets.append(listener)
     listener.connect(monitor_endpoint)
     listener.linger = 0

     # Connecting the REP socket is what generates the events checked below.
     rep.connect(tcp_endpoint)
     if zmq.zmq_version_info() < (3,3):
         self.assertRaises(NotImplementedError, get_monitor_message, listener)
         return

     delayed = get_monitor_message(listener)
     self.assertEqual(delayed['event'], zmq.EVENT_CONNECT_DELAYED)
     self.assertEqual(delayed['endpoint'], b"tcp://127.0.0.1:6666")

     connected = get_monitor_message(listener)
     self.assertEqual(connected['event'], zmq.EVENT_CONNECTED)
开发者ID:WongTai,项目名称:pyzmq,代码行数:25,代码来源:test_monitor.py


示例3: _zmq_has_curve

def _zmq_has_curve():
    """
    Tell whether the installed ZMQ stack supports auth and CurveZMQ security.

    :rtype: bool

    Version notes:
      - `zmq.curve_keypair()` is new in pyzmq 14.0 / libzmq 4.0 and
        requires libzmq (>= 4.0) to have been linked with libsodium.
      - the `zmq.auth` module is new in pyzmq 14.1.
      - `zmq.has()` is new in pyzmq 14.1 / libzmq 4.1.
    """
    libzmq = zmq.zmq_version_info()
    pyzmq = zmq.pyzmq_version_info()

    # Anything older than pyzmq 14.1 is rejected outright.
    if pyzmq < (14, 1, 0):
        return False

    # With libzmq >= 4.1 the library can be asked directly.
    if libzmq >= (4, 1):
        return zmq.has('curve')

    # security is new in libzmq 4.0
    if libzmq < (4, 0):
        return False

    # libzmq 4.0.x: probe by generating a key pair.
    try:
        zmq.curve_keypair()
    except zmq.error.ZMQError:
        # security requires libzmq to be linked against libsodium
        return False
    else:
        return True
开发者ID:PaixuAabuizia,项目名称:bitmask_client,代码行数:32,代码来源:utils.py


示例4: enable_monitor

    def enable_monitor(self, events=None):
        """Start monitoring the underlying socket and wait until ready.

        This is a generator-based coroutine (it uses ``yield from``). When
        ``self._monitor`` is already set, nothing is done beyond the
        version check.

        :param events: event bitmask handed to the socket's ``monitor``
            call; any falsy value is replaced by ``zmq.EVENT_ALL``.
        :raises NotImplementedError: if libzmq < 4 or pyzmq < 14.4.
        """
        # Binding first and connecting afterwards (the standard order) does
        # not work here: the event loop does not properly detect messages
        # on the inproc transport, so event messages get missed.
        # pyzmq's 'get_monitor_socket' cannot be used either, because it
        # performs the steps in the wrong order for use with an event loop.
        # For more information on this issue see:
        # http://lists.zeromq.org/pipermail/zeromq-dev/2015-July/029181.html
        libzmq_too_old = zmq.zmq_version_info() < (4,)
        if libzmq_too_old or zmq.pyzmq_version_info() < (14, 4,):
            raise NotImplementedError(
                "Socket monitor requires libzmq >= 4 and pyzmq >= 14.4, "
                "have libzmq:{}, pyzmq:{}".format(
                    zmq.zmq_version(), zmq.pyzmq_version()))

        if self._monitor is not None:
            return

        monitor_addr = "inproc://monitor.s-{}".format(self._zmq_sock.FD)
        if not events:
            events = zmq.EVENT_ALL

        # Connect the receiving PAIR side first ...
        _transport, self._monitor = yield from create_zmq_connection(
            lambda: _ZmqEventProtocol(self._loop, self._protocol),
            zmq.PAIR, connect=monitor_addr, loop=self._loop)
        # ... then let the monitor bind: bind must come after connect.
        self._zmq_sock.monitor(monitor_addr, events)
        yield from self._monitor.wait_ready
开发者ID:TadLeonard,项目名称:aiozmq,代码行数:28,代码来源:core.py


示例5: test_proxy_steerable

 def test_proxy_steerable(self):
     """Run a message through a ThreadProxySteerable and then stop it.

     Skipped on libzmq < 4.1. The proxy's in/out/monitor/control sockets
     are bound to random ports; a message pushed into the proxy must
     arrive on both the out side and the monitor side. Finally
     ``b'TERMINATE'`` is sent on the control socket and the device is
     joined.
     """
     if zmq.zmq_version_info() < (4, 1):
         raise SkipTest("Steerable Proxies only in libzmq >= 4.1")

     proxy = devices.ThreadProxySteerable(
         zmq.PULL,
         zmq.PUSH,
         zmq.PUSH,
         zmq.PAIR
     )
     iface = 'tcp://127.0.0.1'

     def endpoint(port_number):
         # Full endpoint URL for a port on the loopback interface.
         return "%s:%i" % (iface, port_number)

     in_port = proxy.bind_in_to_random_port(iface)
     out_port = proxy.bind_out_to_random_port(iface)
     mon_port = proxy.bind_mon_to_random_port(iface)
     ctrl_port = proxy.bind_ctrl_to_random_port(iface)
     proxy.start()
     time.sleep(0.25)

     payload = b'hello'
     pusher = self.context.socket(zmq.PUSH)
     pusher.connect(endpoint(in_port))
     puller = self.context.socket(zmq.PULL)
     puller.connect(endpoint(out_port))
     monitor = self.context.socket(zmq.PULL)
     monitor.connect(endpoint(mon_port))
     control = self.context.socket(zmq.PAIR)
     control.connect(endpoint(ctrl_port))

     pusher.send(payload)
     self.sockets.extend([pusher, puller, monitor, control])
     self.assertEqual(payload, self.recv(puller))
     self.assertEqual(payload, self.recv(monitor))

     # Ask the proxy to shut down, then wait for its thread.
     control.send(b'TERMINATE')
     proxy.join()
开发者ID:zeromq,项目名称:pyzmq,代码行数:31,代码来源:test_proxy_steerable.py


示例6: set_hwm

    def set_hwm(self, value):
        """Set the High Water Mark.

        With a libzmq major version >= 3 this assigns *value* to both
        ``sndhwm`` and ``rcvhwm``. Both assignments are always attempted;
        if either fails, the most recently raised exception is re-raised
        afterwards. With an older libzmq the single ``zmq.HWM`` socket
        option is set and the result of ``setsockopt`` is returned.

        .. warning::

            New values only take effect for subsequent socket
            bind/connects.
        """
        if zmq.zmq_version_info()[0] < 3:
            return self.setsockopt(zmq.HWM, value)

        last_error = None
        for option in ('sndhwm', 'rcvhwm'):
            # Keep going after a failure so the other option is still set.
            try:
                setattr(self, option, value)
            except Exception as exc:
                last_error = exc

        if last_error:
            raise last_error
开发者ID:fleeting,项目名称:tide,代码行数:27,代码来源:socket.py


示例7: get_monitor_socket

    def get_monitor_socket(self, events=None, addr=None):
        """Start monitoring this socket and return a PAIR socket for the events.

        .. versionadded:: libzmq-4.0
        .. versionadded:: 14.0

        Parameters
        ----------
        events : bitfield (int) [default: zmq.EVENT_ALL]
            Bitmask selecting which events are reported.
        addr : string [default: None]
            Endpoint for the monitoring sockets. When None, an inproc
            endpoint named after ``self.FD`` is used.

        Returns
        -------
        socket : (PAIR)
            A new PAIR socket from ``self.context``, already connected
            to the monitoring endpoint.

        Raises
        ------
        NotImplementedError
            If the libzmq version is lower than 4.
        """
        # Guard: the method is only available on libzmq >= 4.
        if zmq.zmq_version_info() < (4,):
            raise NotImplementedError("get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version())

        # Fill in defaults: all events, endpoint derived from the socket's FD.
        if events is None:
            events = zmq.EVENT_ALL
        if addr is None:
            addr = "inproc://monitor.s-%d" % self.FD

        # Attach the monitor first, then connect the receiving PAIR socket.
        self.monitor(addr, events)
        pair = self.context.socket(zmq.PAIR)
        pair.connect(addr)
        return pair
开发者ID:gregbanks,项目名称:pyzmq,代码行数:33,代码来源:socket.py


示例8: test_proxy

 def test_proxy(self):
     """Push a message through a ThreadProxy and check both outputs.

     Skipped on libzmq < 3.2. Three port numbers are obtained by binding
     a throw-away REQ socket to random ports; after that socket is closed
     the proxy's in/out/monitor sides are bound to those ports. A message
     pushed into the proxy must then arrive on the out side and on the
     monitor side.
     """
     if zmq.zmq_version_info() < (3,2):
         raise SkipTest("Proxies only in libzmq >= 3")

     proxy = devices.ThreadProxy(zmq.PULL, zmq.PUSH, zmq.PUSH)
     iface = 'tcp://127.0.0.1'

     def endpoint(port_number):
         # Full endpoint URL for a port on the loopback interface.
         return "%s:%i" % (iface, port_number)

     # Borrow three port numbers from a temporary socket.
     port_finder = self.context.socket(zmq.REQ)
     in_port, out_port, mon_port = [
         port_finder.bind_to_random_port(iface) for _ in range(3)
     ]
     port_finder.close()
     time.sleep(0.1)

     proxy.bind_in(endpoint(in_port))
     proxy.bind_out(endpoint(out_port))
     proxy.bind_mon(endpoint(mon_port))
     proxy.start()
     time.sleep(0.25)

     payload = b'hello'
     pusher = self.context.socket(zmq.PUSH)
     pusher.connect(endpoint(in_port))
     puller = self.context.socket(zmq.PULL)
     puller.connect(endpoint(out_port))
     monitor = self.context.socket(zmq.PULL)
     monitor.connect(endpoint(mon_port))

     pusher.send(payload)
     self.sockets.extend([pusher, puller, monitor])
     self.assertEqual(payload, self.recv(puller))
     self.assertEqual(payload, self.recv(monitor))
开发者ID:BenCJeffery,项目名称:pyzmq,代码行数:27,代码来源:test_device.py


示例9: setUp

 def setUp(self):
     """Skip the test unless security is usable, then run the base setUp.

     The test is skipped when libzmq is older than 4.0, or when
     ``zmq.curve_keypair()`` raises ``zmq.ZMQError``.
     """
     libzmq_version = zmq.zmq_version_info()
     if libzmq_version < (4, 0):
         raise SkipTest("security is new in libzmq 4.0")

     # Generating a key pair doubles as a probe for CURVE support.
     try:
         zmq.curve_keypair()
     except zmq.ZMQError:
         raise SkipTest("security requires libzmq to be linked against libsodium")

     super(TestSecurity, self).setUp()
开发者ID:hatbro,项目名称:pyzmq,代码行数:8,代码来源:test_security.py


示例10: setUp

 def setUp(self):
     """Run the base setUp only when CURVE security is available.

     Raises ``SkipTest`` if libzmq is older than 4.0 or if
     ``zmq.curve_keypair()`` fails with ``zmq.ZMQError``.
     """
     if zmq.zmq_version_info() < (4,0):
         raise SkipTest("security is new in libzmq 4.0")
     try:
         # Key-pair generation is used as the availability probe.
         zmq.curve_keypair()
     except zmq.ZMQError:
         raise SkipTest("security requires libzmq to be built with CURVE support")
     else:
         super(TestSecurity, self).setUp()
开发者ID:326029212,项目名称:pyzmq,代码行数:8,代码来源:test_security.py


示例11: test_ctx_opts

 def test_ctx_opts(self):
     """Check that context options can be set and read back.

     Skipped on libzmq < 3. ``MAX_SOCKETS`` is written once through
     ``ctx.set`` and once through the ``max_sockets`` attribute, and read
     back through both ``ctx.get`` and the attribute.
     """
     if zmq.zmq_version_info() < (3,):
         raise SkipTest("context options require libzmq 3")
     ctx = self.Context()
     try:
         ctx.set(zmq.MAX_SOCKETS, 2)
         self.assertEqual(ctx.get(zmq.MAX_SOCKETS), 2)
         ctx.max_sockets = 100
         self.assertEqual(ctx.max_sockets, 100)
         self.assertEqual(ctx.get(zmq.MAX_SOCKETS), 100)
     finally:
         # Terminate the context explicitly, even when an assertion fails,
         # instead of leaving cleanup to garbage collection.
         ctx.term()
开发者ID:felipecruz,项目名称:pyzmq,代码行数:9,代码来源:test_context.py


示例12: setUp

 def setUp(self):
     """Skip on libzmq < 4.0, then run the base setUp and adjust logging.

     Stores the root logger's effective level in
     ``self.original_log_level``, sets the root logger to DEBUG, and
     initialises ``self.auth`` to None.
     """
     if zmq.zmq_version_info() < (4, 0):
         raise SkipTest("security is new in libzmq 4.0")
     super(TestThreadedAuthentication, self).setUp()

     # NOTE(review): this step was described as silencing the auth module's
     # debug output, yet DEBUG makes the root logger more verbose, not
     # less -- confirm the intended level. The saved level is presumably
     # restored during teardown; verify.
     root_logger = logging.getLogger()
     self.original_log_level = root_logger.getEffectiveLevel()
     root_logger.setLevel(logging.DEBUG)

     self.auth = None
开发者ID:juliantaylor,项目名称:pyzmq,代码行数:9,代码来源:test_auth.py


示例13: test_int_sockopts

 def test_int_sockopts(self):
     """Test integer socket options on a bound PUB/SUB pair.

     Covers LINGER round-trips, the ``hwm`` attribute (expected default:
     0 on libzmq < 3.0, otherwise 1000), EVENTS, and TYPE. Afterwards
     every option in ``int_sockopts`` and ``int64_sockopts`` whose name
     does not start with one of the skipped prefixes is read from the PUB
     socket; a ``ZMQError`` or a value above 2**31 is collected and
     reported as a single failure.
     """
     default_hwm = 0 if zmq.zmq_version_info() < (3,0) else 1000

     pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
     for linger in (0, -1):
         pub.setsockopt(zmq.LINGER, linger)
         self.assertEqual(pub.getsockopt(zmq.LINGER), linger)
     self.assertEqual(pub.hwm, default_hwm)
     pub.hwm = 11
     self.assertEqual(pub.hwm, 11)
     # pub.setsockopt(zmq.EVENTS, zmq.POLLIN)
     self.assertEqual(pub.getsockopt(zmq.EVENTS), zmq.POLLOUT)
     self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt,zmq.EVENTS, 2**7-1)
     self.assertEqual(pub.getsockopt(zmq.TYPE), pub.socket_type)
     self.assertEqual(pub.getsockopt(zmq.TYPE), zmq.PUB)
     self.assertEqual(sub.getsockopt(zmq.TYPE), sub.socket_type)
     self.assertEqual(sub.getsockopt(zmq.TYPE), zmq.SUB)

     # check for overflow / wrong type:
     # some sockopts are write-only; names with these prefixes are skipped
     skipped_prefixes = (
         'ROUTER', 'XPUB', 'TCP', 'FAIL',
         'REQ_', 'CURVE_', 'PROBE_ROUTER',
         'IPC_FILTER', 'GSSAPI', 'STREAM_',
         'VMCI_BUFFER_SIZE', 'VMCI_BUFFER_MIN_SIZE',
         'VMCI_BUFFER_MAX_SIZE', 'VMCI_CONNECT_TIMEOUT',
     )

     # Map integer constant values back to their names.
     constants = zmq.constants
     name_of = {}
     for const_name in constants.__all__:
         const_value = getattr(constants, const_name)
         if not isinstance(const_value, int):
             continue
         name_of[const_value] = const_name

     problems = []
     for opt in zmq.constants.int_sockopts.union(zmq.constants.int64_sockopts):
         opt_name = name_of[opt]
         if opt_name.startswith(skipped_prefixes):
             continue
         try:
             result = pub.getsockopt(opt)
         except zmq.ZMQError as err:
             problems.append("getsockopt(zmq.%s) raised '%s'."%(opt_name, err))
             continue
         if result > 2**31:
             problems.append("getsockopt(zmq.%s) returned a ridiculous value."
                             " It is probably the wrong type."%opt_name)

     if problems:
         self.fail('\n'.join([''] + problems))
开发者ID:thehesiod,项目名称:pyzmq,代码行数:52,代码来源:test_socket.py


示例14: test_removed

 def test_removed(self):
     """Check that removed constants are present only on older libzmq.

     For each ``(version, names)`` entry of ``constant_names.removed_in``,
     every name must be an attribute of ``zmq`` when the running libzmq
     is older than ``version`` and must be absent otherwise.
     """
     running = zmq.zmq_version_info()
     for removed_at, names in constant_names.removed_in.items():
         expected_present = running < removed_at
         for const_name in names:
             try:
                 found = getattr(zmq, const_name)
             except AttributeError:
                 if expected_present:
                     self.fail("AttributeError: zmq.%s" % const_name)
                 continue
             if not expected_present:
                 self.fail("Shouldn't have: zmq.%s=%s" % (const_name, found))
开发者ID:RojavaCrypto,项目名称:pyzmq,代码行数:13,代码来源:test_constants.py


示例15: test_int_sockopts

    def test_int_sockopts(self):
        """Test non-uint64 socket options on a bound PUB/SUB pair.

        Skipped on libzmq < 2.1. The high-water-mark option under test is
        ``zmq.HWM`` (expected default 0) on libzmq < 3.0 and
        ``zmq.SNDHWM`` (expected default 1000) otherwise. After the
        LINGER, HWM, EVENTS and TYPE checks, every option listed in
        ``int_sockopts`` and ``int64_sockopts`` except ``FAIL_UNROUTABLE``
        is read from the PUB socket; a ``ZMQError`` or a value above
        2**31 is collected and reported as a single failure.
        """
        v = zmq.zmq_version_info()
        if v < (2, 1):
            raise SkipTest("only on libzmq >= 2.1")
        elif v < (3, 0):
            hwm = zmq.HWM
            default_hwm = 0
        else:
            hwm = zmq.SNDHWM
            default_hwm = 1000
        p, s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        # assertEqual is used throughout: the assertEquals alias is
        # deprecated and no longer exists in Python 3.12.
        p.setsockopt(zmq.LINGER, 0)
        self.assertEqual(p.getsockopt(zmq.LINGER), 0)
        p.setsockopt(zmq.LINGER, -1)
        self.assertEqual(p.getsockopt(zmq.LINGER), -1)
        self.assertEqual(p.getsockopt(hwm), default_hwm)
        p.setsockopt(hwm, 11)
        self.assertEqual(p.getsockopt(hwm), 11)
        # p.setsockopt(zmq.EVENTS, zmq.POLLIN)
        self.assertEqual(p.getsockopt(zmq.EVENTS), zmq.POLLOUT)
        self.assertRaisesErrno(zmq.EINVAL, p.setsockopt, zmq.EVENTS, 2 ** 7 - 1)
        self.assertEqual(p.getsockopt(zmq.TYPE), p.socket_type)
        self.assertEqual(p.getsockopt(zmq.TYPE), zmq.PUB)
        self.assertEqual(s.getsockopt(zmq.TYPE), s.socket_type)
        self.assertEqual(s.getsockopt(zmq.TYPE), zmq.SUB)

        # check for overflow / wrong type:
        errors = []
        # Map integer constant values back to their names for messages.
        backref = {}
        constants = zmq.core.constants
        for name in constants.__all__:
            value = getattr(constants, name)
            if isinstance(value, int):
                backref[value] = name
        for opt in zmq.core.constants.int_sockopts + zmq.core.constants.int64_sockopts:
            sopt = backref[opt]
            if sopt == "FAIL_UNROUTABLE":
                # fail_unroutable is write-only
                continue
            try:
                n = p.getsockopt(opt)
            except zmq.ZMQError as e:
                errors.append("getsockopt(zmq.%s) raised '%s'." % (sopt, e))
            else:
                if n > 2 ** 31:
                    errors.append(
                        "getsockopt(zmq.%s) returned a ridiculous value." " It is probably the wrong type." % sopt
                    )
        if errors:
            self.fail("\n".join(errors))
开发者ID:gotcha,项目名称:pyzmq,代码行数:51,代码来源:test_socket.py


示例16: _check_version

def _check_version(min_version_info, msg='Feature'):
    """Require a minimum libzmq version.

    Raises ZMQVersionError if the current zmq version is lower than
    min_version_info; returns None otherwise.

    min_version_info is a tuple of integers and is compared against
    zmq.zmq_version_info(). The current version is looked up on first use
    and kept in the module-level ``_zmq_version_info``.

    msg is passed to ZMQVersionError together with the dotted form of
    min_version_info.
    """
    global _zmq_version_info
    if _zmq_version_info is None:
        # Imported here rather than at module level; the result is cached.
        from zmq import zmq_version_info
        _zmq_version_info = zmq_version_info()

    if not (_zmq_version_info < min_version_info):
        return

    required = '.'.join(map(str, min_version_info))
    raise ZMQVersionError(required, msg)
开发者ID:326029212,项目名称:pyzmq,代码行数:14,代码来源:error.py


示例17: _select_recv

 def _select_recv(self, multipart, socket, **kwargs):
     """Call recv[_multipart] so that it fails if there is nothing to receive.

     ``zmq.select`` is used (``timeout=5``) to wait for *socket* to become
     readable, and an assertion fails if it does not. The receive call is
     then made with ``zmq.DONTWAIT`` OR-ed into ``kwargs['flags']``.
     ``socket.recv_multipart`` is used when *multipart* is true,
     ``socket.recv`` otherwise; its result is returned.
     """
     if zmq.zmq_version_info() >= (3,1,0):
         # zmq 3.1 has a bug where poll can return false positives
         # (see LIBZMQ-280 on JIRA), so wait a little bit just in case.
         time.sleep(0.1)

     readable, _writable, _errored = zmq.select([socket], [], [], timeout=5)
     assert len(readable) > 0, "Should have received a message"
     kwargs['flags'] = zmq.DONTWAIT | kwargs.get('flags', 0)

     if multipart:
         receive = socket.recv_multipart
     else:
         receive = socket.recv
     return receive(**kwargs)
开发者ID:AndreaCrotti,项目名称:pyzmq,代码行数:14,代码来源:__init__.py


示例18: __init__

 def __init__(self, context=None, encoding='utf-8'):
     """Initialise the authenticator state.

     :param context: zmq context to use; when falsy,
         ``zmq.Context.instance()`` is used instead.
     :param encoding: stored as ``self.encoding``.
     :raises NotImplementedError: if libzmq is older than 4.0.
     """
     if zmq.zmq_version_info() < (4,0):
         raise NotImplementedError("Security is only available in libzmq >= 4.0")

     if not context:
         context = zmq.Context.instance()
     self.context = context
     self.encoding = encoding

     self.allow_any = False
     self.zap_socket = None
     self.whitelist = []
     self.blacklist = []

     # passwords: dict keyed by domain; each value is a dict of
     # username:password pairs.
     self.passwords = {}
     # certs: dict keyed by domain; each value is a dict keyed by the
     # public keys from the specified location.
     self.certs = {}
开发者ID:felipecruz,项目名称:pyzmq,代码行数:15,代码来源:auth.py


示例19: _setup_socket

 def _setup_socket(self):
     """Create (if needed) and configure the socket, then the backends.

     A socket of ``self.socket_type`` is created when ``self.socket`` is
     None, and ``self.routing_id`` is applied as the identity when set.
     ROUTER sockets get ``ROUTER_MANDATORY`` (plus ``ROUTER_HANDOVER`` on
     libzmq >= 4.1.0); REQ sockets get a receive timeout. The send
     timeout is set for every socket type. Finally both backends are
     configured and ``self.initialized`` is set to True.
     """
     if self.socket is None:
         self.socket = self.context.socket(self.socket_type)
     if self.routing_id:
         self.socket.identity = self.routing_id

     if self.socket_type == zmq.ROUTER:
         self.socket.ROUTER_MANDATORY = True
         handover_available = zmq.zmq_version_info() >= (4, 1, 0)
         if handover_available:
             self.socket.ROUTER_HANDOVER = True
     elif self.socket_type == zmq.REQ:
         # self.timeout is multiplied by 1000 and truncated to int.
         self.socket.RCVTIMEO = int(self.timeout * 1000)

     # The send timeout applies whatever the socket type is.
     self.socket.SNDTIMEO = int(self.timeout * 1000)

     self.auth_backend.configure()
     self.heartbeat_backend.configure()
     self.initialized = True
开发者ID:aldefalco,项目名称:pseud,代码行数:15,代码来源:common.py


示例20: test_proxy_bind_to_random_with_args

 def test_proxy_bind_to_random_with_args(self):
     """Check that random-port binding honours min_port/max_port.

     Skipped on libzmq < 3.2. The in, out and monitor sides of a
     ThreadProxy are each bound to a random port with ``min_port=5000``
     and ``max_port=5050``; the test fails if any returned port lies
     outside that inclusive range.
     """
     if zmq.zmq_version_info() < (3, 2):
         raise SkipTest("Proxies only in libzmq >= 3")

     proxy = devices.ThreadProxy(zmq.PULL, zmq.PUSH, zmq.PUSH)
     iface = 'tcp://127.0.0.1'
     lowest, highest = 5000, 5050
     port_range = dict(min_port=lowest, max_port=highest)

     bound_ports = [
         proxy.bind_in_to_random_port(iface, **port_range),
         proxy.bind_out_to_random_port(iface, **port_range),
         proxy.bind_mon_to_random_port(iface, **port_range),
     ]
     for port in bound_ports:
         if not (lowest <= port <= highest):
             self.fail('Unexpected port number: %i' % port)
开发者ID:zeromq,项目名称:pyzmq,代码行数:15,代码来源:test_device.py



注:本文中的zmq.zmq_version_info函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python zmq.Context类代码示例发布时间:2022-05-26
下一篇:
Python zmq.zmq_version函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap