• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils.get_interface_mac函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中quantum.agent.linux.utils.get_interface_mac函数的典型用法代码示例。如果您正苦于以下问题:Python get_interface_mac函数的具体用法?Python get_interface_mac怎么用?Python get_interface_mac使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_interface_mac函数的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: setup_rpc

    def setup_rpc(self, physical_interfaces):
        if physical_interfaces:
            mac = utils.get_interface_mac(physical_interfaces[0])
        else:
            devices = ip_lib.IPWrapper(self.root_helper).get_devices(True)
            if devices:
                mac = utils.get_interface_mac(devices[0].name)
            else:
                LOG.error(_("Unable to obtain MAC address for unique ID. "
                          "Agent terminated!"))
                exit(1)
        self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
        LOG.info(_("RPC agent_id: %s"), self.agent_id)

        self.topic = topics.AGENT
        self.plugin_rpc = LinuxBridgePluginApi(topics.PLUGIN)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
        # RPC network init
        self.context = context.get_admin_context_without_session()
        # Handle updates from service
        self.callbacks = LinuxBridgeRpcCallbacks(self.context,
                                                 self)
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
        # Define the listening consumers for the agent
        consumers = [[topics.PORT, topics.UPDATE],
                     [topics.NETWORK, topics.DELETE],
                     [topics.SECURITY_GROUP, topics.UPDATE]]
        self.connection = agent_rpc.create_consumers(self.dispatcher,
                                                     self.topic,
                                                     consumers)
        report_interval = cfg.CONF.AGENT.report_interval
        if report_interval:
            heartbeat = loopingcall.FixedIntervalLoopingCall(
                self._report_state)
            heartbeat.start(interval=report_interval)
开发者ID:NCU-PDCLAB,项目名称:quantum,代码行数:35,代码来源:linuxbridge_quantum_agent.py


示例2: setUp

    def setUp(self):
        self.mox = mox.Mox()

        self.INT_BRIDGE = 'integration_bridge'
        self.TUN_BRIDGE = 'tunnel_bridge'
        self.INT_OFPORT = 11111
        self.TUN_OFPORT = 22222

        self.mox.StubOutClassWithMocks(ovs_lib, 'OVSBridge')
        self.mock_int_bridge = ovs_lib.OVSBridge(self.INT_BRIDGE, 'sudo')
        self.mock_int_bridge.delete_port('patch-tun')
        self.mock_int_bridge.remove_all_flows()
        self.mock_int_bridge.add_flow(priority=1, actions='normal')

        self.mock_tun_bridge = ovs_lib.OVSBridge(self.TUN_BRIDGE, 'sudo')
        self.mock_tun_bridge.reset_bridge()
        self.mock_int_bridge.add_patch_port(
            'patch-tun', 'patch-int').AndReturn(self.TUN_OFPORT)
        self.mock_tun_bridge.add_patch_port(
            'patch-int', 'patch-tun').AndReturn(self.INT_OFPORT)
        self.mock_tun_bridge.remove_all_flows()
        self.mock_tun_bridge.add_flow(priority=1, actions='drop')

        self.mox.StubOutWithMock(utils, 'get_interface_mac')
        utils.get_interface_mac(self.INT_BRIDGE).AndReturn('000000000001')
开发者ID:FreescaleSemiconductor,项目名称:quantum,代码行数:25,代码来源:test_ovs_tunnel.py


示例3: setUp

    def setUp(self):
        cfg.CONF.set_override("rpc_backend", "quantum.openstack.common.rpc.impl_fake")
        cfg.CONF.set_override("report_interval", 0, "AGENT")
        self.mox = mox.Mox()
        self.INT_BRIDGE = "integration_bridge"
        self.TUN_BRIDGE = "tunnel_bridge"
        self.MAP_TUN_BRIDGE = "tunnel_bridge_mapping"
        self.NET_MAPPING = {"net1": self.MAP_TUN_BRIDGE}
        self.INT_OFPORT = 11111
        self.TUN_OFPORT = 22222
        self.MAP_TUN_OFPORT = 33333
        self.inta = self.mox.CreateMock(ip_lib.IPDevice)
        self.intb = self.mox.CreateMock(ip_lib.IPDevice)
        self.inta.link = self.mox.CreateMock(ip_lib.IpLinkCommand)
        self.intb.link = self.mox.CreateMock(ip_lib.IpLinkCommand)

        self.mox.StubOutClassWithMocks(ovs_lib, "OVSBridge")
        self.mock_int_bridge = ovs_lib.OVSBridge(self.INT_BRIDGE, "sudo")
        self.mock_int_bridge.delete_port("patch-tun")
        self.mock_int_bridge.remove_all_flows()
        self.mock_int_bridge.add_flow(priority=1, actions="normal")

        self.mock_map_tun_bridge = ovs_lib.OVSBridge(self.MAP_TUN_BRIDGE, "sudo")
        self.mock_map_tun_bridge.remove_all_flows()
        self.mock_map_tun_bridge.add_flow(priority=1, actions="normal")
        self.mock_int_bridge.delete_port("int-tunnel_bridge_mapping")
        self.mock_map_tun_bridge.delete_port("phy-tunnel_bridge_mapping")
        self.mock_int_bridge.add_port(self.inta)
        self.mock_map_tun_bridge.add_port(self.intb)
        self.inta.link.set_up()
        self.intb.link.set_up()

        self.mock_int_bridge.add_flow(priority=2, in_port=None, actions="drop")
        self.mock_map_tun_bridge.add_flow(priority=2, in_port=None, actions="drop")

        self.mock_tun_bridge = ovs_lib.OVSBridge(self.TUN_BRIDGE, "sudo")
        self.mock_tun_bridge.reset_bridge()
        self.mock_int_bridge.add_patch_port("patch-tun", "patch-int").AndReturn(self.TUN_OFPORT)
        self.mock_tun_bridge.add_patch_port("patch-int", "patch-tun").AndReturn(self.INT_OFPORT)
        self.mock_tun_bridge.remove_all_flows()
        self.mock_tun_bridge.add_flow(priority=1, actions="drop")

        self.mox.StubOutWithMock(ip_lib, "device_exists")
        ip_lib.device_exists("tunnel_bridge_mapping", "sudo").AndReturn(True)
        ip_lib.device_exists("int-tunnel_bridge_mapping", "sudo").AndReturn(True)

        self.mox.StubOutWithMock(ip_lib.IpLinkCommand, "delete")
        ip_lib.IPDevice("int-tunnel_bridge_mapping").link.delete()

        self.mox.StubOutClassWithMocks(ip_lib, "IPWrapper")
        ip_lib.IPWrapper("sudo").add_veth("int-tunnel_bridge_mapping", "phy-tunnel_bridge_mapping").AndReturn(
            [self.inta, self.intb]
        )

        self.mox.StubOutWithMock(utils, "get_interface_mac")
        utils.get_interface_mac(self.INT_BRIDGE).AndReturn("000000000001")
开发者ID:rosstaylor,项目名称:quantum,代码行数:56,代码来源:test_ovs_tunnel.py


示例4: test_get_interface_mac

 def test_get_interface_mac(self):
     expect_val = '01:02:03:04:05:06'
     with mock.patch('fcntl.ioctl') as ioctl:
         ioctl.return_value = ''.join(['\x00' * 18,
                                       '\x01\x02\x03\x04\x05\x06',
                                       '\x00' * 232])
         actual_val = utils.get_interface_mac('eth0')
     self.assertEqual(actual_val, expect_val)
开发者ID:StackOps,项目名称:quantum,代码行数:8,代码来源:test_agent_linux_utils.py


示例5: get_interface_mac_list

 def get_interface_mac_list(self):
     net = {}
     f = open("/proc/net/dev")
     lines = f.readlines()
     f.close()
     for line in lines[2:]:
         con = line.split()
         name = con[0].rstrip(":")
         net[name] = utils.get_interface_mac(name)
     
     return net
开发者ID:bestsharp,项目名称:rgos_quantum_plugin,代码行数:11,代码来源:rgos_quantum_agent_ovs.py


示例6: setup_rpc

    def setup_rpc(self, integ_br):
        mac = utils.get_interface_mac(integ_br)
        self.agent_id = "%s%s" % ("ovs", (mac.replace(":", "")))
        self.topic = topics.AGENT
        self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)

        # RPC network init
        self.context = context.RequestContext("quantum", "quantum", is_admin=False)
        # Handle updates from service
        self.callbacks = OVSRpcCallbacks(self.context, self.int_br, self.local_ip, self.tun_br)
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
        # Define the listening consumers for the agent
        consumers = [[topics.PORT, topics.UPDATE], [topics.NETWORK, topics.DELETE], [config.TUNNEL, topics.UPDATE]]
        self.connection = agent_rpc.create_consumers(self.dispatcher, self.topic, consumers)
开发者ID:t-lin,项目名称:quantum,代码行数:14,代码来源:ovs_quantum_agent.py


示例7: setup_rpc

    def setup_rpc(self, integ_br):
        mac = utils.get_interface_mac(integ_br)
        self.agent_id = '%s%s' % ('ovs', (mac.replace(":", "")))
        self.topic = topics.AGENT
        self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)

        # RPC network init
        self.context = context.get_admin_context_without_session()
        # Handle updates from service
        self.dispatcher = self.create_rpc_dispatcher()
        # Define the listening consumers for the agent
        consumers = [[topics.PORT, topics.UPDATE],
                     [topics.NETWORK, topics.DELETE],
                     [constants.TUNNEL, topics.UPDATE]]
        self.connection = agent_rpc.create_consumers(self.dispatcher,
                                                     self.topic,
                                                     consumers)
开发者ID:ntoll,项目名称:quantum,代码行数:17,代码来源:ovs_quantum_agent.py


示例8: setup_rpc

    def setup_rpc(self, physical_interface):
        mac = utils.get_interface_mac(physical_interface)
        self.agent_id = "%s%s" % ("lb", (mac.replace(":", "")))
        self.topic = topics.AGENT
        self.plugin_rpc = PluginApi(topics.PLUGIN)

        # RPC network init
        self.context = context.RequestContext("quantum", "quantum", is_admin=False)
        # Handle updates from service
        self.callbacks = LinuxBridgeRpcCallbacks(self.context, self.linux_br)
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
        # Define the listening consumers for the agent
        consumers = [[topics.PORT, topics.UPDATE], [topics.NETWORK, topics.DELETE]]
        self.connection = create_consumers(self.dispatcher, self.topic, consumers)
        self.udev = pyudev.Context()
        monitor = pyudev.Monitor.from_netlink(self.udev)
        monitor.filter_by("net")
开发者ID:vbannai,项目名称:quantum,代码行数:17,代码来源:linuxbridge_quantum_agent.py


示例9: setup_rpc

    def setup_rpc(self, integ_br):
        mac = utils.get_interface_mac(integ_br)
        self.agent_id = '%s%s' % ('ovs', (mac.replace(":", "")))
        self.topic = topics.AGENT
        self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

        # RPC network init
        self.context = context.get_admin_context_without_session()
        # Handle updates from service
        self.dispatcher = self.create_rpc_dispatcher()
        # Define the listening consumers for the agent
        consumers = [[topics.PORT, topics.UPDATE],
                     [topics.NETWORK, topics.DELETE],
                     [constants.TUNNEL, topics.UPDATE],
                     [topics.SECURITY_GROUP, topics.UPDATE]]
        self.connection = agent_rpc.create_consumers(self.dispatcher,
                                                     self.topic,
                                                     consumers)
        report_interval = cfg.CONF.AGENT.report_interval
        if report_interval:
            heartbeat = loopingcall.LoopingCall(self._report_state)
            heartbeat.start(interval=report_interval)
开发者ID:wdec,项目名称:quantum,代码行数:23,代码来源:ovs_quantum_agent.py


示例10: setUp

    def setUp(self):
        super(TunnelTest, self).setUp()
        cfg.CONF.set_override('rpc_backend',
                              'quantum.openstack.common.rpc.impl_fake')
        cfg.CONF.set_override('report_interval', 0, 'AGENT')
        self.mox = mox.Mox()
        self.addCleanup(self.mox.UnsetStubs)

        self.INT_BRIDGE = 'integration_bridge'
        self.TUN_BRIDGE = 'tunnel_bridge'
        self.MAP_TUN_BRIDGE = 'tunnel_bridge_mapping'
        self.NET_MAPPING = {'net1': self.MAP_TUN_BRIDGE}
        self.INT_OFPORT = 11111
        self.TUN_OFPORT = 22222
        self.MAP_TUN_OFPORT = 33333
        self.inta = self.mox.CreateMock(ip_lib.IPDevice)
        self.intb = self.mox.CreateMock(ip_lib.IPDevice)
        self.inta.link = self.mox.CreateMock(ip_lib.IpLinkCommand)
        self.intb.link = self.mox.CreateMock(ip_lib.IpLinkCommand)

        self.mox.StubOutClassWithMocks(ovs_lib, 'OVSBridge')
        self.mock_int_bridge = ovs_lib.OVSBridge(self.INT_BRIDGE, 'sudo')
        self.mock_int_bridge.delete_port('patch-tun')
        self.mock_int_bridge.remove_all_flows()
        self.mock_int_bridge.add_flow(priority=1, actions='normal')

        self.mock_map_tun_bridge = ovs_lib.OVSBridge(
            self.MAP_TUN_BRIDGE, 'sudo')
        self.mock_map_tun_bridge.remove_all_flows()
        self.mock_map_tun_bridge.add_flow(priority=1, actions='normal')
        self.mock_int_bridge.delete_port('int-tunnel_bridge_mapping')
        self.mock_map_tun_bridge.delete_port('phy-tunnel_bridge_mapping')
        self.mock_int_bridge.add_port(self.inta)
        self.mock_map_tun_bridge.add_port(self.intb)
        self.inta.link.set_up()
        self.intb.link.set_up()

        self.mock_int_bridge.add_flow(priority=2, in_port=None, actions='drop')
        self.mock_map_tun_bridge.add_flow(
            priority=2, in_port=None, actions='drop')

        self.mock_tun_bridge = ovs_lib.OVSBridge(self.TUN_BRIDGE, 'sudo')
        self.mock_tun_bridge.reset_bridge()
        self.mock_int_bridge.add_patch_port(
            'patch-tun', 'patch-int').AndReturn(self.TUN_OFPORT)
        self.mock_tun_bridge.add_patch_port(
            'patch-int', 'patch-tun').AndReturn(self.INT_OFPORT)
        self.mock_tun_bridge.remove_all_flows()
        self.mock_tun_bridge.add_flow(priority=1, actions='drop')

        self.mox.StubOutWithMock(ip_lib, 'device_exists')
        ip_lib.device_exists('tunnel_bridge_mapping', 'sudo').AndReturn(True)
        ip_lib.device_exists(
            'int-tunnel_bridge_mapping', 'sudo').AndReturn(True)

        self.mox.StubOutWithMock(ip_lib.IpLinkCommand, 'delete')
        ip_lib.IPDevice('int-tunnel_bridge_mapping').link.delete()

        self.mox.StubOutClassWithMocks(ip_lib, 'IPWrapper')
        ip_lib.IPWrapper('sudo').add_veth(
            'int-tunnel_bridge_mapping',
            'phy-tunnel_bridge_mapping').AndReturn([self.inta, self.intb])

        self.mox.StubOutWithMock(utils, 'get_interface_mac')
        utils.get_interface_mac(self.INT_BRIDGE).AndReturn('000000000001')
开发者ID:NCU-PDCLAB,项目名称:quantum,代码行数:65,代码来源:test_ovs_tunnel.py


示例11: test_get_interface_mac

 def test_get_interface_mac(self):
     expect_val = "01:02:03:04:05:06"
     with mock.patch("fcntl.ioctl") as ioctl:
         ioctl.return_value = "".join(["\x00" * 18, "\x01\x02\x03\x04\x05\x06", "\x00" * 232])
         actual_val = utils.get_interface_mac("eth0")
     self.assertEqual(actual_val, expect_val)
开发者ID:mygoda,项目名称:openstack,代码行数:6,代码来源:test_agent_linux_utils.py



注:本文中的quantum.agent.linux.utils.get_interface_mac函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python rpc.create_consumers函数代码示例发布时间:2022-05-26
下一篇:
Python utils.execute函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap