• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python messaging._BroadcastMessage函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 nova.cells.messaging._BroadcastMessage 类的典型用法代码示例。如果您正苦于以下问题:Python _BroadcastMessage 类的具体用法?Python _BroadcastMessage 怎么用?Python _BroadcastMessage 使用的例子?那么恭喜您,这里精选的代码示例或许可以为您提供帮助。



在下文中一共展示了 _BroadcastMessage 类的 14 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_rpc_topic_uses_message_type

    def test_rpc_topic_uses_message_type(self):
        """Check that the intercell RPC topic is derived from the message type.

        The recorded mox expectations require the driver to request a client
        for topic ``cells.intercell42.fake-message-type`` (queue base plus
        message type), prepare it with ``fanout=True`` and cast
        ``process_message`` with the JSON form of the message.
        """
        self.flags(rpc_driver_queue_base='cells.intercell42', group='cells')
        msg_runner = fakes.get_message_runner('api-cell')
        cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
        message = messaging._BroadcastMessage(msg_runner,
                self.ctxt, 'fake', {}, 'down', fanout=True)
        message.message_type = 'fake-message-type'

        expected_server_params = {'hostname': 'rpc_host2',
                                  'password': 'password2',
                                  'port': 3092,
                                  'username': 'username2',
                                  'virtual_host': 'rpc_vhost2'}
        # The credentials are separated from the host by a literal '@'.
        # The placeholder must be '%(password)s@': anything else after the
        # closing parenthesis is not a valid conversion character and makes
        # the '%' operator raise ValueError.
        expected_url = ('rabbit://%(username)s:%(password)s@'
                        '%(hostname)s:%(port)d/%(virtual_host)s' %
                        expected_server_params)

        def check_transport_url(cell_state):
            # Used as a mox.Func matcher for the first _get_client argument.
            return cell_state.db_info['transport_url'] == expected_url

        rpcapi = self.driver.intercell_rpcapi
        rpcclient = self.mox.CreateMockAnything()

        # Record the expected call sequence before switching to replay mode.
        self.mox.StubOutWithMock(rpcapi, '_get_client')
        rpcapi._get_client(
            mox.Func(check_transport_url),
            'cells.intercell42.fake-message-type').AndReturn(rpcclient)

        rpcclient.prepare(fanout=True).AndReturn(rpcclient)
        rpcclient.cast(mox.IgnoreArg(), 'process_message',
                       message=message.to_json())

        self.mox.ReplayAll()

        self.driver.send_message_to_cell(cell_state, message)
开发者ID:andymcc,项目名称:nova,代码行数:35,代码来源:test_cells_rpc_driver.py


示例2: test_rpc_topic_uses_message_type

    def test_rpc_topic_uses_message_type(self):
        """Check that the intercell RPC topic is derived from the message type.

        The recorded mox expectations require the driver to request a client
        for topic ``cells.intercell42.fake-message-type`` (queue base plus
        message type), prepare it with ``fanout=True`` and cast
        ``process_message`` with the JSON form of the message.
        """
        self.flags(rpc_driver_queue_base="cells.intercell42", group="cells")
        msg_runner = fakes.get_message_runner("api-cell")
        cell_state = fakes.get_cell_state("api-cell", "child-cell2")
        message = messaging._BroadcastMessage(
            msg_runner, self.ctxt, "fake", {}, "down", fanout=True
        )
        message.message_type = "fake-message-type"

        expected_server_params = {
            "hostname": "rpc_host2",
            "password": "password2",
            "port": 3092,
            "username": "username2",
            "virtual_host": "rpc_vhost2",
        }
        # The credentials are separated from the host by a literal '@'.
        # The placeholder must be '%(password)s@': anything else after the
        # closing parenthesis is not a valid conversion character and makes
        # the '%' operator raise ValueError.
        expected_url = (
            "rabbit://%(username)s:%(password)s@"
            "%(hostname)s:%(port)d/%(virtual_host)s" % expected_server_params
        )

        def check_transport_url(cell_state):
            # Used as a mox.Func matcher for the first _get_client argument.
            return cell_state.db_info["transport_url"] == expected_url

        rpcapi = self.driver.intercell_rpcapi
        rpcclient = self.mox.CreateMockAnything()

        # Record the expected call sequence before switching to replay mode.
        self.mox.StubOutWithMock(rpcapi, "_get_client")
        rpcapi._get_client(
            mox.Func(check_transport_url), "cells.intercell42.fake-message-type"
        ).AndReturn(rpcclient)

        rpcclient.prepare(fanout=True).AndReturn(rpcclient)
        rpcclient.cast(
            mox.IgnoreArg(), "process_message", message=message.to_json()
        )

        self.mox.ReplayAll()

        self.driver.send_message_to_cell(cell_state, message)
开发者ID:wenlongwljs,项目名称:nova,代码行数:33,代码来源:test_cells_rpc_driver.py


示例3: test_broadcast_routing_with_response_max_hops

    def test_broadcast_routing_with_response_max_hops(self):
        """Broadcast down with a hop limit of 2 and inspect the responses.

        Every response must be a success whose value is the string built by
        the stubbed method from the responding cell's routing path.
        """
        self.flags(max_hop_count=2, group='cells')

        def our_fake_method(message, **kwargs):
            return 'response-%s' % message.routing_path

        fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)

        call_kwargs = {'arg1': 1, 'arg2': 2}
        broadcast = messaging._BroadcastMessage(
            self.msg_runner, self.ctxt, 'our_fake_method', call_kwargs,
            'down', run_locally=True, need_response=True)
        replies = broadcast.process()

        # Should only get responses from our immediate children (and
        # ourselves)
        self.assertEqual(len(replies), 5)
        for reply in replies:
            self.assertFalse(reply.failure)
            wanted = 'response-%s' % reply.cell_name
            self.assertEqual(wanted, reply.value_or_raise())
开发者ID:dscannell,项目名称:nova,代码行数:25,代码来源:test_cells_messaging.py


示例4: test_rpc_topic_uses_message_type

    def test_rpc_topic_uses_message_type(self):
        """Check that the intercell RPC topic is derived from the message type.

        ``_get_client`` is patched to hand back a mock client; after sending,
        the test asserts that the client was requested for topic
        ``cells.intercell42.fake-message-type``, prepared with
        ``fanout=True`` and used to cast ``process_message`` with the JSON
        form of the message.
        """
        self.flags(rpc_driver_queue_base='cells.intercell42', group='cells')
        msg_runner = fakes.get_message_runner('api-cell')
        cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
        message = messaging._BroadcastMessage(msg_runner,
                self.ctxt, 'fake', {}, 'down', fanout=True)
        message.message_type = 'fake-message-type'

        expected_server_params = {'hostname': 'rpc_host2',
                                  'password': 'password2',
                                  'port': 3092,
                                  'username': 'username2',
                                  'virtual_host': 'rpc_vhost2'}
        # The credentials are separated from the host by a literal '@'.
        # The placeholder must be '%(password)s@': anything else after the
        # closing parenthesis is not a valid conversion character and makes
        # the '%' operator raise ValueError.
        expected_url = ('rabbit://%(username)s:%(password)s@'
                        '%(hostname)s:%(port)d/%(virtual_host)s' %
                        expected_server_params)

        rpcapi = self.driver.intercell_rpcapi
        rpcclient = mock.Mock()
        # prepare() hands back the same mock so the later cast() is recorded
        # on rpcclient. The test must not call prepare() itself: a call made
        # here would satisfy assert_called_with() below even if the driver
        # never prepared the client.
        rpcclient.prepare.return_value = rpcclient

        with mock.patch.object(rpcapi, '_get_client') as m_get_client:
            m_get_client.return_value = rpcclient

            self.driver.send_message_to_cell(cell_state, message)
            m_get_client.assert_called_with(cell_state,
                    'cells.intercell42.fake-message-type')
            self.assertEqual(expected_url,
                             cell_state.db_info['transport_url'])
            rpcclient.prepare.assert_called_with(fanout=True)
            rpcclient.cast.assert_called_with(mock.ANY,
                                              'process_message',
                                              message=message.to_json())
开发者ID:Juniper,项目名称:nova,代码行数:35,代码来源:test_cells_rpc_driver.py


示例5: test_rpc_topic_uses_message_type

    def test_rpc_topic_uses_message_type(self):
        """Check the topic passed to ``fanout_cast_to_server``.

        The RPC API's ``fanout_cast_to_server`` is replaced by a recorder, and
        the ``topic`` keyword it receives must be the configured queue base
        followed by the message type.
        """
        self.flags(rpc_driver_queue_base='cells.intercell42', group='cells')
        runner = fakes.get_message_runner('api-cell')
        target_cell = fakes.get_cell_state('api-cell', 'child-cell2')
        bcast = messaging._BroadcastMessage(
            runner, self.ctxt, 'fake', 'fake', 'down', fanout=True)
        bcast.message_type = 'fake-message-type'

        captured = {}

        def _fake_fanout_cast_to_server(*args, **kwargs):
            # Remember only the topic; every other argument is ignored.
            captured['topic'] = kwargs.get('topic')

        self.stubs.Set(
            self.driver.intercell_rpcapi,
            'fanout_cast_to_server',
            _fake_fanout_cast_to_server)

        self.driver.send_message_to_cell(target_cell, bcast)

        wanted_topic = 'cells.intercell42.fake-message-type'
        self.assertEqual(wanted_topic, captured['topic'])
开发者ID:DSpeichert,项目名称:nova,代码行数:19,代码来源:test_cells_rpc_driver.py


示例6: test_broadcast_routing

    def test_broadcast_routing(self):
        """Broadcast down and count the distinct routing paths that ran it.

        The stubbed method records ``message.routing_path`` for each
        invocation; eight distinct paths are expected.
        """
        seen_paths = set()

        def our_fake_method(message, **kwargs):
            seen_paths.add(message.routing_path)

        fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)

        call_kwargs = {'arg1': 1, 'arg2': 2}
        broadcast = messaging._BroadcastMessage(
            self.msg_runner, self.ctxt, 'our_fake_method', call_kwargs,
            'down', run_locally=True)
        broadcast.process()

        # fakes creates 8 cells (including ourself).
        self.assertEqual(len(seen_paths), 8)
开发者ID:dscannell,项目名称:nova,代码行数:20,代码来源:test_cells_messaging.py


示例7: test_broadcast_routing_with_all_erroring

    def test_broadcast_routing_with_all_erroring(self):
        """Broadcast a method that always raises and inspect the responses.

        All eight responses must be flagged as failures, and asking any of
        them for its value must raise the original ``TestingException``.
        """
        def our_fake_method(message, **kwargs):
            raise test.TestingException('fake failure')

        fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)

        call_kwargs = {'arg1': 1, 'arg2': 2}
        broadcast = messaging._BroadcastMessage(
            self.msg_runner, self.ctxt, 'our_fake_method', call_kwargs,
            'down', run_locally=True, need_response=True)
        replies = broadcast.process()

        self.assertEqual(len(replies), 8)
        for reply in replies:
            self.assertTrue(reply.failure)
            self.assertRaises(test.TestingException, reply.value_or_raise)
开发者ID:dscannell,项目名称:nova,代码行数:21,代码来源:test_cells_messaging.py


示例8: test_process_message

    def test_process_message(self):
        """Check how the dispatcher handles an incoming JSON message.

        ``message_from_json`` on the runner and ``process`` on the message are
        stubbed with recorders; the dispatcher must pass the JSON payload to
        the former and invoke the latter.
        """
        runner = fakes.get_message_runner("api-cell")
        dispatcher = rpc_driver.InterCellRPCDispatcher(runner)
        bcast = messaging._BroadcastMessage(
            runner, self.ctxt, "fake", {}, "down", fanout=True
        )

        recorded = {}

        def _fake_message_from_json(json_message):
            recorded["json_message"] = json_message
            self.assertEqual(bcast.to_json(), json_message)
            return bcast

        def _fake_process():
            recorded["process_called"] = True

        self.stubs.Set(runner, "message_from_json", _fake_message_from_json)
        self.stubs.Set(bcast, "process", _fake_process)

        dispatcher.process_message(self.ctxt, bcast.to_json())

        self.assertEqual(bcast.to_json(), recorded["json_message"])
        self.assertTrue(recorded["process_called"])
开发者ID:wenlongwljs,项目名称:nova,代码行数:21,代码来源:test_cells_rpc_driver.py


示例9: test_process_message

    def test_process_message(self):
        """Check how the dispatcher handles an incoming JSON message.

        ``message_from_json`` on the runner and ``process`` on the message are
        overwritten with recorders; the dispatcher must pass the JSON payload
        to the former and invoke the latter.
        """
        runner = fakes.get_message_runner('api-cell')
        dispatcher = rpc_driver.InterCellRPCDispatcher(runner)
        bcast = messaging._BroadcastMessage(
            runner, self.ctxt, 'fake', {}, 'down', fanout=True)

        recorded = {}

        def _fake_message_from_json(json_message):
            recorded['json_message'] = json_message
            self.assertEqual(bcast.to_json(), json_message)
            return bcast

        def _fake_process():
            recorded['process_called'] = True

        # Plain attribute assignment replaces the real implementations.
        runner.message_from_json = _fake_message_from_json
        bcast.process = _fake_process

        dispatcher.process_message(self.ctxt, bcast.to_json())

        self.assertEqual(bcast.to_json(), recorded['json_message'])
        self.assertTrue(recorded['process_called'])
开发者ID:Juniper,项目名称:nova,代码行数:21,代码来源:test_cells_rpc_driver.py


示例10: test_broadcast_routing_with_two_erroring

    def test_broadcast_routing_with_two_erroring(self):
        """Broadcast down with two cells stubbed to raise.

        Of the eight responses, six must succeed with the value built from
        their routing path, and two must be failures that come from the two
        cells whose method was replaced by the raising variant.
        """
        def our_fake_method_failing(message, **kwargs):
            raise test.TestingException('fake failure')

        def our_fake_method(message, **kwargs):
            return 'response-%s' % message.routing_path

        # Install the working method everywhere, then override two cells.
        fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
        fakes.stub_bcast_method(self, 'child-cell2', 'our_fake_method',
                                our_fake_method_failing)
        fakes.stub_bcast_method(self, 'grandchild-cell3', 'our_fake_method',
                                our_fake_method_failing)

        call_kwargs = {'arg1': 1, 'arg2': 2}
        broadcast = messaging._BroadcastMessage(
            self.msg_runner, self.ctxt, 'our_fake_method', call_kwargs,
            'down', run_locally=True, need_response=True)
        replies = broadcast.process()
        self.assertEqual(len(replies), 8)

        failed = [reply for reply in replies if reply.failure]
        succeeded = [reply for reply in replies if not reply.failure]
        self.assertEqual(len(failed), 2)
        self.assertEqual(len(succeeded), 6)

        for reply in succeeded:
            self.assertFalse(reply.failure)
            wanted = 'response-%s' % reply.cell_name
            self.assertEqual(wanted, reply.value_or_raise())

        for reply in failed:
            self.assertIn(reply.cell_name,
                          ['api-cell!child-cell2',
                           'api-cell!child-cell3!grandchild-cell3'])
            self.assertTrue(reply.failure)
            self.assertRaises(test.TestingException, reply.value_or_raise)
开发者ID:dscannell,项目名称:nova,代码行数:40,代码来源:test_cells_messaging.py


示例11: test_broadcast_routing_with_response

    def test_broadcast_routing_with_response(self):
        """Broadcast down requesting responses and inspect each of them.

        Eight responses are expected; each must be a success whose value is
        the string built by the stubbed method from the cell's routing path.
        """
        def our_fake_method(message, **kwargs):
            return 'response-%s' % message.routing_path

        fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)

        call_kwargs = {'arg1': 1, 'arg2': 2}
        broadcast = messaging._BroadcastMessage(
            self.msg_runner, self.ctxt, 'our_fake_method', call_kwargs,
            'down', run_locally=True, need_response=True)
        replies = broadcast.process()

        self.assertEqual(len(replies), 8)
        for reply in replies:
            self.assertFalse(reply.failure)
            wanted = 'response-%s' % reply.cell_name
            self.assertEqual(wanted, reply.value_or_raise())
开发者ID:dscannell,项目名称:nova,代码行数:22,代码来源:test_cells_messaging.py


示例12: test_broadcast_routing_up

    def test_broadcast_routing_up(self):
        """Broadcast up from ``grandchild-cell3`` and check the paths visited.

        The stubbed method records ``message.routing_path`` for each
        invocation; the recorded set must equal the three expected paths.
        """
        runner = fakes.get_message_runner('grandchild-cell3')
        seen_paths = set()

        def our_fake_method(message, **kwargs):
            seen_paths.add(message.routing_path)

        fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)

        call_kwargs = {'arg1': 1, 'arg2': 2}
        broadcast = messaging._BroadcastMessage(
            runner, self.ctxt, 'our_fake_method', call_kwargs, 'up',
            run_locally=True)
        broadcast.process()

        # Paths are reversed, since going 'up'
        wanted_paths = {
            'grandchild-cell3',
            'grandchild-cell3!child-cell3',
            'grandchild-cell3!child-cell3!api-cell',
        }
        self.assertEqual(wanted_paths, seen_paths)
开发者ID:dscannell,项目名称:nova,代码行数:22,代码来源:test_cells_messaging.py


示例13: test_create_broadcast_message_with_response

 def test_create_broadcast_message_with_response(self):
     """Check the attributes of a new 'up' broadcast that wants a response.

     The message is built on the ``child-cell1`` runner; its next hops must
     equal the parent cells reported by that runner's state manager.
     """
     self.flags(max_hop_count=99, group='cells')
     cell_name = 'child-cell1'
     runner = fakes.get_message_runner(cell_name)
     call_name = 'fake_method'
     call_kwargs = {'arg1': 1, 'arg2': 2}
     heading = 'up'

     broadcast = messaging._BroadcastMessage(
         runner, self.ctxt, call_name, call_kwargs, heading,
         need_response=True)

     self.assertEqual(self.ctxt, broadcast.ctxt)
     self.assertEqual(call_name, broadcast.method_name)
     self.assertEqual(call_kwargs, broadcast.method_kwargs)
     self.assertEqual(heading, broadcast.direction)
     self.assertFalse(broadcast.fanout)
     self.assertTrue(broadcast.need_response)
     self.assertEqual(cell_name, broadcast.routing_path)
     self.assertEqual(1, broadcast.hop_count)
     self.assertEqual(99, broadcast.max_hop_count)
     self.assertTrue(broadcast.is_broadcast)

     # Correct next hops?
     hops = broadcast._get_next_hops()
     parents = runner.state_manager.get_parent_cells()
     self.assertEqual(parents, hops)
开发者ID:dscannell,项目名称:nova,代码行数:23,代码来源:test_cells_messaging.py


示例14: test_create_broadcast_message

 def test_create_broadcast_message(self):
     """Check the attributes of a newly created 'down' broadcast message.

     The message is built on ``self.msg_runner``; its next hops must equal
     the child cells reported by ``self.state_manager``.
     """
     # A single override is enough: this call sets both the cell name and
     # the hop limit, so a separate max_hop_count-only call is redundant.
     self.flags(name='api-cell', max_hop_count=99, group='cells')
     method = 'fake_method'
     method_kwargs = dict(arg1=1, arg2=2)
     direction = 'down'
     bcast_message = messaging._BroadcastMessage(self.msg_runner,
                                               self.ctxt, method,
                                               method_kwargs, direction)
     self.assertEqual(self.ctxt, bcast_message.ctxt)
     self.assertEqual(method, bcast_message.method_name)
     self.assertEqual(method_kwargs, bcast_message.method_kwargs)
     self.assertEqual(direction, bcast_message.direction)
     # fanout and need_response were not passed, so both must be falsy.
     self.assertFalse(bcast_message.fanout)
     self.assertFalse(bcast_message.need_response)
     self.assertEqual(self.our_name, bcast_message.routing_path)
     self.assertEqual(1, bcast_message.hop_count)
     self.assertEqual(99, bcast_message.max_hop_count)
     self.assertTrue(bcast_message.is_broadcast)
     # Correct next hops?
     next_hops = bcast_message._get_next_hops()
     child_cells = self.state_manager.get_child_cells()
     self.assertEqual(child_cells, next_hops)
开发者ID:dscannell,项目名称:nova,代码行数:23,代码来源:test_cells_messaging.py



注:本文中的nova.cells.messaging._BroadcastMessage函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python messaging._TargetedMessage函数代码示例发布时间:2022-05-27
下一篇:
Python block_device.volume_in_mapping函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap