• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python topics.get_topic_name函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中quantum.common.topics.get_topic_name函数的典型用法代码示例。如果您正苦于以下问题:Python get_topic_name函数的具体用法?Python get_topic_name怎么用?Python get_topic_name使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_topic_name函数的13个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __init__

 def __init__(self, topic):
     super(AgentNotifierApi, self).__init__(
         topic=topic, default_version=self.BASE_RPC_API_VERSION)
     self.topic_network_delete = topics.get_topic_name(topic,
                                                       topics.NETWORK,
                                                       topics.DELETE)
     self.topic_port_update = topics.get_topic_name(topic,
                                                    topics.PORT,
                                                    topics.UPDATE)
开发者ID:alexpilotti,项目名称:quantum,代码行数:9,代码来源:lb_quantum_plugin.py


示例2: test_port_update

 def test_port_update(self):
     rpcapi = povs.AgentNotifierApi(topics.AGENT)
     self._test_ovs_api(rpcapi,
                        topics.get_topic_name(topics.AGENT,
                                              topics.PORT,
                                              topics.UPDATE),
                        'port_update', rpc_method='fanout_cast',
                        port='fake_port', vlan_id='fake_vlan_id')
开发者ID:missall,项目名称:quantum,代码行数:8,代码来源:test_rpcapi.py


示例3: test_tunnel_update

 def test_tunnel_update(self):
     rpcapi = povs.AgentNotifierApi(topics.AGENT)
     self._test_ovs_api(rpcapi,
                        topics.get_topic_name(topics.AGENT,
                                              constants.TUNNEL,
                                              topics.UPDATE),
                        'tunnel_update', rpc_method='fanout_cast',
                        tunnel_ip='fake_ip', tunnel_id='fake_id')
开发者ID:AmirolAhmad,项目名称:quantum,代码行数:8,代码来源:test_ovs_rpcapi.py


示例4: test_port_delete

 def test_port_delete(self):
     rpcapi = ana.AgentNotifierApi(topics.AGENT)
     self._test_hyperv_quantum_api(
         rpcapi,
         topics.get_topic_name(topics.AGENT, topics.PORT, topics.DELETE),
         "port_delete",
         rpc_method="fanout_cast",
         port_id="port_id",
     )
开发者ID:alexpilotti,项目名称:quantum,代码行数:9,代码来源:test_hyperv_rpcapi.py


示例5: test_delete_network

 def test_delete_network(self):
     rpcapi = ana.AgentNotifierApi(topics.AGENT)
     self._test_hyperv_quantum_api(
         rpcapi,
         topics.get_topic_name(topics.AGENT, topics.NETWORK, topics.DELETE),
         "network_delete",
         rpc_method="fanout_cast",
         network_id="fake_request_spec",
     )
开发者ID:alexpilotti,项目名称:quantum,代码行数:9,代码来源:test_hyperv_rpcapi.py


示例6: test_tunnel_update

 def test_tunnel_update(self):
     rpcapi = ana.AgentNotifierApi(topics.AGENT)
     self._test_hyperv_quantum_api(
         rpcapi,
         topics.get_topic_name(topics.AGENT, constants.TUNNEL, topics.UPDATE),
         "tunnel_update",
         rpc_method="fanout_cast",
         tunnel_ip="fake_ip",
         tunnel_id="fake_id",
     )
开发者ID:alexpilotti,项目名称:quantum,代码行数:10,代码来源:test_hyperv_rpcapi.py


示例7: test_port_update

 def test_port_update(self):
     rpcapi = plb.AgentNotifierApi(topics.AGENT)
     self._test_lb_api(
         rpcapi,
         topics.get_topic_name(topics.AGENT, topics.PORT, topics.UPDATE),
         "port_update",
         rpc_method="fanout_cast",
         port="fake_port",
         vlan_id="fake_vlan_id",
     )
开发者ID:t-lin,项目名称:quantum,代码行数:10,代码来源:test_rpcapi.py


示例8: test_port_update

 def test_port_update(self):
     rpcapi = povs.AgentNotifierApi(topics.AGENT)
     self._test_ovs_api(rpcapi,
                        topics.get_topic_name(topics.AGENT,
                                              topics.PORT,
                                              topics.UPDATE),
                        'port_update', rpc_method='fanout_cast',
                        port='fake_port',
                        network_type='fake_network_type',
                        segmentation_id='fake_segmentation_id',
                        physical_network='fake_physical_network')
开发者ID:AmirolAhmad,项目名称:quantum,代码行数:11,代码来源:test_ovs_rpcapi.py


示例9: test_port_update

 def test_port_update(self):
     rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
     self._test_mlnx_api(rpcapi,
                         topics.get_topic_name(topics.AGENT,
                                               topics.PORT,
                                               topics.UPDATE),
                         'port_update', rpc_method='fanout_cast',
                         port='fake_port',
                         network_type='vlan',
                         physical_network='fake_net',
                         vlan_id='fake_vlan_id')
开发者ID:Apsu,项目名称:quantum,代码行数:11,代码来源:test_rpcapi.py


示例10: test_port_update

 def test_port_update(self):
     rpcapi = ana.AgentNotifierApi(topics.AGENT)
     self._test_hyperv_quantum_api(
         rpcapi,
         topics.get_topic_name(topics.AGENT, topics.PORT, topics.UPDATE),
         "port_update",
         rpc_method="fanout_cast",
         port="fake_port",
         network_type="fake_network_type",
         segmentation_id="fake_segmentation_id",
         physical_network="fake_physical_network",
     )
开发者ID:alexpilotti,项目名称:quantum,代码行数:12,代码来源:test_hyperv_rpcapi.py


示例11: create_consumers

def create_consumers(dispatcher, prefix, topic_details):
    """Create agent RPC consumers.

    :param dispatcher: The dispatcher to process the incoming messages.
    :param prefix: Common prefix for the plugin/agent message queues.
    :param topic_details: A list of topics. Each topic has a name and a
                          operation.

    :returns: A common Connection.
    """

    connection = rpc.create_connection(new=True)
    for topic, operation in topic_details:
        topic_name = topics.get_topic_name(prefix, topic, operation)
        connection.create_consumer(topic_name, dispatcher, fanout=True)
    connection.consume_in_thread()
    return connection
开发者ID:Apsu,项目名称:quantum,代码行数:17,代码来源:rpc.py


示例12: __init__

 def __init__(self, topic):
     super(NECPluginV2AgentNotifierApi, self).__init__(
         topic=topic, default_version=self.BASE_RPC_API_VERSION)
     self.topic_port_update = topics.get_topic_name(
         topic, topics.PORT, topics.UPDATE)
开发者ID:AmirolAhmad,项目名称:quantum,代码行数:5,代码来源:nec_plugin.py


示例13: _get_security_group_topic

 def _get_security_group_topic(self):
     return topics.get_topic_name(self.topic,
                                  topics.SECURITY_GROUP,
                                  topics.UPDATE)
开发者ID:AmirolAhmad,项目名称:quantum,代码行数:4,代码来源:securitygroups_rpc.py



注:本文中的quantum.common.topics.get_topic_name函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.str_uuid函数代码示例发布时间:2022-05-26
下一篇:
Python config.setup_logging函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap