本文整理汇总了Python中zmq.curve_keypair函数的典型用法代码示例。如果您正苦于以下问题:Python curve_keypair函数的具体用法?Python curve_keypair怎么用?Python curve_keypair使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了curve_keypair函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_curve
def test_curve(self):
"""test CURVE encryption"""
server = self.socket(zmq.DEALER)
server.identity = b"IDENT"
client = self.socket(zmq.DEALER)
self.sockets.extend([server, client])
try:
server.curve_server = True
except zmq.ZMQError as e:
# will raise EINVAL if not linked against libsodium
if e.errno == zmq.EINVAL:
raise SkipTest("CURVE unsupported")
server_public, server_secret = zmq.curve_keypair()
client_public, client_secret = zmq.curve_keypair()
server.curve_secretkey = server_secret
server.curve_publickey = server_public
client.curve_serverkey = server_public
client.curve_publickey = client_public
client.curve_secretkey = client_secret
self.assertEqual(server.mechanism, zmq.CURVE)
self.assertEqual(client.mechanism, zmq.CURVE)
self.assertEqual(server.get(zmq.CURVE_SERVER), True)
self.assertEqual(client.get(zmq.CURVE_SERVER), False)
self.start_zap()
iface = "tcp://127.0.0.1"
port = server.bind_to_random_port(iface)
client.connect("%s:%i" % (iface, port))
self.bounce(server, client)
self.stop_zap()
开发者ID:hatbro,项目名称:pyzmq,代码行数:35,代码来源:test_security.py
示例2: _zmq_has_curve
def _zmq_has_curve():
"""
Return whether the current ZMQ has support for auth and CurveZMQ security.
:rtype: bool
Version notes:
`zmq.curve_keypair()` is new in version 14.0, new in version libzmq-4.0.
Requires libzmq (>= 4.0) to have been linked with libsodium.
`zmq.auth` module is new in version 14.1
`zmq.has()` is new in version 14.1, new in version libzmq-4.1.
"""
zmq_version = zmq.zmq_version_info()
pyzmq_version = zmq.pyzmq_version_info()
if pyzmq_version >= (14, 1, 0) and zmq_version >= (4, 1):
return zmq.has('curve')
if pyzmq_version < (14, 1, 0):
return False
if zmq_version < (4, 0):
# security is new in libzmq 4.0
return False
try:
zmq.curve_keypair()
except zmq.error.ZMQError:
# security requires libzmq to be linked against libsodium
return False
return True
开发者ID:PaixuAabuizia,项目名称:bitmask_client,代码行数:32,代码来源:utils.py
示例3: setUp
def setUp(self):
if zmq.zmq_version_info() < (4,0):
raise SkipTest("security is new in libzmq 4.0")
try:
zmq.curve_keypair()
except zmq.ZMQError:
raise SkipTest("security requires libzmq to be built with CURVE support")
super(TestSecurity, self).setUp()
开发者ID:326029212,项目名称:pyzmq,代码行数:8,代码来源:test_security.py
示例4: setUp
def setUp(self):
if zmq.zmq_version_info() < (4, 0):
raise SkipTest("security is new in libzmq 4.0")
try:
zmq.curve_keypair()
except zmq.ZMQError:
raise SkipTest("security requires libzmq to be linked against libsodium")
super(TestSecurity, self).setUp()
开发者ID:hatbro,项目名称:pyzmq,代码行数:8,代码来源:test_security.py
示例5: test_untrusted_curve_with_allowed_password
def test_untrusted_curve_with_allowed_password(self):
from pseud import Client, Server
from pseud.utils import register_rpc
from pseud._tornado import async_sleep
client_id = b'john'
server_id = b'server'
endpoint = b'tcp://127.0.0.1:8999'
server_public, server_secret = zmq.curve_keypair()
client_public, client_secret = zmq.curve_keypair()
security_plugin = 'untrusted_curve'
password = b's3cret!'
client = Client(server_id,
security_plugin=security_plugin,
public_key=client_public,
secret_key=client_secret,
peer_public_key=server_public,
user_id=client_id,
password=password,
io_loop=self.io_loop)
server = Server(server_id,
security_plugin=security_plugin,
public_key=server_public,
secret_key=server_secret,
io_loop=self.io_loop)
server.bind(endpoint)
client.connect(endpoint)
assert server.socket.mechanism == zmq.CURVE
assert client.socket.mechanism == zmq.CURVE
# configure manually authentication backend
server.auth_backend.user_map[client_id] = password
yield server.start()
yield client.start()
register_rpc(name='string.lower')(str.lower)
future = client.string.lower('FOO')
future2 = client.string.lower('FOO_JJ')
yield async_sleep(self.io_loop, .01)
future3 = server.send_to(client_id).string.lower('ABC')
result = yield future
result2 = yield future2
result3 = yield future3
assert result == 'foo'
assert result2 == 'foo_jj'
assert result3 == 'abc'
server.stop()
client.stop()
开发者ID:fahhem,项目名称:pseud,代码行数:53,代码来源:test_auth.py
示例6: test_untrusted_curve_with_allowed_password_and_client_disconnect
def test_untrusted_curve_with_allowed_password_and_client_disconnect(self):
from pseud import Client, Server
from pseud._tornado import async_sleep
client_id = b'john'
server_id = b'server'
endpoint = b'tcp://127.0.0.1:8999'
server_public, server_secret = zmq.curve_keypair()
client_public, client_secret = zmq.curve_keypair()
security_plugin = 'untrusted_curve'
password = b's3cret!'
client = Client(server_id,
security_plugin=security_plugin,
public_key=client_public,
secret_key=client_secret,
peer_public_key=server_public,
user_id=client_id,
password=password,
timeout=1,
io_loop=self.io_loop)
server = Server(server_id,
security_plugin=security_plugin,
public_key=server_public,
secret_key=server_secret,
io_loop=self.io_loop)
server.bind(endpoint)
client.connect(endpoint)
assert server.socket.mechanism == zmq.CURVE
assert client.socket.mechanism == zmq.CURVE
# configure manually authentication backend
server.auth_backend.user_map[client_id] = password
yield server.start()
yield client.start()
server.register_rpc(name='string.lower')(str.lower)
result = yield client.string.lower('FOO')
assert result == 'foo'
# Simulate disconnection and reconnection with new identity
client.disconnect(endpoint)
client.connect(endpoint)
yield async_sleep(self.io_loop, .15)
result = yield client.string.lower('ABC')
assert result == 'abc'
server.stop()
client.stop()
开发者ID:fahhem,项目名称:pseud,代码行数:51,代码来源:test_auth.py
示例7: configure
def configure(self):
self.rpc.socket.curve_publickey = self.rpc.public_key
self.rpc.socket.curve_secretkey = self.rpc.secret_key
self.rpc.socket.curve_server = True
assert self.rpc.socket.mechanism == zmq.CURVE
assert self.rpc.socket.get(zmq.CURVE_SERVER)
self.zap_socket = zap_socket = self.rpc.context.socket(zmq.ROUTER)
zap_socket.linger = 1
zap_socket.bind(b'inproc://zeromq.zap.01')
self.reader = self.rpc.read_forever(zap_socket,
self._zap_handler,
copy=True)
self.known_identities = {b'bob': zmq.curve_keypair(),
b'alice': zmq.curve_keypair()}
开发者ID:lib1256,项目名称:pseud,代码行数:14,代码来源:auth.py
示例8: test_trusted_curve_with_wrong_peer_public_key
async def test_trusted_curve_with_wrong_peer_public_key(
loop, unused_tcp_port_factory):
from pseud import Client, Server
from pseud.utils import register_rpc
server_id = b'server'
port = unused_tcp_port_factory()
endpoint = f'tcp://127.0.0.1:{port}'
server_public, server_secret = zmq.curve_keypair()
server = Server(server_id, security_plugin='trusted_curve',
public_key=server_public,
secret_key=server_secret,
loop=loop)
server.bind(endpoint)
alice_public, alice_secret = \
server.auth_backend.known_identities[b'alice']
client = Client(server_id,
user_id=b'alice',
security_plugin='trusted_curve',
public_key=alice_public,
secret_key=alice_secret,
peer_public_key=z85.encode(b'R' * 32),
timeout=.5,
loop=loop)
client.connect(endpoint)
assert server.socket.mechanism == zmq.CURVE
assert client.socket.mechanism == zmq.CURVE
register_rpc(name='string.lower')(str.lower)
async with server, client:
with pytest.raises(asyncio.TimeoutError):
await client.string.lower('BAR')
开发者ID:ezeep,项目名称:pseud,代码行数:34,代码来源:test_auth.py
示例9: test_trusted_curve
async def test_trusted_curve(loop, unused_tcp_port,
trusted_curve_auth_backend):
from pseud import Client, Server
from pseud.utils import register_rpc
server_id = b'server'
endpoint = f'tcp://127.0.0.1:{unused_tcp_port}'
server_public, server_secret = zmq.curve_keypair()
security_plugin = 'trusted_curve'
server = Server(server_id, security_plugin=security_plugin,
public_key=server_public,
secret_key=server_secret,
loop=loop)
server.bind(endpoint)
bob_public, bob_secret = server.auth_backend.known_identities[b'bob']
client = Client(server_id,
user_id=b'bob',
security_plugin=security_plugin,
public_key=bob_public,
secret_key=bob_secret,
peer_public_key=server_public,
loop=loop)
client.connect(endpoint)
assert server.socket.mechanism == zmq.CURVE
assert client.socket.mechanism == zmq.CURVE
register_rpc(name='string.lower')(str.lower)
async with server, client:
result = await client.string.lower('FOO')
assert result == 'foo'
开发者ID:ezeep,项目名称:pseud,代码行数:33,代码来源:test_auth.py
示例10: __init__
def __init__(self):
generate_zmq_certificates_if_needed()
self._socket = None
# initialize ZMQ stuff:
context = zmq.Context()
logger.debug("Connecting to server...")
socket = context.socket(zmq.REQ)
if flags.ZMQ_HAS_CURVE:
# public, secret = zmq.curve_keypair()
client_keys = zmq.curve_keypair()
socket.curve_publickey = client_keys[0]
socket.curve_secretkey = client_keys[1]
# The client must know the server's public key to make a CURVE
# connection.
public, _ = get_backend_certificates()
socket.curve_serverkey = public
socket.setsockopt(zmq.RCVTIMEO, 1000)
socket.setsockopt(zmq.LINGER, 0) # Terminate early
socket.connect(self.SERVER)
self._socket = socket
self._ping_at = 0
self.online = False
self._call_queue = Queue.Queue()
self._worker_caller = threading.Thread(target=self._worker)
开发者ID:EvyW,项目名称:bitmask_client,代码行数:31,代码来源:backend_proxy.py
示例11: run
def run(user_params):
if len(user_params) < 4:
print_usage()
return -1
tcp_endpoint = user_params[1]
server_key = user_params[2]
context = zmq.Context.instance()
dealer = context.socket(zmq.DEALER)
# random key as the server doesn't check those
client_public, client_secret = zmq.curve_keypair()
dealer.curve_serverkey = server_key
dealer.curve_publickey = client_public
dealer.curve_secretkey = client_secret
connect_str = "tcp://" + str(tcp_endpoint)
dealer.connect(connect_str)
print "Connected to " + connect_str
ch = CommandHandler(dealer, user_params
);
ret = ch.handle_command(user_params[3])
time.sleep(1)
context.destroy(linger=5000)
return ret
开发者ID:EvdoDima,项目名称:my_leosac,代码行数:27,代码来源:remote_control.py
示例12: _init_zmq
def _init_zmq(self):
"""
Configure the zmq components and connection.
"""
logger.debug("Setting up ZMQ connection to server...")
context = zmq.Context()
socket = context.socket(zmq.REQ)
# we use zmq's eventloop in order to asynchronously send requests
loop = ioloop.ZMQIOLoop.current()
self._stream = zmqstream.ZMQStream(socket, loop)
if flags.ZMQ_HAS_CURVE:
# public, secret = zmq.curve_keypair()
client_keys = zmq.curve_keypair()
socket.curve_publickey = client_keys[0]
socket.curve_secretkey = client_keys[1]
# The client must know the server's public key to make a CURVE
# connection.
public, _ = get_backend_certificates()
socket.curve_serverkey = public
socket.setsockopt(zmq.RCVTIMEO, 1000)
socket.setsockopt(zmq.LINGER, 0) # Terminate early
self._stream.on_recv(self._on_recv)
开发者ID:parmegv,项目名称:bitmask_client,代码行数:27,代码来源:backend_proxy.py
示例13: bind
def bind(self, sock, bind_fn=None):
'''Extended zmq.Socket.bind() to include options in the address.'''
if not self.domain:
raise ValueError('Address domain must be set')
sock.zap_domain = self.domain or ''
if self.identity:
sock.identity = self.identity
elif not sock.identity:
sock.identity = self.identity = bytes(uuid.uuid4())
sock.ipv6 = self.ipv6 or False
if self.server == 'CURVE':
if not self.secretkey:
raise ValueError('CURVE server used without secretkey')
sock.curve_server = True
sock.curve_secretkey = self.secretkey
elif self.server == 'PLAIN':
sock.plain_server = True
else:
sock.curve_server = False
sock.plain_server = False
if self.serverkey:
sock.curve_serverkey = self.serverkey
if not (self.publickey and self.secretkey):
self.publickey, self.secretkey = curve_keypair()
sock.curve_secretkey = self.secretkey
sock.curve_publickey = self.publickey
elif self.username:
sock.plain_username = self.username
sock.plain_password = self.password or b''
(bind_fn or sock.bind)(self.base)
self.base = sock.last_endpoint
开发者ID:NREL,项目名称:volttron,代码行数:31,代码来源:socket.py
示例14: __init__
def __init__(self):
"""
Initialize the ZMQ socket to talk to the signaling server.
"""
context = zmq.Context()
logger.debug("Connecting to signaling server...")
socket = context.socket(zmq.REQ)
# public, secret = zmq.curve_keypair()
client_keys = zmq.curve_keypair()
socket.curve_publickey = client_keys[0]
socket.curve_secretkey = client_keys[1]
# The client must know the server's public key to make a CURVE
# connection.
public, _ = get_frontend_certificates()
socket.curve_serverkey = public
socket.setsockopt(zmq.RCVTIMEO, 1000)
socket.connect(self.SERVER)
self._socket = socket
self._signal_queue = Queue.Queue()
self._do_work = threading.Event() # used to stop the worker thread.
self._worker_signaler = threading.Thread(target=self._worker)
开发者ID:bwagnerr,项目名称:bitmask_client,代码行数:26,代码来源:signaler.py
示例15: __init__
def __init__(self, url):
context = get_context()
import caffeine.keytools
(zeromq_url, z85_public, z85_private,
z85_server) = caffeine.keytools.parseURL(url)
if z85_server:
# In this case, we enable encryption and assume we're talking to a
# ROUTER
self.socket = context.socket(zmq.REQ)
client_public, client_secret = zmq.curve_keypair()
self.socket.curve_publickey = z85_public
self.socket.curve_secretkey = z85_private
self.socket.curve_serverkey = z85_server
self.burned_ready = True # not required for router
self.router_style_messages = True
self.socket.connect(zeromq_url)
else:
# In this case, we handle direct mode
self.burned_ready = False
self.socket = context.socket(zmq.REP)
self.router_style_messages = False
self.socket.bind(zeromq_url)
print(zeromq_url, z85_public, z85_private, z85_server)
print("client connecting to URL %s" % url)
开发者ID:wittedhaddock,项目名称:caffeine,代码行数:27,代码来源:worker.py
示例16: test_untrusted_curve_with_allowed_password_and_client_disconnect
async def test_untrusted_curve_with_allowed_password_and_client_disconnect(
loop, unused_tcp_port):
from pseud import Client, Server
client_id = b'john'
server_id = b'server'
endpoint = f'tcp://127.0.0.1:{unused_tcp_port}'
server_public, server_secret = zmq.curve_keypair()
client_public, client_secret = zmq.curve_keypair()
security_plugin = 'untrusted_curve'
password = b's3cret!'
client = Client(server_id,
security_plugin=security_plugin,
public_key=client_public,
secret_key=client_secret,
peer_public_key=server_public,
user_id=client_id,
password=password,
timeout=1,
loop=loop)
server = Server(server_id,
security_plugin=security_plugin,
public_key=server_public,
secret_key=server_secret,
loop=loop)
server.bind(endpoint)
client.connect(endpoint)
assert server.socket.mechanism == zmq.CURVE
assert client.socket.mechanism == zmq.CURVE
# configure manually authentication backend
server.auth_backend.user_map[client_id] = password
server.register_rpc(name='string.lower')(str.lower)
async with server, client:
result = await client.string.lower('FOO')
assert result == 'foo'
# Simulate disconnection and reconnection with new identity
client.disconnect(endpoint)
client.connect(endpoint)
await asyncio.sleep(.1)
result = await client.string.lower('ABC')
assert result == 'abc'
开发者ID:ezeep,项目名称:pseud,代码行数:47,代码来源:test_auth.py
示例17: test_untrusted_curve_with_wrong_password
def test_untrusted_curve_with_wrong_password(self):
from pseud import Client, Server
from pseud.interfaces import UnauthorizedError
from pseud.utils import register_rpc
client_id = b'john'
server_id = b'server'
endpoint = b'tcp://127.0.0.1:8998'
server_public, server_secret = zmq.curve_keypair()
client_public, client_secret = zmq.curve_keypair()
security_plugin = 'untrusted_curve'
password = b's3cret!'
client = Client(server_id,
user_id=client_id,
security_plugin=security_plugin,
public_key=client_public,
secret_key=client_secret,
peer_public_key=server_public,
password=password,
io_loop=self.io_loop)
server = Server(server_id,
security_plugin=security_plugin,
public_key=server_public,
secret_key=server_secret,
io_loop=self.io_loop)
server.bind(endpoint)
client.connect(endpoint)
assert server.socket.mechanism == zmq.CURVE
assert client.socket.mechanism == zmq.CURVE
# configure manually authentication backend
server.auth_backend.user_map[client_id] = password + b'Looser'
yield server.start()
yield client.start()
register_rpc(name='string.lower')(str.lower)
future = client.string.lower(b'IMSCREAMING')
with pytest.raises(UnauthorizedError):
yield future
server.stop()
client.stop()
开发者ID:fahhem,项目名称:pseud,代码行数:46,代码来源:test_auth.py
示例18: test_untrusted_curve_with_allowed_password
async def test_untrusted_curve_with_allowed_password(
loop, unused_tcp_port, untrusted_curve_auth_backend):
from pseud import Client, Server
from pseud.utils import register_rpc
client_id = b'john'
server_id = b'server'
endpoint = f'tcp://127.0.0.1:{unused_tcp_port}'
server_public, server_secret = zmq.curve_keypair()
client_public, client_secret = zmq.curve_keypair()
security_plugin = 'untrusted_curve'
password = b's3cret!'
client = Client(server_id,
security_plugin=security_plugin,
public_key=client_public,
secret_key=client_secret,
peer_public_key=server_public,
user_id=client_id,
password=password,
loop=loop)
server = Server(server_id,
security_plugin=security_plugin,
public_key=server_public,
secret_key=server_secret,
loop=loop)
server.bind(endpoint)
client.connect(endpoint)
assert server.socket.mechanism == zmq.CURVE
assert client.socket.mechanism == zmq.CURVE
# configure manually authentication backend
server.auth_backend.user_map[client_id] = password
register_rpc(name='string.lower')(str.lower)
async with server, client:
result = await client.string.lower('FOO')
result2 = await client.string.lower('FOO_JJ')
result3 = await server.send_to(client_id).string.lower('ABC')
assert result == 'foo'
assert result2 == 'foo_jj'
assert result3 == 'abc'
开发者ID:ezeep,项目名称:pseud,代码行数:45,代码来源:test_auth.py
示例19: test_untrusted_curve_with_wrong_password
async def test_untrusted_curve_with_wrong_password(loop, unused_tcp_port):
from pseud import Client, Server
from pseud.interfaces import UnauthorizedError
from pseud.utils import register_rpc
client_id = b'john'
server_id = b'server'
endpoint = f'tcp://127.0.0.1:{unused_tcp_port}'
server_public, server_secret = zmq.curve_keypair()
client_public, client_secret = zmq.curve_keypair()
security_plugin = 'untrusted_curve'
password = b's3cret!'
client = Client(server_id,
user_id=client_id,
security_plugin=security_plugin,
public_key=client_public,
secret_key=client_secret,
peer_public_key=server_public,
password=password,
loop=loop)
server = Server(server_id,
security_plugin=security_plugin,
public_key=server_public,
secret_key=server_secret,
loop=loop)
server.bind(endpoint)
client.connect(endpoint)
assert server.socket.mechanism == zmq.CURVE
assert client.socket.mechanism == zmq.CURVE
# configure manually authentication backend
server.auth_backend.user_map[client_id] = password + b'Looser'
register_rpc(name='string.lower')(str.lower)
async with server, client:
with pytest.raises(UnauthorizedError):
await client.string.lower(b'IMSCREAMING')
开发者ID:ezeep,项目名称:pseud,代码行数:41,代码来源:test_auth.py
示例20: test_roundtrip
def test_roundtrip(self):
import zmq
import sys
sys.path.append("..")
import caffeine
context = zmq.Context()
socket = context.socket(zmq.REQ)
client_public, client_secret = zmq.curve_keypair()
socket.curve_publickey = client_public
socket.curve_secretkey = client_secret
socket.curve_serverkey = caffeine.well_known_public_key
socket.connect("tcp://localhost:55555")
socket.send_multipart([b'test1'])
msg = socket.recv_multipart()
print(msg)
self.assertEquals(msg[1], b"worker says hello")
开发者ID:wittedhaddock,项目名称:caffeine,代码行数:16,代码来源:testRouter.py
注:本文中的zmq.curve_keypair函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论