• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python mock.call函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中six.moves.mock.call函数的典型用法代码示例。如果您正苦于以下问题:Python call函数的具体用法?Python call怎么用?Python call使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了call函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_two_topics

    def test_two_topics(self):
        transport = oslo_messaging.get_transport(self.conf, url="fake:")

        endpoint = mock.Mock()
        endpoint.info.return_value = None
        targets = [oslo_messaging.Target(topic="topic1"), oslo_messaging.Target(topic="topic2")]
        listener_thread = self._setup_listener(transport, [endpoint], targets=targets)
        notifier = self._setup_notifier(transport, topic="topic1")
        notifier.info({"ctxt": "1"}, "an_event.start1", "test")
        notifier = self._setup_notifier(transport, topic="topic2")
        notifier.info({"ctxt": "2"}, "an_event.start2", "test")

        self.wait_for_messages(2)
        self.assertFalse(listener_thread.stop())

        endpoint.info.assert_has_calls(
            [
                mock.call(
                    {"ctxt": "1"},
                    "testpublisher",
                    "an_event.start1",
                    "test",
                    {"timestamp": mock.ANY, "message_id": mock.ANY},
                ),
                mock.call(
                    {"ctxt": "2"},
                    "testpublisher",
                    "an_event.start2",
                    "test",
                    {"timestamp": mock.ANY, "message_id": mock.ANY},
                ),
            ],
            any_order=True,
        )
开发者ID:wangyanbn,项目名称:oslo.messaging,代码行数:34,代码来源:test_listener.py


示例2: test_listen_and_direct_send

    def test_listen_and_direct_send(self):
        target = oslo_messaging.Target(exchange="exchange_test",
                                       topic="topic_test",
                                       server="server_test")

        with mock.patch('qpid.messaging.Connection') as conn_cls:
            conn = conn_cls.return_value
            session = conn.session.return_value
            session.receiver.side_effect = [mock.Mock(), mock.Mock(),
                                            mock.Mock()]

            listener = self.driver.listen(target)
            listener.conn.direct_send("msg_id", {})

        self.assertEqual(3, len(listener.conn.consumers))

        expected_calls = [
            mock.call(AddressNodeMatcher(
                'amq.topic/topic/exchange_test/topic_test')),
            mock.call(AddressNodeMatcher(
                'amq.topic/topic/exchange_test/topic_test.server_test')),
            mock.call(AddressNodeMatcher('amq.topic/fanout/topic_test')),
        ]
        session.receiver.assert_has_calls(expected_calls)
        session.sender.assert_called_with(
            AddressNodeMatcher("amq.direct/msg_id"))
开发者ID:eayunstack,项目名称:oslo.messaging,代码行数:26,代码来源:test_impl_qpid.py


示例3: test_batch_timeout

    def test_batch_timeout(self):
        transport = oslo_messaging.get_transport(self.conf, url='fake:')

        endpoint = mock.Mock()
        endpoint.info.return_value = None
        listener_thread = self._setup_listener(transport, [endpoint],
                                               batch=(5, 1))

        notifier = self._setup_notifier(transport)
        for i in six.moves.range(12):
            notifier.info({}, 'an_event.start', 'test message')

        self.wait_for_messages(3)
        self.assertFalse(listener_thread.stop())

        messages = [dict(ctxt={},
                         publisher_id='testpublisher',
                         event_type='an_event.start',
                         payload='test message',
                         metadata={'message_id': mock.ANY,
                                   'timestamp': mock.ANY})]

        endpoint.info.assert_has_calls([mock.call(messages * 5),
                                        mock.call(messages * 5),
                                        mock.call(messages * 2)])
开发者ID:shahar-stratoscale,项目名称:oslo.messaging,代码行数:25,代码来源:test_listener.py


示例4: test_two_topics

    def test_two_topics(self):
        transport = oslo_messaging.get_transport(self.conf, url='fake:')

        endpoint = mock.Mock()
        endpoint.info.return_value = None
        targets = [oslo_messaging.Target(topic="topic1"),
                   oslo_messaging.Target(topic="topic2")]
        listener_thread = self._setup_listener(transport, [endpoint],
                                               targets=targets)
        notifier = self._setup_notifier(transport, topic='topic1')
        notifier.info({'ctxt': '1'}, 'an_event.start1', 'test')
        notifier = self._setup_notifier(transport, topic='topic2')
        notifier.info({'ctxt': '2'}, 'an_event.start2', 'test')

        self.wait_for_messages(2)
        self.assertFalse(listener_thread.stop())

        endpoint.info.assert_has_calls([
            mock.call({'ctxt': '1'}, 'testpublisher',
                      'an_event.start1', 'test',
                      {'timestamp': mock.ANY, 'message_id': mock.ANY}),
            mock.call({'ctxt': '2'}, 'testpublisher',
                      'an_event.start2', 'test',
                      {'timestamp': mock.ANY, 'message_id': mock.ANY})],
            any_order=True)
开发者ID:eayunstack,项目名称:oslo.messaging,代码行数:25,代码来源:test_listener.py


示例5: test_requeue

    def test_requeue(self):
        transport = oslo_messaging.get_transport(self.conf, url="fake:")
        endpoint = mock.Mock()
        endpoint.info = mock.Mock()

        def side_effect_requeue(*args, **kwargs):
            if endpoint.info.call_count == 1:
                return oslo_messaging.NotificationResult.REQUEUE
            return oslo_messaging.NotificationResult.HANDLED

        endpoint.info.side_effect = side_effect_requeue
        listener_thread = self._setup_listener(transport, [endpoint])
        notifier = self._setup_notifier(transport)
        notifier.info({}, "an_event.start", "test")

        self.wait_for_messages(2)
        self.assertFalse(listener_thread.stop())

        endpoint.info.assert_has_calls(
            [
                mock.call(
                    {}, "testpublisher", "an_event.start", "test", {"timestamp": mock.ANY, "message_id": mock.ANY}
                ),
                mock.call(
                    {}, "testpublisher", "an_event.start", "test", {"timestamp": mock.ANY, "message_id": mock.ANY}
                ),
            ]
        )
开发者ID:wangyanbn,项目名称:oslo.messaging,代码行数:28,代码来源:test_listener.py


示例6: test_handle_message_callback

def test_handle_message_callback(transport, protocol, dispatcher):
    server = RPCServer(transport, protocol, dispatcher)
    server.trace = Mock(return_value=None)
    server.receive_one_message()

    assert server.trace.call_args_list == [call('-->', CONTEXT, RECMSG), call('<--', CONTEXT, SERMSG)]
    server.trace.assert_called()
    
开发者ID:litnimax,项目名称:tinyrpc,代码行数:7,代码来源:test_server.py


示例7: test_two_exchanges

    def test_two_exchanges(self):
        transport = oslo_messaging.get_transport(self.conf, url="fake:")

        endpoint = mock.Mock()
        endpoint.info.return_value = None
        targets = [
            oslo_messaging.Target(topic="topic", exchange="exchange1"),
            oslo_messaging.Target(topic="topic", exchange="exchange2"),
        ]
        listener_thread = self._setup_listener(transport, [endpoint], targets=targets)

        notifier = self._setup_notifier(transport, topic="topic")

        def mock_notifier_exchange(name):
            def side_effect(target, ctxt, message, version, retry):
                target.exchange = name
                return transport._driver.send_notification(target, ctxt, message, version, retry=retry)

            transport._send_notification = mock.MagicMock(side_effect=side_effect)

        notifier.info({"ctxt": "0"}, "an_event.start", "test message default exchange")
        mock_notifier_exchange("exchange1")
        notifier.info({"ctxt": "1"}, "an_event.start", "test message exchange1")
        mock_notifier_exchange("exchange2")
        notifier.info({"ctxt": "2"}, "an_event.start", "test message exchange2")

        self.wait_for_messages(2)
        self.assertFalse(listener_thread.stop())

        endpoint.info.assert_has_calls(
            [
                mock.call(
                    {"ctxt": "1"},
                    "testpublisher",
                    "an_event.start",
                    "test message exchange1",
                    {"timestamp": mock.ANY, "message_id": mock.ANY},
                ),
                mock.call(
                    {"ctxt": "2"},
                    "testpublisher",
                    "an_event.start",
                    "test message exchange2",
                    {"timestamp": mock.ANY, "message_id": mock.ANY},
                ),
            ],
            any_order=True,
        )
开发者ID:wangyanbn,项目名称:oslo.messaging,代码行数:48,代码来源:test_listener.py


示例8: test_constructor

    def test_constructor(self, warn):
        transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
        target = oslo_messaging.Target(topic='foo', server='bar')
        endpoints = [object()]
        serializer = object()
        access_policy = dispatcher.DefaultRPCAccessPolicy

        warnings.simplefilter("always", FutureWarning)
        server = oslo_messaging.get_rpc_server(transport,
                                               target,
                                               endpoints,
                                               serializer=serializer,
                                               access_policy=access_policy)
        self.assertIs(server.conf, self.conf)
        self.assertIs(server.transport, transport)
        self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher)
        self.assertIs(server.dispatcher.endpoints, endpoints)
        self.assertIs(server.dispatcher.serializer, serializer)
        self.assertEqual('blocking', server.executor_type)
        self.assertEqual([
            mock.call("blocking executor is deprecated. Executor default will "
                      "be removed. Use explicitly threading or eventlet "
                      "instead in version 'pike' and will be removed in "
                      "version 'rocky'",
                      category=FutureWarning, stacklevel=3)
        ], warn.mock_calls)
开发者ID:kgiusti,项目名称:oslo.messaging,代码行数:26,代码来源:test_server.py


示例9: _do_test_heartbeat_sent

    def _do_test_heartbeat_sent(self, fake_ensure_connection,
                                fake_heartbeat_support, fake_heartbeat,
                                fake_logger, heartbeat_side_effect=None,
                                info=None):

        event = threading.Event()

        def heartbeat_check(rate=2):
            event.set()
            if heartbeat_side_effect:
                raise heartbeat_side_effect

        fake_heartbeat.side_effect = heartbeat_check

        transport = oslo_messaging.get_transport(self.conf,
                                                 'kombu+memory:////')
        self.addCleanup(transport.cleanup)
        conn = transport._driver._get_connection()
        conn.ensure(method=lambda: True)
        event.wait()
        conn._heartbeat_stop()

        # check heartbeat have been called
        self.assertLess(0, fake_heartbeat.call_count)

        if not heartbeat_side_effect:
            self.assertEqual(1, fake_ensure_connection.call_count)
            self.assertEqual(2, fake_logger.debug.call_count)
            self.assertEqual(0, fake_logger.info.call_count)
        else:
            self.assertEqual(2, fake_ensure_connection.call_count)
            self.assertEqual(2, fake_logger.debug.call_count)
            self.assertEqual(1, fake_logger.info.call_count)
            self.assertIn(mock.call(info, mock.ANY),
                          fake_logger.info.mock_calls)
开发者ID:vito0302,项目名称:oslo.messaging,代码行数:35,代码来源:test_impl_rabbit.py


示例10: mocked_endpoint_call

 def mocked_endpoint_call(i):
     return mock.call(
         {"ctxt": "%d" % i},
         "testpublisher",
         "an_event.start",
         "test message%d" % i,
         {"timestamp": mock.ANY, "message_id": mock.ANY},
     )
开发者ID:wangyanbn,项目名称:oslo.messaging,代码行数:8,代码来源:test_listener.py


示例11: test_two_exchanges

    def test_two_exchanges(self):
        transport = msg_notifier.get_notification_transport(
            self.conf, url='fake:')

        endpoint = mock.Mock()
        endpoint.info.return_value = None
        targets = [oslo_messaging.Target(topic="topic",
                                         exchange="exchange1"),
                   oslo_messaging.Target(topic="topic",
                                         exchange="exchange2")]
        listener_thread = self._setup_listener(transport, [endpoint],
                                               targets=targets)

        notifier = self._setup_notifier(transport, topic="topic")

        def mock_notifier_exchange(name):
            def side_effect(target, ctxt, message, version, retry):
                target.exchange = name
                return transport._driver.send_notification(target, ctxt,
                                                           message, version,
                                                           retry=retry)
            transport._send_notification = mock.MagicMock(
                side_effect=side_effect)

        notifier.info({'ctxt': '0'},
                      'an_event.start', 'test message default exchange')
        mock_notifier_exchange('exchange1')
        notifier.info({'ctxt': '1'},
                      'an_event.start', 'test message exchange1')
        mock_notifier_exchange('exchange2')
        notifier.info({'ctxt': '2'},
                      'an_event.start', 'test message exchange2')

        self.wait_for_messages(2)
        self.assertFalse(listener_thread.stop())

        endpoint.info.assert_has_calls([
            mock.call({'ctxt': '1'}, 'testpublisher', 'an_event.start',
                      'test message exchange1',
                      {'timestamp': mock.ANY, 'message_id': mock.ANY}),
            mock.call({'ctxt': '2'}, 'testpublisher', 'an_event.start',
                      'test message exchange2',
                      {'timestamp': mock.ANY, 'message_id': mock.ANY})],
            any_order=True)
开发者ID:apporc,项目名称:oslo.messaging,代码行数:44,代码来源:test_listener.py


示例12: test_reconnect_order

    def test_reconnect_order(self):
        brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
        brokers_count = len(brokers)

        self.config(qpid_hosts=brokers,
                    group='oslo_messaging_qpid')

        with mock.patch('qpid.messaging.Connection') as conn_mock:
            # starting from the first broker in the list
            url = oslo_messaging.TransportURL.parse(self.conf, None)
            connection = qpid_driver.Connection(self.conf, url,
                                                amqp.PURPOSE_SEND)

            # reconnect will advance to the next broker, one broker per
            # attempt, and then wrap to the start of the list once the end is
            # reached
            for _ in range(brokers_count):
                connection.reconnect()

        expected = []
        for broker in brokers:
            expected.extend([mock.call("%s:5672" % broker),
                             mock.call().open(),
                             mock.call().session(),
                             mock.call().opened(),
                             mock.call().opened().__nonzero__(),
                             mock.call().close()])

        conn_mock.assert_has_calls(expected, any_order=True)
开发者ID:eayunstack,项目名称:oslo.messaging,代码行数:29,代码来源:test_impl_qpid.py


示例13: test_paths

    def test_paths(self, mocker):
        """Ensure that the specified paths are added to the path."""
        paths = ['path1', 'path2', 'path3']

        eval_mock = mocker.patch('matl_online.octave.OctaveSession.eval')

        session = OctaveSession(paths=paths)

        assert session.octaverc is None
        assert session.paths == paths

        expected_calls = [mock.call('addpath("''%s''")' % path) for path in paths]
        eval_mock.assert_has_calls(expected_calls)
开发者ID:suever,项目名称:MATL-Online,代码行数:13,代码来源:test_octave.py


示例14: test_call_serializer

    def test_call_serializer(self):
        self.config(rpc_response_timeout=None)

        transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
        serializer = msg_serializer.NoOpSerializer()

        client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
                                          serializer=serializer)

        transport._send = mock.Mock()

        kwargs = dict(wait_for_reply=True, timeout=None) if self.call else {}
        kwargs['retry'] = None
        if self.call:
            kwargs['call_monitor_timeout'] = None

        transport._send.return_value = self.retval

        serializer.serialize_entity = mock.Mock()
        serializer.deserialize_entity = mock.Mock()
        serializer.serialize_context = mock.Mock()

        def _stub(ctxt, arg):
            return 's' + arg

        msg = dict(method='foo', args=dict())
        for k, v in self.args.items():
            msg['args'][k] = 's' + v
        serializer.serialize_entity.side_effect = _stub

        if self.call:
            serializer.deserialize_entity.return_value = 'd' + self.retval

        serializer.serialize_context.return_value = dict(user='alice')

        method = client.call if self.call else client.cast
        retval = method(self.ctxt, 'foo', **self.args)
        if self.retval is not None:
            self.assertEqual('d' + self.retval, retval)

        transport._send.assert_called_once_with(oslo_messaging.Target(),
                                                dict(user='alice'),
                                                msg,
                                                **kwargs)
        expected_calls = [mock.call(self.ctxt, arg) for arg in self.args]
        self.assertEqual(expected_calls,
                         serializer.serialize_entity.mock_calls)
        if self.call:
            serializer.deserialize_entity.assert_called_once_with(self.ctxt,
                                                                  self.retval)
        serializer.serialize_context.assert_called_once_with(self.ctxt)
