本文整理汇总了 Python 中 six.moves.mock.call 函数的典型用法代码示例。如果您正苦于以下问题:Python call 函数的具体用法是什么?Python call 怎么用?Python call 有哪些使用例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了call函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_two_topics
def test_two_topics(self):
    """Emit one notification per topic and check the endpoint sees both.

    A single listener is set up for ``topic1`` and ``topic2``.  A notifier
    is created for each topic in turn and sends one ``info`` notification;
    the endpoint's ``info`` mock must have recorded both calls, in any
    order.
    """
    transport = oslo_messaging.get_transport(self.conf, url="fake:")
    endpoint = mock.Mock()
    endpoint.info.return_value = None

    topic_names = ("topic1", "topic2")
    targets = [oslo_messaging.Target(topic=name) for name in topic_names]
    listener_thread = self._setup_listener(
        transport, [endpoint], targets=targets)

    # Events are numbered from 1, matching the topic suffix.
    for number, name in enumerate(topic_names, start=1):
        notifier = self._setup_notifier(transport, topic=name)
        notifier.info(
            {"ctxt": "%d" % number}, "an_event.start%d" % number, "test")

    self.wait_for_messages(2)
    self.assertFalse(listener_thread.stop())

    expected_calls = [
        mock.call(
            {"ctxt": "%d" % number},
            "testpublisher",
            "an_event.start%d" % number,
            "test",
            {"timestamp": mock.ANY, "message_id": mock.ANY},
        )
        for number in (1, 2)
    ]
    endpoint.info.assert_has_calls(expected_calls, any_order=True)
开发者ID:wangyanbn,项目名称:oslo.messaging,代码行数:34,代码来源:test_listener.py
示例2: test_listen_and_direct_send
def test_listen_and_direct_send(self):
    """Listen on a target, then direct-send, with qpid's Connection patched.

    Asserts that listening leaves three consumers on the connection, that
    the session's ``receiver`` was asked for the three expected
    ``amq.topic`` addresses, and that the last ``sender`` call targeted
    ``amq.direct/msg_id``.
    """
    target = oslo_messaging.Target(
        exchange="exchange_test",
        topic="topic_test",
        server="server_test",
    )

    with mock.patch('qpid.messaging.Connection') as conn_cls:
        session = conn_cls.return_value.session.return_value
        # One distinct receiver mock per expected consumer.
        session.receiver.side_effect = [mock.Mock() for _ in range(3)]

        listener = self.driver.listen(target)
        listener.conn.direct_send("msg_id", {})

    self.assertEqual(3, len(listener.conn.consumers))

    receiver_addresses = (
        'amq.topic/topic/exchange_test/topic_test',
        'amq.topic/topic/exchange_test/topic_test.server_test',
        'amq.topic/fanout/topic_test',
    )
    session.receiver.assert_has_calls(
        [mock.call(AddressNodeMatcher(address))
         for address in receiver_addresses])
    session.sender.assert_called_with(
        AddressNodeMatcher("amq.direct/msg_id"))
开发者ID:eayunstack,项目名称:oslo.messaging,代码行数:26,代码来源:test_impl_qpid.py
示例3: test_batch_timeout
def test_batch_timeout(self):
    """Send 12 notifications to a listener configured with ``batch=(5, 1)``.

    The endpoint is expected to be invoked with batches of 5, 5 and 2
    messages, in that order.
    """
    transport = oslo_messaging.get_transport(self.conf, url='fake:')
    endpoint = mock.Mock()
    endpoint.info.return_value = None
    listener_thread = self._setup_listener(
        transport, [endpoint], batch=(5, 1))
    notifier = self._setup_notifier(transport)

    for _ in six.moves.range(12):
        notifier.info({}, 'an_event.start', 'test message')

    self.wait_for_messages(3)
    self.assertFalse(listener_thread.stop())

    # A one-element list so that ``single_message * n`` yields an
    # n-message batch.
    single_message = [{
        'ctxt': {},
        'publisher_id': 'testpublisher',
        'event_type': 'an_event.start',
        'payload': 'test message',
        'metadata': {'message_id': mock.ANY, 'timestamp': mock.ANY},
    }]
    expected_batches = [mock.call(single_message * size)
                        for size in (5, 5, 2)]
    endpoint.info.assert_has_calls(expected_batches)
开发者ID:shahar-stratoscale,项目名称:oslo.messaging,代码行数:25,代码来源:test_listener.py
示例4: test_two_topics
def test_two_topics(self):
    """Check that a listener on two topics dispatches both notifications.

    One notification is sent on ``topic1`` and one on ``topic2``; the
    endpoint's ``info`` mock must have been called for each, in any order.
    """
    def expected_info_call(number):
        # Expected endpoint.info invocation for the numbered event.
        return mock.call({'ctxt': number}, 'testpublisher',
                         'an_event.start' + number, 'test',
                         {'timestamp': mock.ANY, 'message_id': mock.ANY})

    transport = oslo_messaging.get_transport(self.conf, url='fake:')
    endpoint = mock.Mock()
    endpoint.info.return_value = None

    first_target = oslo_messaging.Target(topic="topic1")
    second_target = oslo_messaging.Target(topic="topic2")
    listener_thread = self._setup_listener(
        transport, [endpoint], targets=[first_target, second_target])

    first_notifier = self._setup_notifier(transport, topic='topic1')
    first_notifier.info({'ctxt': '1'}, 'an_event.start1', 'test')
    second_notifier = self._setup_notifier(transport, topic='topic2')
    second_notifier.info({'ctxt': '2'}, 'an_event.start2', 'test')

    self.wait_for_messages(2)
    self.assertFalse(listener_thread.stop())

    endpoint.info.assert_has_calls(
        [expected_info_call('1'), expected_info_call('2')],
        any_order=True)
开发者ID:eayunstack,项目名称:oslo.messaging,代码行数:25,代码来源:test_listener.py
示例5: test_requeue
def test_requeue(self):
    """Return REQUEUE on the first delivery and expect a second delivery.

    The endpoint's ``info`` mock answers ``REQUEUE`` while its call count
    is 1 and ``HANDLED`` afterwards.  A single notification is sent and the
    endpoint must then have been called twice with identical arguments.
    """
    transport = oslo_messaging.get_transport(self.conf, url="fake:")
    endpoint = mock.Mock()
    endpoint.info = mock.Mock()

    def side_effect_requeue(*args, **kwargs):
        # call_count already includes the call currently being handled.
        results = oslo_messaging.NotificationResult
        first_delivery = endpoint.info.call_count == 1
        return results.REQUEUE if first_delivery else results.HANDLED

    endpoint.info.side_effect = side_effect_requeue
    listener_thread = self._setup_listener(transport, [endpoint])
    notifier = self._setup_notifier(transport)
    notifier.info({}, "an_event.start", "test")

    self.wait_for_messages(2)
    self.assertFalse(listener_thread.stop())

    delivery = mock.call(
        {},
        "testpublisher",
        "an_event.start",
        "test",
        {"timestamp": mock.ANY, "message_id": mock.ANY},
    )
    endpoint.info.assert_has_calls([delivery, delivery])
开发者ID:wangyanbn,项目名称:oslo.messaging,代码行数:28,代码来源:test_listener.py
示例6: test_handle_message_callback
def test_handle_message_callback(transport, protocol, dispatcher):
    """Check the trace hook is called for one received message.

    After ``receive_one_message`` the ``trace`` mock must have recorded a
    ``'-->'`` call with ``RECMSG`` followed by a ``'<--'`` call with
    ``SERMSG``, both with ``CONTEXT``.
    """
    server = RPCServer(transport, protocol, dispatcher)
    server.trace = Mock(return_value=None)

    server.receive_one_message()

    expected_trace = [
        call('-->', CONTEXT, RECMSG),
        call('<--', CONTEXT, SERMSG),
    ]
    assert server.trace.call_args_list == expected_trace
    server.trace.assert_called()
开发者ID:litnimax,项目名称:tinyrpc,代码行数:7,代码来源:test_server.py
示例7: test_two_exchanges
def test_two_exchanges(self):
    """Send on the default exchange and two named exchanges.

    The listener targets ``exchange1`` and ``exchange2`` only.  The first
    notification is sent without touching the transport; before each of
    the next two, ``transport._send_notification`` is replaced by a mock
    that rewrites the target's exchange and forwards to the driver.  The
    endpoint must have seen the ``exchange1`` and ``exchange2`` messages,
    in any order.
    """
    transport = oslo_messaging.get_transport(self.conf, url="fake:")
    endpoint = mock.Mock()
    endpoint.info.return_value = None

    exchange_names = ("exchange1", "exchange2")
    targets = [oslo_messaging.Target(topic="topic", exchange=name)
               for name in exchange_names]
    listener_thread = self._setup_listener(
        transport, [endpoint], targets=targets)
    notifier = self._setup_notifier(transport, topic="topic")

    def route_notifications_to(name):
        # Install a sender that forces every notification onto ``name``.
        def side_effect(target, ctxt, message, version, retry):
            target.exchange = name
            return transport._driver.send_notification(
                target, ctxt, message, version, retry=retry)

        transport._send_notification = mock.MagicMock(
            side_effect=side_effect)

    notifier.info(
        {"ctxt": "0"}, "an_event.start", "test message default exchange")
    for number, name in enumerate(exchange_names, start=1):
        route_notifications_to(name)
        notifier.info(
            {"ctxt": "%d" % number},
            "an_event.start",
            "test message " + name,
        )

    self.wait_for_messages(2)
    self.assertFalse(listener_thread.stop())

    expected_calls = [
        mock.call(
            {"ctxt": "%d" % number},
            "testpublisher",
            "an_event.start",
            "test message " + name,
            {"timestamp": mock.ANY, "message_id": mock.ANY},
        )
        for number, name in enumerate(exchange_names, start=1)
    ]
    endpoint.info.assert_has_calls(expected_calls, any_order=True)
开发者ID:wangyanbn,项目名称:oslo.messaging,代码行数:48,代码来源:test_listener.py
示例8: test_constructor
def test_constructor(self, warn):
    """Build an RPC server and check its attributes and the warning issued.

    The server must expose the conf, transport, endpoints and serializer
    it was built with, wrap them in an ``RPCDispatcher``, report the
    ``'blocking'`` executor type, and have triggered exactly one
    ``FutureWarning`` call on ``warn`` with the message below.
    """
    transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
    target = oslo_messaging.Target(topic='foo', server='bar')
    endpoints = [object()]
    serializer = object()
    access_policy = dispatcher.DefaultRPCAccessPolicy

    warnings.simplefilter("always", FutureWarning)
    server = oslo_messaging.get_rpc_server(
        transport,
        target,
        endpoints,
        serializer=serializer,
        access_policy=access_policy,
    )

    self.assertIs(server.conf, self.conf)
    self.assertIs(server.transport, transport)
    self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher)
    self.assertIs(server.dispatcher.endpoints, endpoints)
    self.assertIs(server.dispatcher.serializer, serializer)
    self.assertEqual('blocking', server.executor_type)

    deprecation_text = ("blocking executor is deprecated. Executor default will "
                        "be removed. Use explicitly threading or eventlet "
                        "instead in version 'pike' and will be removed in "
                        "version 'rocky'")
    expected_warnings = [
        mock.call(deprecation_text, category=FutureWarning, stacklevel=3),
    ]
    self.assertEqual(expected_warnings, warn.mock_calls)
开发者ID:kgiusti,项目名称:oslo.messaging,代码行数:26,代码来源:test_server.py
示例9: _do_test_heartbeat_sent
def _do_test_heartbeat_sent(self, fake_ensure_connection,
                            fake_heartbeat_support, fake_heartbeat,
                            fake_logger, heartbeat_side_effect=None,
                            info=None):
    """Drive one heartbeat and check connection and logging call counts.

    :param fake_ensure_connection: mock whose ``call_count`` is asserted.
    :param fake_heartbeat_support: mock accepted but not used here.
    :param fake_heartbeat: mock whose side effect is replaced so that the
        test can detect the first heartbeat.
    :param fake_logger: mock logger whose ``debug``/``info`` calls are
        counted.
    :param heartbeat_side_effect: optional exception raised from the
        heartbeat; when set, two ``ensure_connection`` calls and one
        ``info`` log are expected instead of one and zero.
    :param info: first argument expected in the ``info`` log call when
        ``heartbeat_side_effect`` is set.
    """
    heartbeat_seen = threading.Event()

    def heartbeat_check(rate=2):
        heartbeat_seen.set()
        if heartbeat_side_effect:
            raise heartbeat_side_effect

    fake_heartbeat.side_effect = heartbeat_check

    transport = oslo_messaging.get_transport(
        self.conf, 'kombu+memory:////')
    self.addCleanup(transport.cleanup)
    conn = transport._driver._get_connection()
    conn.ensure(method=lambda: True)
    # Block until the patched heartbeat has run at least once.
    heartbeat_seen.wait()
    conn._heartbeat_stop()

    # check heartbeat have been called
    self.assertLess(0, fake_heartbeat.call_count)

    if heartbeat_side_effect:
        self.assertEqual(2, fake_ensure_connection.call_count)
        self.assertEqual(2, fake_logger.debug.call_count)
        self.assertEqual(1, fake_logger.info.call_count)
        self.assertIn(mock.call(info, mock.ANY),
                      fake_logger.info.mock_calls)
    else:
        self.assertEqual(1, fake_ensure_connection.call_count)
        self.assertEqual(2, fake_logger.debug.call_count)
        self.assertEqual(0, fake_logger.info.call_count)
开发者ID:vito0302,项目名称:oslo.messaging,代码行数:35,代码来源:test_impl_rabbit.py
示例10: mocked_endpoint_call
def mocked_endpoint_call(i):
    """Return the ``mock.call`` expected for the ``i``-th test notification.

    :param i: number formatted with ``%d`` into the context and the payload.
    :returns: a ``mock.call`` with context, publisher id, event type,
        payload and a metadata dict whose values match anything.
    """
    index_text = "%d" % i
    context = {"ctxt": index_text}
    payload = "test message" + index_text
    metadata = {"timestamp": mock.ANY, "message_id": mock.ANY}
    return mock.call(
        context, "testpublisher", "an_event.start", payload, metadata)
开发者ID:wangyanbn,项目名称:oslo.messaging,代码行数:8,代码来源:test_listener.py
示例11: test_two_exchanges
def test_two_exchanges(self):
    """Notify via the default exchange, then ``exchange1`` and ``exchange2``.

    Only the two named exchanges are listened on.  Before the second and
    third notifications ``transport._send_notification`` is swapped for a
    mock that overrides the target exchange and delegates to the driver.
    The endpoint must have received the two named-exchange messages, in
    any order.
    """
    def expected_info_call(number, exchange):
        # Expected endpoint.info invocation for one named exchange.
        return mock.call({'ctxt': number}, 'testpublisher',
                         'an_event.start',
                         'test message ' + exchange,
                         {'timestamp': mock.ANY, 'message_id': mock.ANY})

    transport = msg_notifier.get_notification_transport(
        self.conf, url='fake:')
    endpoint = mock.Mock()
    endpoint.info.return_value = None

    target_one = oslo_messaging.Target(topic="topic", exchange="exchange1")
    target_two = oslo_messaging.Target(topic="topic", exchange="exchange2")
    listener_thread = self._setup_listener(
        transport, [endpoint], targets=[target_one, target_two])
    notifier = self._setup_notifier(transport, topic="topic")

    def mock_notifier_exchange(name):
        def side_effect(target, ctxt, message, version, retry):
            target.exchange = name
            return transport._driver.send_notification(
                target, ctxt, message, version, retry=retry)

        replacement = mock.MagicMock(side_effect=side_effect)
        transport._send_notification = replacement

    notifier.info(
        {'ctxt': '0'}, 'an_event.start', 'test message default exchange')

    mock_notifier_exchange('exchange1')
    notifier.info(
        {'ctxt': '1'}, 'an_event.start', 'test message exchange1')

    mock_notifier_exchange('exchange2')
    notifier.info(
        {'ctxt': '2'}, 'an_event.start', 'test message exchange2')

    self.wait_for_messages(2)
    self.assertFalse(listener_thread.stop())

    endpoint.info.assert_has_calls(
        [expected_info_call('1', 'exchange1'),
         expected_info_call('2', 'exchange2')],
        any_order=True)
开发者ID:apporc,项目名称:oslo.messaging,代码行数:44,代码来源:test_listener.py
示例12: test_reconnect_order
def test_reconnect_order(self):
    """Reconnect once per configured broker and check every broker is used.

    With five ``qpid_hosts`` configured and ``qpid.messaging.Connection``
    patched, the connection is created and then ``reconnect()`` is called
    five times.  The patched class must have recorded, for every broker,
    the construction on port 5672 plus the ``open``/``session``/
    ``opened``/``close`` calls, in any order.
    """
    brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
    self.config(qpid_hosts=brokers, group='oslo_messaging_qpid')

    with mock.patch('qpid.messaging.Connection') as conn_mock:
        # starting from the first broker in the list
        url = oslo_messaging.TransportURL.parse(self.conf, None)
        connection = qpid_driver.Connection(
            self.conf, url, amqp.PURPOSE_SEND)

        # reconnect will advance to the next broker, one broker per
        # attempt, and then wrap to the start of the list once the end is
        # reached
        for _ in range(len(brokers)):
            connection.reconnect()

    expected = [
        recorded
        for broker in brokers
        for recorded in (
            mock.call("%s:5672" % broker),
            mock.call().open(),
            mock.call().session(),
            mock.call().opened(),
            mock.call().opened().__nonzero__(),
            mock.call().close(),
        )
    ]
    conn_mock.assert_has_calls(expected, any_order=True)
开发者ID:eayunstack,项目名称:oslo.messaging,代码行数:29,代码来源:test_impl_qpid.py
示例13: test_paths
def test_paths(self, mocker):
    """Ensure that the specified paths are added to the path.

    ``OctaveSession.eval`` is patched; constructing a session with three
    paths must leave ``octaverc`` as ``None``, keep ``paths`` intact, and
    issue one ``addpath`` eval per path in order.
    """
    paths = ['path1', 'path2', 'path3']
    eval_mock = mocker.patch('matl_online.octave.OctaveSession.eval')

    session = OctaveSession(paths=paths)

    assert session.octaverc is None
    assert session.paths == paths

    addpath_template = 'addpath("''%s''")'
    expected_calls = [
        mock.call(addpath_template % path)
        for path in paths
    ]
    eval_mock.assert_has_calls(expected_calls)
开发者ID:suever,项目名称:MATL-Online,代码行数:13,代码来源:test_octave.py
示例14: test_call_serializer
def test_call_serializer(self):
    """Check that RPCClient passes arguments and context via the serializer.

    ``self.call`` selects ``client.call`` or ``client.cast``.  The
    transport's ``_send`` is mocked and must be invoked exactly once with
    the serialized context and a message whose argument values are each
    prefixed with ``'s'`` by the stubbed ``serialize_entity``.  For calls,
    the reply must be run through ``deserialize_entity``.
    """
    self.config(rpc_response_timeout=None)
    transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
    serializer = msg_serializer.NoOpSerializer()
    client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
                                      serializer=serializer)
    transport._send = mock.Mock()

    kwargs = dict(wait_for_reply=True, timeout=None) if self.call else {}
    kwargs['retry'] = None
    if self.call:
        kwargs['call_monitor_timeout'] = None

    transport._send.return_value = self.retval

    serializer.serialize_entity = mock.Mock()
    serializer.deserialize_entity = mock.Mock()
    serializer.serialize_context = mock.Mock()

    def _stub(ctxt, arg):
        return 's' + arg

    msg = dict(method='foo',
               args={k: 's' + v for k, v in self.args.items()})
    serializer.serialize_entity.side_effect = _stub

    if self.call:
        serializer.deserialize_entity.return_value = 'd' + self.retval
    serializer.serialize_context.return_value = dict(user='alice')

    method = client.call if self.call else client.cast
    retval = method(self.ctxt, 'foo', **self.args)
    if self.retval is not None:
        self.assertEqual('d' + self.retval, retval)

    transport._send.assert_called_once_with(oslo_messaging.Target(),
                                            dict(user='alice'),
                                            msg,
                                            **kwargs)

    # The serializer is handed each argument VALUE (the expected message
    # above is built from 's' + value), so the expected calls must be
    # derived from the values.  Iterating the dict itself yields keys and
    # only passes when every key happens to equal its value.
    expected_calls = [mock.call(self.ctxt, value)
                      for value in self.args.values()]
    self.assertEqual(expected_calls,
                     serializer.serialize_entity.mock_calls)

    if self.call:
        serializer.deserialize_entity.assert_called_once_with(self.ctxt,
                                                              self.retval)
    serializer.serialize_context.assert_called_once_with(self.ctxt)
开发者ID:kgiusti,项目名称:oslo.messaging,代码行数:51,代码来源:test_client.py
示例15: test_constructor_without_explicit_RPCAccessPolicy
def test_constructor_without_explicit_RPCAccessPolicy(self, warn):
    """Build an RPC server without ``access_policy`` and check the warning.

    Exactly one call must have been recorded on ``warn``: the blocking
    executor deprecation message with ``FutureWarning`` and
    ``stacklevel=3``.
    """
    transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
    target = oslo_messaging.Target(topic='foo', server='bar')
    endpoints = [object()]
    serializer = object()

    warnings.simplefilter("always", FutureWarning)
    oslo_messaging.get_rpc_server(
        transport, target, endpoints, serializer=serializer)

    deprecation_text = ("blocking executor is deprecated. Executor default will "
                        "be removed. Use explicitly threading or eventlet "
                        "instead in version 'pike' and will be removed in "
                        "version 'rocky'")
    expected_warnings = [
        mock.call(deprecation_text, category=FutureWarning, stacklevel=3),
    ]
    self.assertEqual(expected_warnings, warn.mock_calls)
开发者ID:kgiusti,项目名称:oslo.messaging,代码行数:16,代码来源:test_server.py
示例16: test_get_transport
def test_get_transport(self, fake_logger):
    """Check ``get_transport`` loads the expected driver with expected args.

    ``driver.DriverManager`` is replaced for the duration of the
    ``get_transport`` call by a mock returning a fake manager.  The
    returned transport must wrap the fake driver, be an ``RPCTransport``,
    and the manager must have been created once with the backend, URL,
    exchange and allowed modules from ``self.expect``.  When aliases are
    supplied, a single deprecation warning must have been logged.
    """
    self.config(rpc_backend=self.rpc_backend,
                control_exchange=self.control_exchange,
                transport_url=self.transport_url)

    drvr = _FakeDriver(self.conf)
    manager_cls = mock.Mock(return_value=_FakeManager(drvr))

    # Patch with a context manager so the real DriverManager is restored
    # afterwards; a plain attribute assignment would leak the mock into
    # every test that runs later in the same process.
    with mock.patch.object(driver, 'DriverManager', manager_cls):
        invoke_args = [self.conf,
                       oslo_messaging.TransportURL.parse(self.conf,
                                                         self.expect['url'])]
        invoke_kwds = dict(default_exchange=self.expect['exchange'],
                           allowed_remote_exmods=self.expect['allowed'])

        kwargs = dict(url=self.url)
        if self.allowed is not None:
            kwargs['allowed_remote_exmods'] = self.allowed
        if self.aliases is not None:
            kwargs['aliases'] = self.aliases

        transport_ = oslo_messaging.get_transport(self.conf, **kwargs)

    if self.aliases is not None:
        self.assertEqual(
            [mock.call('legacy "rpc_backend" is deprecated, '
                       '"testfoo" must be replaced by '
                       '"%s"' % self.aliases.get('testfoo'))],
            fake_logger.warning.mock_calls
        )

    self.assertIsNotNone(transport_)
    self.assertIs(transport_.conf, self.conf)
    self.assertIs(transport_._driver, drvr)
    # assertIsInstance reports the actual type on failure.
    self.assertIsInstance(transport_, transport.RPCTransport)

    manager_cls.assert_called_once_with('oslo.messaging.drivers',
                                        self.expect['backend'],
                                        invoke_on_load=True,
                                        invoke_args=invoke_args,
                                        invoke_kwds=invoke_kwds)
开发者ID:kgiusti,项目名称:oslo.messaging,代码行数:42,代码来源:test_transport.py
示例17: test_serializer
def test_serializer(self):
    """Check that RPCDispatcher deserializes inputs and serializes the reply.

    The incoming context and each argument value must pass through
    ``deserialize_context``/``deserialize_entity`` before the endpoint's
    ``foo`` is invoked, and the endpoint's return value must pass through
    ``serialize_entity`` before being returned by ``dispatch``.
    """
    endpoint = _FakeEndpoint()
    serializer = msg_serializer.NoOpSerializer()
    dispatcher = oslo_messaging.RPCDispatcher([endpoint], serializer)

    endpoint.foo = mock.Mock()

    args = {k: 'd' + v for k, v in self.args.items()}

    endpoint.foo.return_value = self.retval

    serializer.serialize_entity = mock.Mock()
    serializer.deserialize_entity = mock.Mock()
    serializer.deserialize_context = mock.Mock()

    serializer.deserialize_context.return_value = self.dctxt

    # deserialize_entity receives the argument VALUES from the incoming
    # message and must return 'd' + value for ``args`` above to match.
    # Iterating the dict directly yields keys and only works when every
    # key equals its value.
    arg_values = list(self.args.values())
    expected_side_effect = ['d' + value for value in arg_values]
    serializer.deserialize_entity.side_effect = expected_side_effect

    serializer.serialize_entity.return_value = None
    if self.retval:
        serializer.serialize_entity.return_value = 's' + self.retval

    incoming = mock.Mock()
    incoming.ctxt = self.ctxt
    incoming.message = dict(method='foo', args=self.args)
    incoming.client_timeout = 0
    retval = dispatcher.dispatch(incoming)
    if self.retval is not None:
        self.assertEqual('s' + self.retval, retval)

    endpoint.foo.assert_called_once_with(self.dctxt, **args)
    serializer.deserialize_context.assert_called_once_with(self.ctxt)

    expected_calls = [mock.call(self.dctxt, value) for value in arg_values]
    self.assertEqual(expected_calls,
                     serializer.deserialize_entity.mock_calls)

    serializer.serialize_entity.assert_called_once_with(self.dctxt,
                                                        self.retval)
开发者ID:openstack,项目名称:oslo.messaging,代码行数:40,代码来源:test_dispatcher.py
示例18: test_batch_size_exception_path
def test_batch_size_exception_path(self):
    """Send 10 notifications in batches of 5 where the second batch raises.

    The endpoint's ``info`` mock returns ``None`` for its first call and
    raises ``Exception('boom!')`` for its second.  The listener must still
    stop cleanly and the endpoint must have received a 5-message batch.
    """
    transport = oslo_messaging.get_transport(self.conf, url='fake:')
    endpoint = mock.Mock()
    endpoint.info.side_effect = [None, Exception('boom!')]
    listener_thread = self._setup_listener(
        transport, [endpoint], batch=(5, None))
    notifier = self._setup_notifier(transport)

    for _ in six.moves.range(10):
        notifier.info({}, 'an_event.start', 'test message')

    self.wait_for_messages(2)
    self.assertFalse(listener_thread.stop())

    single_message = {
        'ctxt': {},
        'publisher_id': 'testpublisher',
        'event_type': 'an_event.start',
        'payload': 'test message',
        'metadata': {'message_id': mock.ANY, 'timestamp': mock.ANY},
    }
    full_batch = [single_message] * 5
    endpoint.info.assert_has_calls([mock.call(full_batch)])
开发者ID:shahar-stratoscale,项目名称:oslo.messaging,代码行数:23,代码来源:test_listener.py
示例19: mocked_endpoint_call
def mocked_endpoint_call(i):
    """Build the ``mock.call`` an endpoint should record for message ``i``.

    :param i: number formatted with ``%d`` into the context and the payload.
    :returns: a ``mock.call`` carrying context, publisher id, event type,
        payload and a metadata dict whose values match anything.
    """
    number = '%d' % i
    positional = (
        {'ctxt': number},
        'testpublisher',
        'an_event.start',
        'test message' + number,
        {'timestamp': mock.ANY, 'message_id': mock.ANY},
    )
    return mock.call(*positional)
开发者ID:eayunstack,项目名称:oslo.messaging,代码行数:4,代码来源:test_listener.py
示例20: test_notifier
def test_notifier(self, mock_utcnow):
    """Check the notifier sends the expected message for each driver/topic.

    Drivers are enabled according to ``self.v1``/``self.v2``.  The
    notifier is optionally built with a publisher id and optionally
    re-prepared with ``retry``/``publisher_id``.  ``uuid.uuid4`` and the
    clock are pinned so the emitted message is fully predictable, then
    ``transport._send_notification`` must have been called, in any order,
    once per (driver version, topic) pair.

    :param mock_utcnow: mock standing in for the clock used when building
        the message timestamp.
    """
    drivers = []
    if self.v1:
        drivers.append('messaging')
    if self.v2:
        drivers.append('messagingv2')

    self.config(driver=drivers,
                topics=self.topics,
                group='oslo_messaging_notifications')

    transport = _FakeTransport(self.conf)

    if hasattr(self, 'ctor_pub_id'):
        notifier = oslo_messaging.Notifier(transport,
                                           publisher_id=self.ctor_pub_id)
    else:
        notifier = oslo_messaging.Notifier(transport)

    prepare_kwds = {}
    if hasattr(self, 'retry'):
        prepare_kwds['retry'] = self.retry
    if hasattr(self, 'prep_pub_id'):
        prepare_kwds['publisher_id'] = self.prep_pub_id
    if prepare_kwds:
        notifier = notifier.prepare(**prepare_kwds)

    transport._send_notification = mock.Mock()

    message_id = uuid.uuid4()
    mock_utcnow.return_value = datetime.datetime.utcnow()

    message = {
        'message_id': str(message_id),
        'publisher_id': self.expected_pub_id,
        'event_type': 'test.notify',
        'priority': self.priority.upper(),
        'payload': self.payload,
        'timestamp': str(timeutils.utcnow()),
    }

    versions = []
    if self.v1:
        versions.append(1.0)
    if self.v2:
        versions.append(2.0)

    # The retry value is the same for every expected call, so resolve it
    # once rather than rewriting a shared kwargs dict inside the loop.
    retry = getattr(self, 'retry', None)

    calls = []
    for version in versions:
        for topic in self.topics:
            target = oslo_messaging.Target(topic='%s.%s' % (topic,
                                                            self.priority))
            calls.append(mock.call(target,
                                   self.ctxt,
                                   message,
                                   version=version,
                                   retry=retry))

    method = getattr(notifier, self.priority)

    # Pin uuid4 only while the notification is emitted; patching with a
    # context manager restores the real function afterwards instead of
    # leaving the stdlib module permanently overwritten.
    with mock.patch.object(uuid, 'uuid4',
                           return_value=message_id) as fake_uuid4:
        method(self.ctxt, 'test.notify', self.payload)

    fake_uuid4.assert_called_once_with()
    transport._send_notification.assert_has_calls(calls, any_order=True)
开发者ID:ozamiatin,项目名称:oslo.messaging,代码行数:68,代码来源:test_notifier.py
注:本文中的 six.moves.mock.call 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论