本文整理汇总了Python中support.MockTransport类的典型用法代码示例。如果您正苦于以下问题：Python MockTransport类的具体用法？Python MockTransport怎么用？Python MockTransport使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了MockTransport类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_send_message_incremental
def test_send_message_incremental(self):
    # Deliver the auth exchange and two method calls one byte at a time.
    # The protocol should be able to process them.
    transport = MockTransport()
    protocol = DbusProtocol(True, self.store_messages, 'foo')
    transport.start(protocol)

    def feed_bytewise(data):
        # Hand *data* to the protocol in one-byte slices.
        for pos in range(len(data)):
            protocol.data_received(data[pos:pos+1])

    feed_bytewise(b'\0AUTH ANONYMOUS\r\nBEGIN\r\n')
    self.assertTrue(protocol._authenticator.authenticationSucceeded())

    hello = txdbus.MethodCallMessage('/org/freedesktop/DBus', 'Hello',
                                     interface='org.freedesktop.DBus',
                                     destination='org.freedesktop.DBus')
    feed_bytewise(hello.rawMessage)
    gruvi.sleep(0)
    self.assertIsNone(protocol._error)
    self.assertFalse(transport.closed)
    # Nothing should have been stored after the "Hello" call.
    self.assertEqual(len(self.messages), 0)

    call = txdbus.MethodCallMessage('/my/path', 'Method')
    feed_bytewise(call.rawMessage)
    gruvi.sleep(0)
    self.assertIsNone(protocol._error)
    self.assertFalse(transport.closed)
    self.assertEqual(len(self.messages), 1)
    stored = self.messages[0]
    self.assertEqual(stored.path, '/my/path')
    self.assertEqual(stored.member, 'Method')
    self.assertEqual(self.protocols, [protocol])
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:28,代码来源:test_dbus.py
示例2: test_send_message
def test_send_message(self):
    # Once the "Hello" call has been processed it should be possible to
    # send other messages.
    transport = MockTransport()
    protocol = DbusProtocol(True, self.store_messages, 'foo')
    transport.start(protocol)
    protocol.data_received(b'\0AUTH ANONYMOUS\r\nBEGIN\r\n')
    self.assertTrue(protocol._authenticator.authenticationSucceeded())

    hello = txdbus.MethodCallMessage('/org/freedesktop/DBus', 'Hello',
                                     interface='org.freedesktop.DBus',
                                     destination='org.freedesktop.DBus')
    protocol.data_received(hello.rawMessage)
    gruvi.sleep(0)
    self.assertIsNone(protocol._error)
    self.assertFalse(transport.closed)
    self.assertTrue(protocol._name_acquired)
    # Nothing should have been stored after the "Hello" call.
    self.assertEqual(len(self.messages), 0)

    call = txdbus.MethodCallMessage('/my/path', 'Method')
    protocol.data_received(call.rawMessage)
    gruvi.sleep(0)
    self.assertIsNone(protocol._error)
    self.assertFalse(transport.closed)
    self.assertEqual(len(self.messages), 1)
    stored = self.messages[0]
    self.assertEqual(stored.path, '/my/path')
    self.assertEqual(stored.member, 'Method')
    self.assertEqual(self.protocols, [protocol])
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:26,代码来源:test_dbus.py
示例3: test_writelines
def test_writelines(self):
    # writelines() should write every item to the transport, in order.
    transport = MockTransport()
    protocol = StreamProtocol()
    transport.start(protocol)
    lines = [b'foo', b'bar']
    protocol.stream.writelines(lines)
    written = transport.buffer.getvalue()
    self.assertEqual(written, b'foobar')
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:7,代码来源:test_stream.py
示例4: test_send_message_too_large
def test_send_message_too_large(self):
    # A message that exceeds the maximum message size should cause the
    # connection to be closed.
    transport = MockTransport()
    protocol = DbusProtocol(True, self.store_messages, 'foo')
    transport.start(protocol)
    protocol.data_received(b'\0AUTH ANONYMOUS\r\nBEGIN\r\n')
    hello = txdbus.MethodCallMessage('/org/freedesktop/DBus', 'Hello',
                                     interface='org.freedesktop.DBus',
                                     destination='org.freedesktop.DBus')
    protocol.data_received(hello.rawMessage)
    gruvi.sleep(0)
    self.assertTrue(protocol._name_acquired)

    # A signal whose size equals the high-water mark should be accepted.
    signal = txdbus.SignalMessage('/my/path', 'Signal', 'my.iface',
                                  signature='s', body=['x'*100])
    size = len(signal.rawMessage)
    self.assertGreater(size, 100)
    protocol.set_read_buffer_limits(size)
    protocol.data_received(signal.rawMessage)
    gruvi.sleep(0)
    self.assertIsNone(protocol._error)
    self.assertFalse(transport.closed)
    self.assertEqual(len(self.messages), 1)

    # A signal one byte larger than the high-water mark should fail.
    signal = txdbus.SignalMessage('/my/path', 'Signal', 'my.iface',
                                  signature='s', body=['x'*100])
    size = len(signal.rawMessage)
    protocol.set_read_buffer_limits(size-1)
    protocol.data_received(signal.rawMessage)
    gruvi.sleep(0)
    self.assertIsInstance(protocol._error, DbusError)
    self.assertTrue(transport.closed)
    # The rejected signal must not have been stored.
    self.assertEqual(len(self.messages), 1)
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:33,代码来源:test_dbus.py
示例5: test_write_eof
def test_write_eof(self):
    # write_eof() on the stream should set the transport's eof flag.
    transport = MockTransport()
    protocol = StreamProtocol()
    transport.start(protocol)
    # The flag starts out cleared.
    self.assertFalse(transport.eof)
    stream = protocol.stream
    stream.write_eof()
    self.assertTrue(transport.eof)
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:8,代码来源:test_stream.py
示例6: test_readline
def test_readline(self):
    # readline() should return a received line including its newline.
    transport = MockTransport()
    protocol = StreamProtocol()
    transport.start(protocol)
    line = b'foo\n'
    protocol.data_received(line)
    self.assertEqual(protocol.stream.readline(), b'foo\n')
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:8,代码来源:test_stream.py
示例7: parse_request
def parse_request(self, *chunks):
    # Feed the HTTP request made up of *chunks to a server-side
    # HttpProtocol, then keep the transport and protocol on the test case.
    transport = MockTransport()
    protocol = HttpProtocol(self.store_request, server_side=True)
    transport.start(protocol)
    for piece in chunks:
        protocol.data_received(piece)
    # Parsing must not have recorded an error.
    self.assertIsNone(protocol._error)
    self.transport, self.protocol = transport, protocol
开发者ID:swegener,项目名称:gruvi,代码行数:10,代码来源:test_http.py
示例8: test_auth_missing_creds_byte
def test_auth_missing_creds_byte(self):
    # A client must start by sending a '\0' byte. If it sends anything
    # else, the server should close the connection.
    transport = MockTransport()
    proto = DbusProtocol(True, None)
    transport.start(proto)
    self.assertFalse(transport.closed)
    bad_first_byte = b'\1'
    proto.data_received(bad_first_byte)
    self.assertIsInstance(proto._error, DbusError)
    self.assertTrue(transport.closed)
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:10,代码来源:test_dbus.py
示例9: test_auth_non_ascii
def test_auth_non_ascii(self):
    # After the '\0' byte an authentication phase happens. The
    # authentication protocol is line based and all lines should be ASCII.
    transport = MockTransport()
    proto = DbusProtocol(True, None)
    transport.start(proto)
    self.assertFalse(transport.closed)
    non_ascii_line = b'\0\xff\r\n'
    proto.data_received(non_ascii_line)
    self.assertIsInstance(proto._error, DbusError)
    self.assertTrue(transport.closed)
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:10,代码来源:test_dbus.py
示例10: test_auth_long_line
def test_auth_long_line(self):
    # An authentication line longer than max_line_size should be rejected.
    transport = MockTransport()
    proto = DbusProtocol(True, None, 'foo')
    # Lower the limit below the length of the AUTH line sent further down.
    proto.max_line_size = 5
    transport.start(proto)
    self.assertFalse(transport.closed)
    proto.data_received(b'\0AUTH ANONYMOUS\r\n')
    self.assertIsInstance(proto._error, DbusError)
    self.assertTrue(transport.closed)
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:10,代码来源:test_dbus.py
示例11: test_iter
def test_iter(self):
    # Iterating over the stream of a stream protocol should yield lines
    # and stop once the buffered data is exhausted after EOF.
    transport = MockTransport()
    protocol = StreamProtocol()
    transport.start(protocol)
    protocol.data_received(b'foo\nbar\n')
    protocol.eof_received()
    lines = iter(protocol.stream)
    for expected in (b'foo\n', b'bar\n'):
        self.assertEqual(six.next(lines), expected)
    self.assertRaises(StopIteration, six.next, lines)
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:11,代码来源:test_stream.py
示例12: test_read_after_error
def test_read_after_error(self):
    # Data buffered before an error should still be readable afterwards.
    transport = MockTransport()
    protocol = StreamProtocol()
    transport.start(protocol)
    protocol.data_received(b'foobar')
    protocol.connection_lost(RuntimeError)
    stream = protocol.stream
    for size, expected in ((3, b'foo'), (3, b'bar')):
        self.assertEqual(stream.read(size), expected)
    # With the buffer drained, a further read returns nothing.
    self.assertEqual(stream.read(), b'')
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:11,代码来源:test_stream.py
示例13: parse_response
def parse_response(self, *chunks, **kwargs):
    # Feed the HTTP response made up of *chunks to an HttpProtocol, then
    # keep the transport and protocol on the test case. A non-empty
    # 'methods' keyword argument is installed as protocol._requests before
    # any data is fed.
    transport = MockTransport()
    protocol = HttpProtocol()
    transport.start(protocol)
    methods = kwargs.get('methods')
    if methods:
        protocol._requests = methods
    for piece in chunks:
        protocol.data_received(piece)
    # Parsing must not have recorded an error.
    self.assertIsNone(protocol._error)
    self.transport, self.protocol = transport, protocol
开发者ID:swegener,项目名称:gruvi,代码行数:13,代码来源:test_http.py
示例14: test_missing_hello
def test_missing_hello(self):
    # The first message after authentication should be a "Hello".
    # Otherwise the server should close the connection.
    transport = MockTransport()
    proto = DbusProtocol(True, self.store_messages, 'foo')
    transport.start(proto)
    proto.data_received(b'\0AUTH ANONYMOUS\r\nBEGIN\r\n')
    call = txdbus.MethodCallMessage('/my/path', 'Method')
    self.assertTrue(proto._authenticator.authenticationSucceeded())
    # Send an ordinary method call where the "Hello" is expected.
    proto.data_received(call.rawMessage)
    self.assertIsInstance(proto._error, DbusError)
    self.assertTrue(transport.closed)
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:13,代码来源:test_dbus.py
示例15: test_auth_ok
def test_auth_ok(self):
    # Test anonymous authentication. Ensure that the server GUID is
    # correctly sent back.
    transport = MockTransport()
    protocol = DbusProtocol(True, None, 'foo')
    transport.start(protocol)
    protocol.data_received(b'\0AUTH ANONYMOUS\r\nBEGIN\r\n')
    # The reply written to the transport should begin with "OK <guid>".
    buf = transport.buffer.getvalue()
    self.assertTrue(buf.startswith(b'OK foo'))
    auth = protocol._authenticator
    self.assertTrue(auth.authenticationSucceeded())
    # Compare the GUID explicitly: assertTrue(value, 'foo') would treat
    # 'foo' as the failure message and pass for any truthy GUID.
    self.assertEqual(auth.getGUID(), 'foo')
    self.assertFalse(transport.closed)
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:13,代码来源:test_dbus.py
示例16: perf_parsing_speed
def perf_parsing_speed(self):
    # Feed batches of four HTTP responses to the protocol for about one
    # second and record the throughput in MiB per second.
    transport = MockTransport()
    protocol = HttpProtocol()
    transport.start(protocol)
    response = b'HTTP/1.1 200 OK\r\nContent-Length: 10000\r\n\r\n'
    response += b'x' * 10000
    batch = 4 * response
    batch_size = len(batch)
    total = 0
    started = now = time.time()
    while now - started < 1.0:
        protocol.data_received(batch)
        # Empty the heap of the protocol's queue after every batch.
        del protocol._queue._heap[:]
        total += batch_size
        now = time.time()
    elapsed = now - started
    speed = total / elapsed / (1024 * 1024)
    self.add_result(speed)
开发者ID:geertj,项目名称:gruvi,代码行数:16,代码来源:perf_http.py
示例17: setUp
def setUp(self):
    # Give every test a fresh transport/protocol pair and empty lists for
    # the messages and protocols recorded during the test.
    super(TestJsonRpcProtocol, self).setUp()
    transport = MockTransport()
    protocol = JsonRpcProtocol(self.message_handler)
    self.transport = transport
    self.protocol = protocol
    transport.start(protocol)
    self.messages = []
    self.protocols = []
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:7,代码来源:test_jsonrpc.py
示例18: test_read_write_flow_control
def test_read_write_flow_control(self):
    # Exercise the read and write flow control of a stream transport.
    transport = MockTransport()
    protocol = StreamProtocol()
    transport.start(protocol)
    protocol.stream.buffer.set_buffer_limits(100)
    transport.set_write_buffer_limits(50)

    def reader():
        # Copy everything read from the stream back to it, 20 bytes at a
        # time, until a read returns nothing.
        while True:
            data = protocol.stream.read(20)
            if not data:
                return
            protocol.stream.write(data)

    reader_fiber = gruvi.spawn(reader)
    chunk = b'x' * 20
    interrupted = 0
    for _ in range(100):
        protocol.data_received(chunk)
        if transport._reading:
            continue
        # Reading was paused: count it and check the buffers.
        interrupted += 1
        self.assertGreater(protocol.stream.buffer.get_buffer_size(), 0)
        # Switch to the reader() fiber, which will fill up the transport
        # write buffer.
        gruvi.sleep(0)
        # The transport write buffer should be full, but the protocol
        # read buffer should still contain something.
        self.assertGreater(protocol.stream.buffer.get_buffer_size(), 0)
        self.assertFalse(transport._can_write.is_set())
        # Drain the write buffer and resume writing.
        transport.drain()
    self.assertGreater(interrupted, 30)
    reader_fiber.cancel()
    gruvi.sleep(0)
开发者ID:swegener,项目名称:gruvi,代码行数:34,代码来源:test_stream.py
示例19: test_read_write_flow_control
def test_read_write_flow_control(self):
    # Send a lot of messages, filling up the protocol read buffer.
    transport = MockTransport()
    protocol = DbusProtocol(True, self.store_and_echo_messages)
    transport.start(protocol)
    protocol.data_received(b'\0AUTH ANONYMOUS\r\nBEGIN\r\n')
    self.assertTrue(protocol._authenticator.authenticationSucceeded())
    hello = txdbus.MethodCallMessage('/org/freedesktop/DBus', 'Hello',
                                     interface='org.freedesktop.DBus',
                                     destination='org.freedesktop.DBus')
    protocol.data_received(hello.rawMessage)
    gruvi.sleep(0)
    self.assertTrue(protocol._name_acquired)
    interrupted = 0
    # Size both buffer limits in units of one signal message.
    sample = txdbus.SignalMessage('/my/path', 'Signal', 'my.iface',
                                  signature='s', body=['x'*100])
    msglen = len(sample.rawMessage)
    protocol.set_read_buffer_limits(10*msglen)
    transport.buffer.seek(0)
    transport.buffer.truncate()
    transport.set_write_buffer_limits(7*msglen)
    for _ in range(100):
        # Fill up the protocol read buffer.
        signal = txdbus.SignalMessage('/my/path', 'Signal', 'my.iface',
                                      signature='s', body=['x'*100])
        protocol.data_received(signal.rawMessage)
        if protocol._reading:
            continue
        interrupted += 1
        self.assertGreater(protocol._queue.qsize(), 0)
        # Run the dispatcher to fill up the transport write buffer.
        gruvi.sleep(0)
        # Now the write buffer is full and the read buffer still contains
        # some entries because it is larger.
        self.assertTrue(protocol._reading)
        self.assertGreater(protocol._queue.qsize(), 0)
        self.assertFalse(protocol._may_write.is_set())
        # Drain the write buffer and resume writing.
        transport.buffer.seek(0)
        transport.buffer.truncate()
        protocol.resume_writing()
    # Should be interrupted more than 10 times. The write buffer is the
    # limiting factor, not the read buffer.
    self.assertGreater(interrupted, 10)
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:44,代码来源:test_dbus.py
示例20: TestJsonRpcProtocol
class TestJsonRpcProtocol(UnitTest):
def setUp(self):
    # Give every test a fresh transport/protocol pair and empty lists for
    # the messages and protocols recorded by message_handler().
    super(TestJsonRpcProtocol, self).setUp()
    transport = MockTransport()
    protocol = JsonRpcProtocol(self.message_handler)
    self.transport = transport
    self.protocol = protocol
    transport.start(protocol)
    self.messages = []
    self.protocols = []
def message_handler(self, message, transport, protocol):
    # Record the message and the protocol it arrived on; the transport
    # argument is not used.
    for log, item in ((self.messages, message), (self.protocols, protocol)):
        log.append(item)
def get_messages(self):
    # Yield first so the dispatcher gets to call our message handler,
    # then hand back the list of recorded messages.
    gruvi.sleep(0)
    recorded = self.messages
    return recorded
def test_simple(self):
    # A single well-formed message is delivered once, as a dict, together
    # with the protocol that received it.
    proto = self.protocol
    payload = b'{ "id": "1", "method": "foo" }'
    proto.data_received(payload)
    received = self.get_messages()
    self.assertEqual(len(received), 1)
    first = received[0]
    self.assertIsInstance(first, dict)
    self.assertEqual(first, {'id': '1', 'method': 'foo'})
    seen = self.protocols
    self.assertEqual(len(seen), 1)
    self.assertIs(seen[0], proto)
def test_multiple(self):
    # Two back-to-back messages in one chunk are delivered separately and
    # in order, each with the receiving protocol.
    proto = self.protocol
    payload = (b'{ "id": "1", "method": "foo" }'
               b'{ "id": "2", "method": "bar" }')
    proto.data_received(payload)
    received = self.get_messages()
    self.assertEqual(len(received), 2)
    self.assertEqual(received[0], {'id': '1', 'method': 'foo'})
    self.assertEqual(received[1], {'id': '2', 'method': 'bar'})
    seen = self.protocols
    self.assertEqual(len(seen), 2)
    for recorded in seen[:2]:
        self.assertIs(recorded, proto)
def test_whitespace(self):
    # Whitespace in front of each message should not affect parsing.
    proto = self.protocol
    payload = (b' { "id": "1", "method": "foo" }'
               b' { "id": "2", "method": "bar" }')
    proto.data_received(payload)
    received = self.get_messages()
    self.assertEqual(len(received), 2)
    self.assertEqual(received[0], {'id': '1', 'method': 'foo'})
    self.assertEqual(received[1], {'id': '2', 'method': 'bar'})
def test_incremental(self):
    # Feed a message one byte at a time. Nothing should be delivered
    # until the final byte has arrived.
    proto = self.protocol
    payload = b'{ "id": "1", "method": "foo" }'
    last = len(payload) - 1
    for pos in range(last):
        proto.data_received(payload[pos:pos+1])
        self.assertEqual(self.get_messages(), [])
    # The closing byte completes the message.
    proto.data_received(payload[-1:])
    received = self.get_messages()
    self.assertEqual(len(received), 1)
    self.assertEqual(received[0], {'id': '1', 'method': 'foo'})
def test_framing_error(self):
    # Input that is not framed as a JSON object should deliver no message
    # and leave a JsonRpcError on the protocol.
    proto = self.protocol
    proto.data_received(b'xxx')
    delivered = self.get_messages()
    self.assertEqual(delivered, [])
    self.assertIsInstance(proto._error, JsonRpcError)
def test_encoding_error(self):
    # A message containing a \xff byte should deliver no message and
    # leave a JsonRpcError on the protocol.
    proto = self.protocol
    proto.data_received(b'{ xxx\xff }')
    delivered = self.get_messages()
    self.assertEqual(delivered, [])
    self.assertIsInstance(proto._error, JsonRpcError)
def test_illegal_json(self):
    # A brace-delimited message that is not valid JSON should deliver no
    # message and leave a JsonRpcError on the protocol.
    proto = self.protocol
    proto.data_received(b'{ "xxxx" }')
    delivered = self.get_messages()
    self.assertEqual(delivered, [])
    self.assertIsInstance(proto._error, JsonRpcError)
def test_illegal_jsonrpc(self):
    # Valid JSON that is not a JSON-RPC message should deliver no message
    # and leave a JsonRpcError on the protocol.
    proto = self.protocol
    proto.data_received(b'{ "xxxx": "yyyy" }')
    delivered = self.get_messages()
    self.assertEqual(delivered, [])
    self.assertIsInstance(proto._error, JsonRpcError)
def test_maximum_message_size_exceeded(self):
proto = self.protocol
proto.set_read_buffer_limits(100)
message = {'id': 1, 'method': 'foo', 'params': ['x'*100]}
self.assertEqual(jsonrpc.check_message(message), '1.0')
message = json.dumps(message).encode('utf8')
#.........这里部分代码省略.........
开发者ID:Ed-von-Schleck,项目名称:gruvi,代码行数:101,代码来源:test_jsonrpc.py
注：本文中的support.MockTransport类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论