开发者ID:kgiusti,项目名称:oslo.messaging,代码行数:51,代码来源:test_client.py


示例15: test_constructor_without_explicit_RPCAccessPolicy

    def test_constructor_without_explicit_RPCAccessPolicy(self, warn):
        transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
        target = oslo_messaging.Target(topic='foo', server='bar')
        endpoints = [object()]
        serializer = object()

        warnings.simplefilter("always", FutureWarning)
        oslo_messaging.get_rpc_server(transport, target,
                                      endpoints, serializer=serializer)
        self.assertEqual([
            mock.call("blocking executor is deprecated. Executor default will "
                      "be removed. Use explicitly threading or eventlet "
                      "instead in version 'pike' and will be removed in "
                      "version 'rocky'",
                      category=FutureWarning, stacklevel=3)
        ], warn.mock_calls)
开发者ID:kgiusti,项目名称:oslo.messaging,代码行数:16,代码来源:test_server.py


示例16: test_get_transport

    def test_get_transport(self, fake_logger):
        self.config(rpc_backend=self.rpc_backend,
                    control_exchange=self.control_exchange,
                    transport_url=self.transport_url)

        driver.DriverManager = mock.Mock()

        invoke_args = [self.conf,
                       oslo_messaging.TransportURL.parse(self.conf,
                                                         self.expect['url'])]
        invoke_kwds = dict(default_exchange=self.expect['exchange'],
                           allowed_remote_exmods=self.expect['allowed'])

        drvr = _FakeDriver(self.conf)

        driver.DriverManager.return_value = _FakeManager(drvr)

        kwargs = dict(url=self.url)
        if self.allowed is not None:
            kwargs['allowed_remote_exmods'] = self.allowed
        if self.aliases is not None:
            kwargs['aliases'] = self.aliases
        transport_ = oslo_messaging.get_transport(self.conf, **kwargs)

        if self.aliases is not None:
            self.assertEqual(
                [mock.call('legacy "rpc_backend" is deprecated, '
                           '"testfoo" must be replaced by '
                           '"%s"' % self.aliases.get('testfoo'))],
                fake_logger.warning.mock_calls
            )

        self.assertIsNotNone(transport_)
        self.assertIs(transport_.conf, self.conf)
        self.assertIs(transport_._driver, drvr)
        self.assertTrue(isinstance(transport_, transport.RPCTransport))

        driver.DriverManager.assert_called_once_with('oslo.messaging.drivers',
                                                     self.expect['backend'],
                                                     invoke_on_load=True,
                                                     invoke_args=invoke_args,
                                                     invoke_kwds=invoke_kwds)
开发者ID:kgiusti,项目名称:oslo.messaging,代码行数:42,代码来源:test_transport.py


示例17: test_serializer

    def test_serializer(self):
        endpoint = _FakeEndpoint()
        serializer = msg_serializer.NoOpSerializer()
        dispatcher = oslo_messaging.RPCDispatcher([endpoint], serializer)

        endpoint.foo = mock.Mock()

        args = dict([(k, 'd' + v) for k, v in self.args.items()])
        endpoint.foo.return_value = self.retval

        serializer.serialize_entity = mock.Mock()
        serializer.deserialize_entity = mock.Mock()
        serializer.deserialize_context = mock.Mock()

        serializer.deserialize_context.return_value = self.dctxt

        expected_side_effect = ['d' + arg for arg in self.args]
        serializer.deserialize_entity.side_effect = expected_side_effect

        serializer.serialize_entity.return_value = None
        if self.retval:
            serializer.serialize_entity.return_value = 's' + self.retval

        incoming = mock.Mock()
        incoming.ctxt = self.ctxt
        incoming.message = dict(method='foo', args=self.args)
        incoming.client_timeout = 0
        retval = dispatcher.dispatch(incoming)
        if self.retval is not None:
            self.assertEqual('s' + self.retval, retval)

        endpoint.foo.assert_called_once_with(self.dctxt, **args)
        serializer.deserialize_context.assert_called_once_with(self.ctxt)

        expected_calls = [mock.call(self.dctxt, arg) for arg in self.args]
        self.assertEqual(expected_calls,
                         serializer.deserialize_entity.mock_calls)

        serializer.serialize_entity.assert_called_once_with(self.dctxt,
                                                            self.retval)
开发者ID:openstack,项目名称:oslo.messaging,代码行数:40,代码来源:test_dispatcher.py


示例18: test_batch_size_exception_path

    def test_batch_size_exception_path(self):
        transport = oslo_messaging.get_transport(self.conf, url='fake:')

        endpoint = mock.Mock()
        endpoint.info.side_effect = [None, Exception('boom!')]
        listener_thread = self._setup_listener(transport, [endpoint],
                                               batch=(5, None))

        notifier = self._setup_notifier(transport)
        for i in six.moves.range(10):
            notifier.info({}, 'an_event.start', 'test message')

        self.wait_for_messages(2)
        self.assertFalse(listener_thread.stop())

        messages = [dict(ctxt={},
                         publisher_id='testpublisher',
                         event_type='an_event.start',
                         payload='test message',
                         metadata={'message_id': mock.ANY,
                                   'timestamp': mock.ANY})]

        endpoint.info.assert_has_calls([mock.call(messages * 5)])
开发者ID:shahar-stratoscale,项目名称:oslo.messaging,代码行数:23,代码来源:test_listener.py


示例19: mocked_endpoint_call

 def mocked_endpoint_call(i):
     return mock.call({'ctxt': '%d' % i}, 'testpublisher',
                      'an_event.start', 'test message%d' % i,
                      {'timestamp': mock.ANY, 'message_id': mock.ANY})
开发者ID:eayunstack,项目名称:oslo.messaging,代码行数:4,代码来源:test_listener.py


示例20: test_notifier

    def test_notifier(self, mock_utcnow):
        drivers = []
        if self.v1:
            drivers.append('messaging')
        if self.v2:
            drivers.append('messagingv2')

        self.config(driver=drivers,
                    topics=self.topics,
                    group='oslo_messaging_notifications')

        transport = _FakeTransport(self.conf)

        if hasattr(self, 'ctor_pub_id'):
            notifier = oslo_messaging.Notifier(transport,
                                               publisher_id=self.ctor_pub_id)
        else:
            notifier = oslo_messaging.Notifier(transport)

        prepare_kwds = {}
        if hasattr(self, 'retry'):
            prepare_kwds['retry'] = self.retry
        if hasattr(self, 'prep_pub_id'):
            prepare_kwds['publisher_id'] = self.prep_pub_id
        if prepare_kwds:
            notifier = notifier.prepare(**prepare_kwds)

        transport._send_notification = mock.Mock()

        message_id = uuid.uuid4()
        uuid.uuid4 = mock.Mock(return_value=message_id)

        mock_utcnow.return_value = datetime.datetime.utcnow()

        message = {
            'message_id': str(message_id),
            'publisher_id': self.expected_pub_id,
            'event_type': 'test.notify',
            'priority': self.priority.upper(),
            'payload': self.payload,
            'timestamp': str(timeutils.utcnow()),
        }

        sends = []
        if self.v1:
            sends.append(dict(version=1.0))
        if self.v2:
            sends.append(dict(version=2.0))

        calls = []
        for send_kwargs in sends:
            for topic in self.topics:
                if hasattr(self, 'retry'):
                    send_kwargs['retry'] = self.retry
                else:
                    send_kwargs['retry'] = None
                target = oslo_messaging.Target(topic='%s.%s' % (topic,
                                                                self.priority))
                calls.append(mock.call(target,
                                       self.ctxt,
                                       message,
                                       **send_kwargs))

        method = getattr(notifier, self.priority)
        method(self.ctxt, 'test.notify', self.payload)

        uuid.uuid4.assert_called_once_with()
        transport._send_notification.assert_has_calls(calls, any_order=True)
开发者ID:ozamiatin,项目名称:oslo.messaging,代码行数:68,代码来源:test_notifier.py



注:本文中的six.moves.mock.call函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python mock.patch函数代码示例发布时间:2022-05-27
下一篇:
Python http_cookies.SimpleCookie类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap