• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python auth.start函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中zmq.auth模块各Authenticator类的start方法的典型用法代码示例。如果您正苦于以下问题：Python start方法的具体用法？Python start怎么用？Python start使用的例子？那么恭喜您，这里精选的代码示例或许可以为您提供帮助。



在下文中一共展示了start函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _callback

    def _callback():
        """Set up the PLAIN-authenticated ROUTER endpoint and its federations.

        Starts an IOLoop authenticator configured with the PLAIN passwords
        from ``config["users"]``, binds a ROUTER socket wrapped in a
        ZMQStream, and opens a RequesterConnection for every federation
        flagged ``autoconnect``.  ``event`` is always set on exit, whether
        or not setup succeeded.
        """
        try:
            loop = storage.ioloop_thread.io_loop

            authenticator = zmq.auth.IOLoopAuthenticator(zctx, io_loop=loop)
            storage.auth = authenticator
            authenticator.start()
            authenticator.configure_plain(
                domain="*",
                passwords=config.get("users", {}),
            )

            router = zctx.socket(zmq.ROUTER)
            router.identity = config["identity"].encode("utf-8")
            # PLAIN server mode is enabled before the socket is bound.
            router.plain_server = True
            router.bind(config["bind_address"])

            stream = zmqstream.ZMQStream(router, io_loop=loop)
            storage.stream = stream
            stream.on_recv(functools.partial(on_router_recv, bot))

            for name, federation in config.get("federations", {}).items():
                if not federation.get("autoconnect", False):
                    continue
                storage.federations[name] = RequesterConnection(bot, name, federation)
        finally:
            # Signal completion unconditionally, even if setup raised.
            event.set()
开发者ID:mrvelic,项目名称:kochira,代码行数:26,代码来源:federation.py


示例2: test_curve_configured_server

    def test_curve_configured_server(self):
        """test ioloop auth - CURVE, configured server"""

        self.auth = auth = zmq.auth.IOLoopAuthenticator(self.context, io_loop=self.io_loop)
        auth.start()
        auth.allow("127.0.0.1")

        base_dir, public_keys_dir, secret_keys_dir = self.create_certs()
        # Everything after certificate creation runs under try/finally so the
        # generated key material is removed even when the test fails.
        try:
            certs = self.load_certs(secret_keys_dir)
            server_public, server_secret, client_public, client_secret = certs

            auth.configure_curve(domain="*", location=public_keys_dir)

            self.server.curve_publickey = server_public
            self.server.curve_secretkey = server_secret
            self.server.curve_server = True

            self.client.curve_publickey = client_public
            self.client.curve_secretkey = client_secret
            self.client.curve_serverkey = server_public
            self.pullstream.on_recv(self.on_message_succeed)

            t = self.io_loop.time()
            self.io_loop.add_timeout(t + 0.1, self.attempt_connection)
            self.io_loop.add_timeout(t + 0.2, self.send_msg)
            # Timeout the test so the test case can complete even if no message
            # is received.
            self.io_loop.add_timeout(t + 0.5, self.on_test_timeout_fail)

            self.io_loop.start()

            if not (self.test_result == True):
                self.fail(self.test_result)
        finally:
            self.remove_certs(base_dir)
开发者ID:juliantaylor,项目名称:pyzmq,代码行数:35,代码来源:test_auth.py


示例3: run

def run():
    """Run the Ironhouse example on the tornado IOLoop.

    Exits with status 1 when the certificate directories are missing.
    Otherwise configures CURVE authentication, builds the server and client
    through ``setup_server`` / ``setup_client``, sends one message from the
    client and hands control to the IOLoop.
    """
    # These directories are generated by the generate_certificates script.
    here = os.path.dirname(__file__)
    keys_dir = os.path.join(here, "certificates")
    public_keys_dir = os.path.join(here, "public_keys")
    secret_keys_dir = os.path.join(here, "private_keys")

    required_dirs = (keys_dir, public_keys_dir, secret_keys_dir)
    if not all(os.path.exists(path) for path in required_dirs):
        logging.critical("Certificates are missing - run generate_certificates script first")
        sys.exit(1)

    # Authenticator: accept localhost only, and validate client keys against
    # the certificates found in the public keys directory.
    auth = IOLoopAuthenticator()
    auth.allow("127.0.0.1")
    auth.configure_curve(domain="*", location=public_keys_dir)

    server_secret_file = os.path.join(secret_keys_dir, "server.key_secret")
    server_public_file = os.path.join(public_keys_dir, "server.key")
    client_secret_file = os.path.join(secret_keys_dir, "client.key_secret")

    # The server handle is kept in a local so it stays referenced while the
    # loop below is running.
    server = setup_server(server_secret_file)
    client = setup_client(client_secret_file, server_public_file)
    client.send(b"Hello")

    auth.start()
    main_loop = ioloop.IOLoop.instance()
    main_loop.start()
开发者ID:RojavaCrypto,项目名称:pyzmq,代码行数:28,代码来源:ioloop-ironhouse.py


示例4: pyzmq_authenticator

def pyzmq_authenticator(pyzmq_context):
    """Run a ThreadAuthenticator on *pyzmq_context* for the duration of the yield.

    The authenticator allows 127.0.0.1 and accepts any CURVE client key
    (``CURVE_ALLOW_ANY``).  It is stopped when the generator is resumed,
    closed, or has an exception thrown into it, and also if configuration
    fails after the thread was started.
    """
    auth = ThreadAuthenticator(pyzmq_context)
    auth.start()
    try:
        auth.allow('127.0.0.1')
        auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
        yield
    finally:
        # Always stop the auth thread, otherwise it outlives the caller.
        auth.stop()
开发者ID:ereOn,项目名称:azmq,代码行数:7,代码来源:test_interoperability.py


示例5: run

def run():
    """Run Ironhouse example.

    Starts a ThreadAuthenticator with CURVE certificates, pushes one message
    from a CURVE server socket to a CURVE client socket and logs whether the
    expected payload arrived.  Exits with status 1 when the certificate
    directories are missing.
    """

    # These directories are generated by the generate_certificates script
    base_dir = os.path.dirname(__file__)
    keys_dir = os.path.join(base_dir, 'certificates')
    public_keys_dir = os.path.join(base_dir, 'public_keys')
    secret_keys_dir = os.path.join(base_dir, 'private_keys')

    if not (os.path.exists(keys_dir) and
            os.path.exists(public_keys_dir) and
            os.path.exists(secret_keys_dir)):
        logging.critical("Certificates are missing - run generate_certificates.py script first")
        sys.exit(1)

    ctx = zmq.Context.instance()

    # Start an authenticator for this context.
    auth = ThreadAuthenticator(ctx)
    auth.start()
    try:
        auth.allow('127.0.0.1')
        # Tell authenticator to use the certificate in a directory
        auth.configure_curve(domain='*', location=public_keys_dir)

        server = ctx.socket(zmq.PUSH)

        server_secret_file = os.path.join(secret_keys_dir, "server.key_secret")
        server_public, server_secret = zmq.auth.load_certificate(server_secret_file)
        server.curve_secretkey = server_secret
        server.curve_publickey = server_public
        server.curve_server = True  # must come before bind
        server.bind('tcp://*:9000')

        client = ctx.socket(zmq.PULL)

        # We need two certificates, one for the client and one for
        # the server. The client must know the server's public key
        # to make a CURVE connection.
        client_secret_file = os.path.join(secret_keys_dir, "client.key_secret")
        client_public, client_secret = zmq.auth.load_certificate(client_secret_file)
        client.curve_secretkey = client_secret
        client.curve_publickey = client_public

        server_public_file = os.path.join(public_keys_dir, "server.key")
        server_public, _ = zmq.auth.load_certificate(server_public_file)
        client.curve_serverkey = server_public
        client.connect('tcp://127.0.0.1:9000')

        server.send(b"Hello")

        # A timeout AND an unexpected payload both count as failure; the
        # latter previously produced no log output at all.
        if client.poll(1000) and client.recv() == b"Hello":
            logging.info("Ironhouse test OK")
        else:
            logging.error("Ironhouse test FAIL")
    finally:
        # stop auth thread, even if something above raised
        auth.stop()
开发者ID:yumaojun03,项目名称:zerorpc_agent,代码行数:60,代码来源:test.py


示例6: run_server

def run_server():
    """Serve ``Service`` over zerorpc on tcp://0.0.0.0:5555 and block until done.

    A ThreadAuthenticator is started on the zerorpc context with 127.0.0.1
    allowed and CURVE certificates read from ``public_keys_dir``.  When
    ``SECURE`` is truthy the underlying socket is switched to CURVE server
    mode before binding.
    """
    endpoint = 'tcp://0.0.0.0:5555'
    print("endpoint: {}".format(endpoint))
    rpc_server = zerorpc.Server(Service())

    authenticator = ThreadAuthenticator(zerorpc.Context.get_instance())
    authenticator.start()
    authenticator.allow('127.0.0.1')
    # Client certificates are looked up in this directory.
    authenticator.configure_curve(domain='*', location=public_keys_dir)

    # Reaches into zerorpc's private attributes to get at the raw socket.
    raw_socket = rpc_server._events._socket

    secret_path = os.path.join(secret_keys_dir, "server.key_secret")
    public_key, secret_key = zmq.auth.load_certificate(secret_path)
    if SECURE:
        print("Secure transport")
        raw_socket.curve_secretkey = secret_key
        raw_socket.curve_publickey = public_key
        raw_socket.curve_server = True

    rpc_server.bind(endpoint)
    # Run the server in a greenlet and wait for it to finish.
    gevent.spawn(rpc_server.run).join()
开发者ID:yumaojun03,项目名称:zerorpc_agent,代码行数:25,代码来源:agent.py


示例7: secure_server_creator

def secure_server_creator():
    """Yield a factory that builds CURVE-enabled server sockets.

    A ThreadAuthenticator is started on the shared context instance and
    configured with the certificates in the ``public_keys`` directory next
    to this file.  The yielded ``creator(zmq_sock_type)`` returns an unbound
    socket of that type with the server key pair applied.  The authenticator
    is stopped when the generator finishes, including when an exception is
    thrown into it at the ``yield``.

    Exits with status 1 when the key directories are missing.
    """
    # These directories are generated by the generate_certificates script
    base_dir = os.path.dirname(__file__)
    public_keys_dir = os.path.join(base_dir, 'public_keys')
    secret_keys_dir = os.path.join(base_dir, 'private_keys')

    if not (os.path.exists(public_keys_dir) and
            os.path.exists(secret_keys_dir)):
        logging.critical("Certificates are missing - run generate_certificates.py script first")
        sys.exit(1)

    ctx = zmq.Context.instance()

    # Start an authenticator for this context.
    auth = ThreadAuthenticator(ctx)
    auth.start()
    try:
        # NOTE: no IP whitelist is installed here (auth.allow is not called);
        # clients are authenticated by their CURVE certificate only.
        auth.configure_curve(domain='*', location=public_keys_dir)

        def creator(zmq_sock_type):
            """Return a new socket of *zmq_sock_type* set up as a CURVE server."""
            server = ctx.socket(zmq_sock_type)

            server_secret_file = os.path.join(secret_keys_dir, "server.key_secret")
            server_public, server_secret = zmq.auth.load_certificate(server_secret_file)
            server.curve_secretkey = server_secret
            server.curve_publickey = server_public
            server.curve_server = True  # must come before bind
            return server

        yield creator
    finally:
        # stop auth thread, even if the consumer raised
        auth.stop()
开发者ID:drozzy,项目名称:zmq-soundtouch,代码行数:35,代码来源:helpers.py


示例8: test_blacklist_whitelist

    def test_blacklist_whitelist(self):
        """test threaded auth - Blacklist and Whitelist"""
        authenticator = zmq.auth.ThreadedAuthenticator(self.context)
        self.auth = authenticator
        authenticator.start()

        # Two phases against the same authenticator:
        #   1. blacklist 127.0.0.1 -> the connection must fail
        #   2. whitelist 127.0.0.1 (overrides the blacklist) -> it must pass
        phases = (
            (authenticator.deny, self.assertFalse),
            (authenticator.allow, self.assertTrue),
        )
        for apply_rule, expect in phases:
            apply_rule("127.0.0.1")
            push = self.socket(zmq.PUSH)
            # By setting a domain we switch on authentication for NULL
            # sockets, though no policies are configured yet.
            push.zap_domain = b"global"
            pull = self.socket(zmq.PULL)
            expect(self.can_connect(push, pull))
            pull.close()
            push.close()

        authenticator.stop()
开发者ID:juliantaylor,项目名称:pyzmq,代码行数:28,代码来源:test_auth.py


示例9: run

def run():
    '''Run strawhouse client.

    Part 1 allows 127.0.0.1 and expects a message to get through; part 2
    restarts the authenticator with 127.0.0.1 denied and expects that no
    message is delivered.  The combined outcome is logged.
    '''

    allow_test_pass = False
    deny_test_pass = False

    # Context.instance() is a classmethod; calling it on the class avoids
    # constructing a throwaway Context first.
    ctx = zmq.Context.instance()

    # Start an authenticator for this context.
    auth = ThreadAuthenticator(ctx)
    auth.start()

    # Part 1 - demonstrate allowing clients based on IP address
    auth.allow('127.0.0.1')

    server = ctx.socket(zmq.PUSH)
    server.zap_domain = b'global'  # must come before bind
    server.bind('tcp://*:9000')

    client_allow = ctx.socket(zmq.PULL)
    client_allow.connect('tcp://127.0.0.1:9000')

    server.send(b"Hello")

    msg = client_allow.recv()
    if msg == b"Hello":
        allow_test_pass = True

    client_allow.close()

    # Part 2 - demonstrate denying clients based on IP address
    auth.stop()

    auth = ThreadAuthenticator(ctx)
    auth.start()

    auth.deny('127.0.0.1')

    client_deny = ctx.socket(zmq.PULL)
    client_deny.connect('tcp://127.0.0.1:9000')

    # Only send when the server has a peer to send to; otherwise the send
    # would block.
    if server.poll(50, zmq.POLLOUT):
        server.send(b"Hello")

        if client_deny.poll(50):
            # A delivered message means the deny rule was not enforced;
            # drain it and leave deny_test_pass False.
            client_deny.recv()
        else:
            deny_test_pass = True
    else:
        deny_test_pass = True

    client_deny.close()
    server.close()

    auth.stop()  # stop auth thread

    if allow_test_pass and deny_test_pass:
        logging.info("Strawhouse test OK")
    else:
        logging.error("Strawhouse test FAIL")
开发者ID:FlavioFalcao,项目名称:pyzmq,代码行数:59,代码来源:strawhouse.py


示例10: run

def run():
    '''Run woodhouse example.

    Checks PLAIN authentication twice: a client with valid credentials must
    receive the server's message, and a client with a wrong password must
    receive nothing.  The combined outcome is logged.
    '''

    valid_client_test_pass = False
    invalid_client_test_pass = False

    # Context.instance() is a classmethod; calling it on the class avoids
    # constructing a throwaway Context first.
    ctx = zmq.Context.instance()

    # Start an authenticator for this context.
    auth = ThreadAuthenticator(ctx)
    auth.start()
    auth.allow('127.0.0.1')
    # Instruct authenticator to handle PLAIN requests
    auth.configure_plain(domain='*', passwords={'admin': 'secret'})

    server = ctx.socket(zmq.PUSH)
    server.plain_server = True  # must come before bind
    server.bind('tcp://*:9000')

    client = ctx.socket(zmq.PULL)
    client.plain_username = b'admin'
    client.plain_password = b'secret'
    client.connect('tcp://127.0.0.1:9000')

    server.send(b"Hello")

    if client.poll():
        msg = client.recv()
        if msg == b"Hello":
            valid_client_test_pass = True

    client.close()

    # now use invalid credentials - expect no msg received
    client2 = ctx.socket(zmq.PULL)
    client2.plain_username = b'admin'
    client2.plain_password = b'bogus'
    client2.connect('tcp://127.0.0.1:9000')

    # Only send when the server has a peer to send to; otherwise the send
    # would block (same guard as the strawhouse example).
    if server.poll(50, zmq.POLLOUT):
        server.send(b"World")

        if client2.poll(50):
            # Receive on client2 (the first client is already closed) and
            # compare against bytes, which is what recv() returns.
            msg = client2.recv()
            if msg == b"World":
                invalid_client_test_pass = False
        else:
            # no message is expected
            invalid_client_test_pass = True
    else:
        # the rejected client never became a peer
        invalid_client_test_pass = True

    client2.close()
    server.close()

    # stop auth thread
    auth.stop()

    if valid_client_test_pass and invalid_client_test_pass:
        logging.info("Woodhouse test OK")
    else:
        logging.error("Woodhouse test FAIL")
开发者ID:FlavioFalcao,项目名称:pyzmq,代码行数:57,代码来源:woodhouse.py


示例11: test_plain

    def test_plain(self):
        """test threaded auth - PLAIN"""
        authenticator = zmq.auth.ThreadedAuthenticator(self.context)
        self.auth = authenticator
        authenticator.start()

        def plain_pair(password):
            """Return a (PLAIN server PUSH, PULL client) pair for user b"admin"."""
            push = self.socket(zmq.PUSH)
            push.plain_server = True
            pull = self.socket(zmq.PULL)
            pull.plain_username = b"admin"
            pull.plain_password = password
            return push, pull

        # No passwords configured on the authenticator yet: must be rejected.
        server, client = plain_pair(b"Password")
        self.assertFalse(self.can_connect(server, client))
        client.close()
        server.close()

        # Passwords configured and matching credentials: must be accepted.
        server, client = plain_pair(b"Password")
        authenticator.configure_plain(domain="*", passwords={"admin": "Password"})
        self.assertTrue(self.can_connect(server, client))
        client.close()
        server.close()

        # Passwords configured but wrong password: must be rejected.
        server, client = plain_pair(b"Bogus")
        self.assertFalse(self.can_connect(server, client))
        client.close()
        server.close()

        # With the authenticator stopped, a plain (non-PLAIN) pair connects.
        authenticator.stop()
        del authenticator

        server = self.socket(zmq.PUSH)
        client = self.socket(zmq.PULL)
        self.assertTrue(self.can_connect(server, client))
        client.close()
        server.close()
开发者ID:juliantaylor,项目名称:pyzmq,代码行数:45,代码来源:test_auth.py


示例12: run

    def run(self):
        """Serve requests on a CURVE-secured REP socket until interrupted.

        Reads the certificate directories and listen address from
        ``self.config``, starts a ThreadAuthenticator that allows the
        configured IPs, loads plugins, then answers every received message
        with ``b"ack"`` after passing it to ``self.handle_msg``.

        Exits with status 1 when the certificate directories are missing.
        The authenticator thread is stopped when the loop is left by an
        exception (e.g. KeyboardInterrupt).
        """

        # These directories are generated by the generate_certificates script
        keys_dir = self.config['certs']['certs']
        public_keys_dir = self.config['certs']['public']
        secret_keys_dir = self.config['certs']['private']
        if not (util.check_dir(keys_dir) and util.check_dir(public_keys_dir) and util.check_dir(secret_keys_dir)):
            logging.critical("Certificates are missing - run generate_certificates.py script first")
            sys.exit(1)
        logger.info("Keys: %s  |  Public: %s  |  Secret: %s", keys_dir, public_keys_dir, secret_keys_dir)

        ctx = zmq.Context.instance()

        # Start an authenticator for this context.
        auth = ThreadAuthenticator(ctx)
        auth.start()
        try:
            for ip in self.config['server']['auth']:
                auth.allow(ip)

            # Tell authenticator to use the certificate in a directory
            auth.configure_curve(domain='*', location=public_keys_dir)

            server = ctx.socket(zmq.REP)

            server_secret_file = os.path.join(secret_keys_dir, "server.key_secret")
            server_public, server_secret = zmq.auth.load_certificate(server_secret_file)
            server.curve_secretkey = server_secret
            server.curve_publickey = server_public
            server.curve_server = True  # must come before bind
            bind_info = 'tcp://%s:%s' % (self.config['server']['listen'], self.config['server']['port'])
            server.bind(bind_info)
            logger.info("Server bound to: %s", bind_info)

            self.load_plugins()
            logger.info("Starting reciever.")

            while True:
                msg = server.recv()
                self.handle_msg(msg)
                # send() needs bytes; a str literal raises TypeError on Python 3.
                server.send(b"ack")
        finally:
            # The loop above never ends normally, so this is the only place
            # where the auth thread can be stopped.
            auth.stop()
开发者ID:Darthone,项目名称:atto,代码行数:43,代码来源:server.py


示例13: create_socket

    def create_socket(self):
        """Create the CURVE-secured REP socket and bind it to tcp://0.0.0.0:5141.

        Sets ``self.context``, ``self.auth``, ``self.socket`` and
        ``self.monitor``.  Client certificates are read from
        ``self.public_keys_dir``; the server key pair is loaded from
        ``server.key_secret`` in ``self.secret_keys_dir``.
        """
        self.context = zmq.Context.instance()
        # Keep the authenticator on the instance: ThreadAuthenticator stops
        # its thread when it is garbage collected, so a local-only reference
        # would shut authentication down as soon as this method returns.
        self.auth = auth = ThreadAuthenticator(self.context)
        auth.start()
        # NOTE: no IP whitelist is installed (auth.allow is not called);
        # clients are authenticated by their CURVE certificate only.
        auth.configure_curve(domain='*', location=self.public_keys_dir)

        self.socket = self.context.socket(zmq.REP)
        self.monitor = self.socket.get_monitor_socket()

        server_secret_file = os.path.join(self.secret_keys_dir, "server.key_secret")
        server_public, server_secret = zmq.auth.load_certificate(server_secret_file)
        self.socket.curve_secretkey = server_secret
        self.socket.curve_publickey = server_public
        self.socket.curve_server = True  # must come before bind
        self.socket.set(zmq.LINGER, 1)
        self.socket.identity = b"gatekeeper"
        self.socket.bind("tcp://0.0.0.0:5141")
开发者ID:kaithar,项目名称:muhubot,代码行数:19,代码来源:port_broker.py


示例14: run

def run():
    """Server side of the libzmq issue 939 reproduction.

    Overwrites the ``public-key`` entry of ``public_keys/client.key`` with a
    fixed value, then runs a CURVE-secured REP server on tcp://*:9000 that
    polls forever and logs whenever the socket reports readable or writable.

    Exits with status 1 when the certificate directories are missing.
    """
    base_dir = os.path.dirname(__file__)
    keys_dir = os.path.join(base_dir, 'certificates')
    public_keys_dir = os.path.join(base_dir, 'public_keys')
    secret_keys_dir = os.path.join(base_dir, 'private_keys')

    # Check all three directories, and do so before touching any file in them.
    if not (os.path.exists(keys_dir) and
            os.path.exists(public_keys_dir) and
            os.path.exists(secret_keys_dir)):
        logging.critical("Certificates are missing - run generate_certificates.py script first")
        sys.exit(1)

    # NOTE(review): the replacement key below looks mangled (it contains an
    # e-mail obfuscation placeholder) - confirm against the upstream script.
    replacement = 'public-key = ">[email protected]/476a(e&:i!2fPomP=3HPI{*S"'
    client_key_path = os.path.join(public_keys_dir, "client.key")
    with open(client_key_path) as f:
        client_file_input = f.read()
    # The file is read as text, so write it back as text.  re.M must be
    # passed as flags=; the fourth positional argument of re.sub is count.
    with open(client_key_path, 'w') as f:
        f.write(re.sub(r'public-key = ".*?"', replacement, client_file_input, flags=re.M))

    ctx = zmq.Context.instance()
    auth = ThreadAuthenticator(ctx)
    auth.start()
    try:
        auth.configure_curve(domain='*', location=public_keys_dir)
        server = ctx.socket(zmq.REP)
        server_secret_file = os.path.join(secret_keys_dir, "server.key_secret")
        server_public, server_secret = zmq.auth.load_certificate(server_secret_file)
        server.curve_secretkey = server_secret
        server.curve_publickey = server_public
        server.curve_server = True  # must come before bind

        poller = zmq.Poller()
        poller.register(server, zmq.POLLIN | zmq.POLLOUT)

        server.bind('tcp://*:9000')

        while True:
            socks = dict(poller.poll(200))
            if server in socks and socks[server] == zmq.POLLIN:
                server.recv()
                logging.info("Ironhouse test OK")
            socks = dict(poller.poll(10))
            if server in socks and socks[server] == zmq.POLLOUT:
                logging.info("Should never see this")
    finally:
        # The loop never ends normally; stop the auth thread on the way out.
        auth.stop()
开发者ID:TheTacoScott,项目名称:libzmq-issue-939,代码行数:41,代码来源:libzmq-issue-939-server.py


示例15: test_plain_unconfigured_server

    def test_plain_unconfigured_server(self):
        """test ioloop auth - PLAIN, unconfigured server"""
        authenticator = zmq.auth.IOLoopAuthenticator(self.context, io_loop=self.io_loop)
        self.auth = authenticator
        authenticator.start()

        # The client presents credentials, but no PLAIN passwords were
        # configured on the authenticator, so the connection should fail and
        # any delivered message marks the test as failed.
        client = self.client
        client.plain_username = b"admin"
        client.plain_password = b"Password"
        self.pullstream.on_recv(self.on_message_fail)
        self.server.plain_server = True

        loop = self.io_loop
        start = loop.time()
        schedule = (
            (0.1, self.attempt_connection),
            (0.2, self.send_msg),
            # Ends the test even if no message is received.
            (0.5, self.on_test_timeout_succeed),
        )
        for delay, callback in schedule:
            loop.add_timeout(start + delay, callback)

        loop.start()

        outcome = self.test_result
        if not (outcome == True):
            self.fail(outcome)
开发者ID:juliantaylor,项目名称:pyzmq,代码行数:22,代码来源:test_auth.py


示例16: test_plain_bogus_credentials

    def test_plain_bogus_credentials(self):
        """test ioloop auth - PLAIN, bogus credentials"""
        authenticator = zmq.auth.IOLoopAuthenticator(self.context, io_loop=self.io_loop)
        self.auth = authenticator
        authenticator.start()
        authenticator.configure_plain(domain="*", passwords={"admin": "Password"})

        # The server is configured, but the client sends a password that does
        # not match: a delivered message marks the test as failed, and the
        # timeout marks it as passed.
        client = self.client
        client.plain_username = b"admin"
        client.plain_password = b"Bogus"
        self.pullstream.on_recv(self.on_message_fail)
        self.server.plain_server = True

        loop = self.io_loop
        start = loop.time()
        schedule = (
            (0.1, self.attempt_connection),
            (0.2, self.send_msg),
            # Ends the test even if no message is received.
            (0.5, self.on_test_timeout_succeed),
        )
        for delay, callback in schedule:
            loop.add_timeout(start + delay, callback)

        loop.start()

        outcome = self.test_result
        if not (outcome == True):
            self.fail(outcome)
开发者ID:juliantaylor,项目名称:pyzmq,代码行数:23,代码来源:test_auth.py


示例17: test_plain_configured_server

    def test_plain_configured_server(self):
        """test ioloop auth - PLAIN, configured server"""
        authenticator = zmq.auth.IOLoopAuthenticator(self.context, io_loop=self.io_loop)
        self.auth = authenticator
        authenticator.start()
        authenticator.configure_plain(domain='*', passwords={'admin': 'Password'})

        # Matching credentials against a configured server: the connection
        # should pass, so a delivered message marks the test as passed and
        # the timeout marks it as failed.
        client = self.client
        client.plain_username = b'admin'
        client.plain_password = b'Password'
        self.pullstream.on_recv(self.on_message_succeed)
        self.server.plain_server = True

        loop = self.io_loop
        start = loop.time()
        schedule = (
            (.1, self.attempt_connection),
            (.2, self.send_msg),
            # Ends the test even if no message is received.
            (.5, self.on_test_timeout_fail),
        )
        for delay, callback in schedule:
            loop.add_timeout(start + delay, callback)

        loop.start()

        outcome = self.test_result
        if not (outcome == True):
            self.fail(outcome)
开发者ID:rfk,项目名称:pyzmq,代码行数:23,代码来源:test_auth.py


示例18: test_whitelist

    def test_whitelist(self):
        """ test ioloop auth - Whitelist"""
        authenticator = zmq.auth.IOLoopAuthenticator(self.context, io_loop=self.io_loop)
        self.auth = authenticator
        authenticator.start()

        # Whitelist 127.0.0.1: the connection should pass, so a delivered
        # message marks the test as passed and the timeout marks it as failed.
        self.auth.allow("127.0.0.1")

        self.server.setsockopt(zmq.ZAP_DOMAIN, b"global")
        self.pullstream.on_recv(self.on_message_succeed)

        loop = self.io_loop
        start = loop.time()
        schedule = (
            (0.1, self.attempt_connection),
            (0.2, self.send_msg),
            # Ends the test even if no message is received.
            (0.5, self.on_test_timeout_fail),
        )
        for delay, callback in schedule:
            loop.add_timeout(start + delay, callback)

        loop.start()

        outcome = self.test_result
        if not (outcome == True):
            self.fail(outcome)
开发者ID:juliantaylor,项目名称:pyzmq,代码行数:23,代码来源:test_auth.py


示例19: run

    def run(self):
        """Start the CURVE-authenticated server and enter the IOLoop.

        Returns immediately (with a warning) when ``self._server`` is already
        set.  Exits with status 2 when the server private key file is
        missing; a missing trusted-keys directory only produces a warning.
        """
        if self._server:
            log.warning('Tried to run an already running server again.')
            return

        private_key_present = os.path.isfile(self._server_private_key)
        if not private_key_present:
            log.critical('No private key is installed for Lighthouse. Can not create an encrypted connection.')
            sys.exit(2)

        trusted_dir_present = os.path.isdir(self._trusted_keys_dir)
        if not trusted_dir_present:
            log.warning('Trusted keys directory does not exist. No clients will be able to make connections to this Lighthouse server.')

        # Authenticator for this context.  No IP whitelist is installed;
        # auth.allow('127.0.0.1') could be used to restrict access by address.
        authenticator = IOLoopAuthenticator(self._ctx)

        # Client certificates are looked up in the trusted keys directory.
        authenticator.configure_curve(domain='*', location=self._trusted_keys_dir)

        self._setup_server()

        authenticator.start()
        main_loop = ioloop.IOLoop.instance()
        main_loop.start()
开发者ID:tanglu-org,项目名称:laniakea,代码行数:23,代码来源:server.py


示例20: test_blacklist

    def test_blacklist(self):
        """ test ioloop auth - Blacklist"""
        authenticator = zmq.auth.IOLoopAuthenticator(self.context, io_loop=self.io_loop)
        self.auth = authenticator
        authenticator.start()

        # Blacklist 127.0.0.1: the connection should fail, so a delivered
        # message marks the test as failed and the timeout marks it as passed.
        authenticator.deny("127.0.0.1")

        self.server.zap_domain = b"global"
        self.pullstream.on_recv(self.on_message_fail)

        loop = self.io_loop
        start = loop.time()
        schedule = (
            (0.1, self.attempt_connection),
            (0.2, self.send_msg),
            # Ends the test even if no message is received.
            (0.5, self.on_test_timeout_succeed),
        )
        for delay, callback in schedule:
            loop.add_timeout(start + delay, callback)

        loop.start()

        outcome = self.test_result
        if not (outcome == True):
            self.fail(outcome)
开发者ID:juliantaylor,项目名称:pyzmq,代码行数:24,代码来源:test_auth.py



注:本文中的zmq.auth.start函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python _cffi.C类代码示例发布时间:2022-05-26
下一篇:
Python zmq.Poller类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap