本文整理汇总了Python中quantum.agent.linux.utils.get_interface_mac函数的典型用法代码示例。如果您正苦于以下问题:Python get_interface_mac函数的具体用法?Python get_interface_mac怎么用?Python get_interface_mac使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_interface_mac函数的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setup_rpc
def setup_rpc(self, physical_interfaces):
    """Derive the agent ID from an interface MAC and set up the agent's RPC.

    The MAC of the first entry in ``physical_interfaces`` is used when that
    sequence is non-empty; otherwise the first device returned by
    ``ip_lib.IPWrapper(self.root_helper).get_devices(True)`` is used.

    :param physical_interfaces: sequence of interface names; may be empty.
    :raises SystemExit: with status 1 when neither source yields a device
        to read a MAC address from.
    """
    # Function-scope import keeps this block self-contained.
    import sys

    if physical_interfaces:
        mac = utils.get_interface_mac(physical_interfaces[0])
    else:
        devices = ip_lib.IPWrapper(self.root_helper).get_devices(True)
        if not devices:
            LOG.error(_("Unable to obtain MAC address for unique ID. "
                        "Agent terminated!"))
            # sys.exit() instead of the bare exit() helper: the latter is
            # injected by the ``site`` module for interactive use and is not
            # guaranteed to exist (e.g. when running with ``python -S``).
            sys.exit(1)
        mac = utils.get_interface_mac(devices[0].name)

    # The agent ID is "lb" followed by the MAC with its colons removed.
    self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
    LOG.info(_("RPC agent_id: %s"), self.agent_id)

    self.topic = topics.AGENT
    self.plugin_rpc = LinuxBridgePluginApi(topics.PLUGIN)
    self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

    # RPC network init
    self.context = context.get_admin_context_without_session()

    # Handle updates from service
    self.callbacks = LinuxBridgeRpcCallbacks(self.context,
                                             self)
    self.dispatcher = self.callbacks.create_rpc_dispatcher()

    # Define the listening consumers for the agent
    consumers = [[topics.PORT, topics.UPDATE],
                 [topics.NETWORK, topics.DELETE],
                 [topics.SECURITY_GROUP, topics.UPDATE]]
    self.connection = agent_rpc.create_consumers(self.dispatcher,
                                                 self.topic,
                                                 consumers)

    # Start the periodic state report only when an interval is configured.
    report_interval = cfg.CONF.AGENT.report_interval
    if report_interval:
        heartbeat = loopingcall.FixedIntervalLoopingCall(
            self._report_state)
        heartbeat.start(interval=report_interval)
开发者ID:NCU-PDCLAB,项目名称:quantum,代码行数:35,代码来源:linuxbridge_quantum_agent.py
示例2: setUp
def setUp(self):
    """Record the mox expectations for the integration and tunnel bridges.

    ``ovs_lib.OVSBridge`` is replaced with class mocks and
    ``utils.get_interface_mac`` is stubbed; the calls below are recorded
    on those mocks in the order written.
    """
    recorder = mox.Mox()
    self.mox = recorder

    self.INT_BRIDGE = 'integration_bridge'
    self.TUN_BRIDGE = 'tunnel_bridge'
    self.INT_OFPORT = 11111
    self.TUN_OFPORT = 22222

    recorder.StubOutClassWithMocks(ovs_lib, 'OVSBridge')

    # Integration bridge: expected construction and initial flow setup.
    int_bridge = ovs_lib.OVSBridge(self.INT_BRIDGE, 'sudo')
    self.mock_int_bridge = int_bridge
    int_bridge.delete_port('patch-tun')
    int_bridge.remove_all_flows()
    int_bridge.add_flow(priority=1, actions='normal')

    # Tunnel bridge: expected construction and reset.
    tun_bridge = ovs_lib.OVSBridge(self.TUN_BRIDGE, 'sudo')
    self.mock_tun_bridge = tun_bridge
    tun_bridge.reset_bridge()

    # Patch ports linking the two bridges, each returning an ofport.
    patch_on_int = int_bridge.add_patch_port('patch-tun', 'patch-int')
    patch_on_int.AndReturn(self.TUN_OFPORT)
    patch_on_tun = tun_bridge.add_patch_port('patch-int', 'patch-tun')
    patch_on_tun.AndReturn(self.INT_OFPORT)

    tun_bridge.remove_all_flows()
    tun_bridge.add_flow(priority=1, actions='drop')

    # The MAC lookup for the integration bridge returns a fixed value.
    recorder.StubOutWithMock(utils, 'get_interface_mac')
    mac_lookup = utils.get_interface_mac(self.INT_BRIDGE)
    mac_lookup.AndReturn('000000000001')
开发者ID:FreescaleSemiconductor,项目名称:quantum,代码行数:25,代码来源:test_ovs_tunnel.py
示例3: setUp
def setUp(self):
    """Record the mox expectations for a tunnel setup with a bridge mapping.

    Overrides the RPC backend and report interval in ``cfg.CONF``, creates
    mock veth devices, replaces ``ovs_lib.OVSBridge`` and
    ``ip_lib.IPWrapper`` with class mocks, stubs ``ip_lib.device_exists``,
    ``ip_lib.IpLinkCommand.delete`` and ``utils.get_interface_mac``, and
    records the calls below in the order written.
    """
    cfg.CONF.set_override("rpc_backend",
                          "quantum.openstack.common.rpc.impl_fake")
    cfg.CONF.set_override("report_interval", 0, "AGENT")

    recorder = mox.Mox()
    self.mox = recorder

    self.INT_BRIDGE = "integration_bridge"
    self.TUN_BRIDGE = "tunnel_bridge"
    self.MAP_TUN_BRIDGE = "tunnel_bridge_mapping"
    self.NET_MAPPING = {"net1": self.MAP_TUN_BRIDGE}
    self.INT_OFPORT = 11111
    self.TUN_OFPORT = 22222
    self.MAP_TUN_OFPORT = 33333

    # Two mock devices, each with a mock link command; they are later
    # returned together from add_veth().
    veth_a = recorder.CreateMock(ip_lib.IPDevice)
    self.inta = veth_a
    veth_b = recorder.CreateMock(ip_lib.IPDevice)
    self.intb = veth_b
    veth_a.link = recorder.CreateMock(ip_lib.IpLinkCommand)
    veth_b.link = recorder.CreateMock(ip_lib.IpLinkCommand)

    recorder.StubOutClassWithMocks(ovs_lib, "OVSBridge")

    # Integration bridge: expected construction and initial flow setup.
    int_bridge = ovs_lib.OVSBridge(self.INT_BRIDGE, "sudo")
    self.mock_int_bridge = int_bridge
    int_bridge.delete_port("patch-tun")
    int_bridge.remove_all_flows()
    int_bridge.add_flow(priority=1, actions="normal")

    # Mapped bridge: expected construction and initial flow setup.
    map_bridge = ovs_lib.OVSBridge(self.MAP_TUN_BRIDGE, "sudo")
    self.mock_map_tun_bridge = map_bridge
    map_bridge.remove_all_flows()
    map_bridge.add_flow(priority=1, actions="normal")

    # Old veth ports are removed, the new ones added and brought up.
    int_bridge.delete_port("int-tunnel_bridge_mapping")
    map_bridge.delete_port("phy-tunnel_bridge_mapping")
    int_bridge.add_port(veth_a)
    map_bridge.add_port(veth_b)
    veth_a.link.set_up()
    veth_b.link.set_up()
    int_bridge.add_flow(priority=2, in_port=None, actions="drop")
    map_bridge.add_flow(priority=2, in_port=None, actions="drop")

    # Tunnel bridge: expected construction, reset and patch ports.
    tun_bridge = ovs_lib.OVSBridge(self.TUN_BRIDGE, "sudo")
    self.mock_tun_bridge = tun_bridge
    tun_bridge.reset_bridge()
    patch_on_int = int_bridge.add_patch_port("patch-tun", "patch-int")
    patch_on_int.AndReturn(self.TUN_OFPORT)
    patch_on_tun = tun_bridge.add_patch_port("patch-int", "patch-tun")
    patch_on_tun.AndReturn(self.INT_OFPORT)
    tun_bridge.remove_all_flows()
    tun_bridge.add_flow(priority=1, actions="drop")

    # Both existence checks report that the device is present.
    recorder.StubOutWithMock(ip_lib, "device_exists")
    mapped_exists = ip_lib.device_exists("tunnel_bridge_mapping", "sudo")
    mapped_exists.AndReturn(True)
    veth_exists = ip_lib.device_exists("int-tunnel_bridge_mapping", "sudo")
    veth_exists.AndReturn(True)

    recorder.StubOutWithMock(ip_lib.IpLinkCommand, "delete")
    ip_lib.IPDevice("int-tunnel_bridge_mapping").link.delete()

    # add_veth() hands back the two mock devices created above.
    recorder.StubOutClassWithMocks(ip_lib, "IPWrapper")
    veth_pair = ip_lib.IPWrapper("sudo").add_veth(
        "int-tunnel_bridge_mapping", "phy-tunnel_bridge_mapping")
    veth_pair.AndReturn([veth_a, veth_b])

    # The MAC lookup for the integration bridge returns a fixed value.
    recorder.StubOutWithMock(utils, "get_interface_mac")
    mac_lookup = utils.get_interface_mac(self.INT_BRIDGE)
    mac_lookup.AndReturn("000000000001")
开发者ID:rosstaylor,项目名称:quantum,代码行数:56,代码来源:test_ovs_tunnel.py
示例4: test_get_interface_mac
def test_get_interface_mac(self):
    """get_interface_mac formats the MAC bytes of a patched ioctl reply."""
    # Fake ioctl reply: 18 NUL bytes, the six MAC bytes, then 232 NUL bytes.
    fake_reply = ('\x00' * 18) + '\x01\x02\x03\x04\x05\x06' + ('\x00' * 232)
    expected_mac = '01:02:03:04:05:06'

    with mock.patch('fcntl.ioctl') as patched_ioctl:
        patched_ioctl.return_value = fake_reply
        result = utils.get_interface_mac('eth0')

    self.assertEqual(result, expected_mac)
开发者ID:StackOps,项目名称:quantum,代码行数:8,代码来源:test_agent_linux_utils.py
示例5: get_interface_mac_list
def get_interface_mac_list(self):
    """Map each interface listed in /proc/net/dev to its MAC address.

    :returns: dict of interface name -> the value returned by
        ``utils.get_interface_mac`` for that name.
    """
    # A context manager closes the file even if reading it raises.
    with open("/proc/net/dev") as dev_file:
        lines = dev_file.readlines()

    net = {}
    # The first two lines are skipped (presumably the column headers).
    for line in lines[2:]:
        fields = line.split()
        if not fields:
            # Blank line: nothing to parse.
            continue
        # Keep only the text before the first colon, so a token such as
        # "eth0:1234" (no space between name and counter) still yields
        # "eth0".
        name = fields[0].partition(":")[0]
        net[name] = utils.get_interface_mac(name)
    return net
开发者ID:bestsharp,项目名称:rgos_quantum_plugin,代码行数:11,代码来源:rgos_quantum_agent_ovs.py
示例6: setup_rpc
def setup_rpc(self, integ_br):
    """Set the agent ID from the bridge MAC and create the RPC consumers.

    :param integ_br: interface name passed to ``utils.get_interface_mac``.
    """
    # The agent ID is "ovs" followed by the MAC with its colons removed.
    mac_digits = utils.get_interface_mac(integ_br).replace(":", "")
    self.agent_id = "%s%s" % ("ovs", mac_digits)

    self.topic = topics.AGENT
    self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)

    # RPC network init
    self.context = context.RequestContext("quantum", "quantum",
                                          is_admin=False)

    # Handle updates from service
    self.callbacks = OVSRpcCallbacks(
        self.context, self.int_br, self.local_ip, self.tun_br)
    self.dispatcher = self.callbacks.create_rpc_dispatcher()

    # Define the listening consumers for the agent
    port_updates = [topics.PORT, topics.UPDATE]
    network_deletes = [topics.NETWORK, topics.DELETE]
    tunnel_updates = [config.TUNNEL, topics.UPDATE]
    consumers = [port_updates, network_deletes, tunnel_updates]
    self.connection = agent_rpc.create_consumers(
        self.dispatcher, self.topic, consumers)
开发者ID:t-lin,项目名称:quantum,代码行数:14,代码来源:ovs_quantum_agent.py
示例7: setup_rpc
def setup_rpc(self, integ_br):
    """Set the agent ID from the bridge MAC and create the RPC consumers.

    :param integ_br: interface name passed to ``utils.get_interface_mac``.
    """
    # The agent ID is "ovs" followed by the MAC with its colons removed.
    mac_digits = utils.get_interface_mac(integ_br).replace(":", "")
    self.agent_id = '%s%s' % ('ovs', mac_digits)

    self.topic = topics.AGENT
    self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)

    # RPC network init
    self.context = context.get_admin_context_without_session()

    # Handle updates from service
    self.dispatcher = self.create_rpc_dispatcher()

    # Define the listening consumers for the agent
    port_updates = [topics.PORT, topics.UPDATE]
    network_deletes = [topics.NETWORK, topics.DELETE]
    tunnel_updates = [constants.TUNNEL, topics.UPDATE]
    consumers = [port_updates, network_deletes, tunnel_updates]
    self.connection = agent_rpc.create_consumers(
        self.dispatcher, self.topic, consumers)
开发者ID:ntoll,项目名称:quantum,代码行数:17,代码来源:ovs_quantum_agent.py
示例8: setup_rpc
def setup_rpc(self, physical_interface):
    """Set the agent ID from the interface MAC and create the RPC consumers.

    Also creates a pyudev context and a netlink monitor filtered on "net".

    :param physical_interface: interface name passed to
        ``utils.get_interface_mac``.
    """
    # The agent ID is "lb" followed by the MAC with its colons removed.
    mac_digits = utils.get_interface_mac(physical_interface).replace(":", "")
    self.agent_id = "%s%s" % ("lb", mac_digits)

    self.topic = topics.AGENT
    self.plugin_rpc = PluginApi(topics.PLUGIN)

    # RPC network init
    self.context = context.RequestContext("quantum", "quantum",
                                          is_admin=False)

    # Handle updates from service
    self.callbacks = LinuxBridgeRpcCallbacks(self.context, self.linux_br)
    self.dispatcher = self.callbacks.create_rpc_dispatcher()

    # Define the listening consumers for the agent
    port_updates = [topics.PORT, topics.UPDATE]
    network_deletes = [topics.NETWORK, topics.DELETE]
    consumers = [port_updates, network_deletes]
    self.connection = create_consumers(
        self.dispatcher, self.topic, consumers)

    # The monitor is only held in a local; it is not stored on self.
    self.udev = pyudev.Context()
    net_monitor = pyudev.Monitor.from_netlink(self.udev)
    net_monitor.filter_by("net")
开发者ID:vbannai,项目名称:quantum,代码行数:17,代码来源:linuxbridge_quantum_agent.py
示例9: setup_rpc
def setup_rpc(self, integ_br):
    """Set the agent ID, create the RPC consumers and start state reports.

    A looping call to ``self._report_state`` is started only when
    ``cfg.CONF.AGENT.report_interval`` is truthy.

    :param integ_br: interface name passed to ``utils.get_interface_mac``.
    """
    # The agent ID is "ovs" followed by the MAC with its colons removed.
    mac_digits = utils.get_interface_mac(integ_br).replace(":", "")
    self.agent_id = '%s%s' % ('ovs', mac_digits)

    self.topic = topics.AGENT
    self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
    self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

    # RPC network init
    self.context = context.get_admin_context_without_session()

    # Handle updates from service
    self.dispatcher = self.create_rpc_dispatcher()

    # Define the listening consumers for the agent
    port_updates = [topics.PORT, topics.UPDATE]
    network_deletes = [topics.NETWORK, topics.DELETE]
    tunnel_updates = [constants.TUNNEL, topics.UPDATE]
    secgroup_updates = [topics.SECURITY_GROUP, topics.UPDATE]
    consumers = [port_updates, network_deletes,
                 tunnel_updates, secgroup_updates]
    self.connection = agent_rpc.create_consumers(
        self.dispatcher, self.topic, consumers)

    report_interval = cfg.CONF.AGENT.report_interval
    if not report_interval:
        # No interval configured: no periodic state report.
        return
    heartbeat = loopingcall.LoopingCall(self._report_state)
    heartbeat.start(interval=report_interval)
开发者ID:wdec,项目名称:quantum,代码行数:23,代码来源:ovs_quantum_agent.py
示例10: setUp
def setUp(self):
    """Record the mox expectations for a tunnel setup with a bridge mapping.

    Calls the parent ``setUp``, overrides the RPC backend and report
    interval in ``cfg.CONF``, registers ``UnsetStubs`` as a cleanup,
    creates mock veth devices, replaces ``ovs_lib.OVSBridge`` and
    ``ip_lib.IPWrapper`` with class mocks, stubs ``ip_lib.device_exists``,
    ``ip_lib.IpLinkCommand.delete`` and ``utils.get_interface_mac``, and
    records the calls below in the order written.
    """
    super(TunnelTest, self).setUp()
    cfg.CONF.set_override('rpc_backend',
                          'quantum.openstack.common.rpc.impl_fake')
    cfg.CONF.set_override('report_interval', 0, 'AGENT')

    recorder = mox.Mox()
    self.mox = recorder
    self.addCleanup(recorder.UnsetStubs)

    self.INT_BRIDGE = 'integration_bridge'
    self.TUN_BRIDGE = 'tunnel_bridge'
    self.MAP_TUN_BRIDGE = 'tunnel_bridge_mapping'
    self.NET_MAPPING = {'net1': self.MAP_TUN_BRIDGE}
    self.INT_OFPORT = 11111
    self.TUN_OFPORT = 22222
    self.MAP_TUN_OFPORT = 33333

    # Two mock devices, each with a mock link command; they are later
    # returned together from add_veth().
    veth_a = recorder.CreateMock(ip_lib.IPDevice)
    self.inta = veth_a
    veth_b = recorder.CreateMock(ip_lib.IPDevice)
    self.intb = veth_b
    veth_a.link = recorder.CreateMock(ip_lib.IpLinkCommand)
    veth_b.link = recorder.CreateMock(ip_lib.IpLinkCommand)

    recorder.StubOutClassWithMocks(ovs_lib, 'OVSBridge')

    # Integration bridge: expected construction and initial flow setup.
    int_bridge = ovs_lib.OVSBridge(self.INT_BRIDGE, 'sudo')
    self.mock_int_bridge = int_bridge
    int_bridge.delete_port('patch-tun')
    int_bridge.remove_all_flows()
    int_bridge.add_flow(priority=1, actions='normal')

    # Mapped bridge: expected construction and initial flow setup.
    map_bridge = ovs_lib.OVSBridge(self.MAP_TUN_BRIDGE, 'sudo')
    self.mock_map_tun_bridge = map_bridge
    map_bridge.remove_all_flows()
    map_bridge.add_flow(priority=1, actions='normal')

    # Old veth ports are removed, the new ones added and brought up.
    int_bridge.delete_port('int-tunnel_bridge_mapping')
    map_bridge.delete_port('phy-tunnel_bridge_mapping')
    int_bridge.add_port(veth_a)
    map_bridge.add_port(veth_b)
    veth_a.link.set_up()
    veth_b.link.set_up()
    int_bridge.add_flow(priority=2, in_port=None, actions='drop')
    map_bridge.add_flow(priority=2, in_port=None, actions='drop')

    # Tunnel bridge: expected construction, reset and patch ports.
    tun_bridge = ovs_lib.OVSBridge(self.TUN_BRIDGE, 'sudo')
    self.mock_tun_bridge = tun_bridge
    tun_bridge.reset_bridge()
    patch_on_int = int_bridge.add_patch_port('patch-tun', 'patch-int')
    patch_on_int.AndReturn(self.TUN_OFPORT)
    patch_on_tun = tun_bridge.add_patch_port('patch-int', 'patch-tun')
    patch_on_tun.AndReturn(self.INT_OFPORT)
    tun_bridge.remove_all_flows()
    tun_bridge.add_flow(priority=1, actions='drop')

    # Both existence checks report that the device is present.
    recorder.StubOutWithMock(ip_lib, 'device_exists')
    mapped_exists = ip_lib.device_exists('tunnel_bridge_mapping', 'sudo')
    mapped_exists.AndReturn(True)
    veth_exists = ip_lib.device_exists('int-tunnel_bridge_mapping', 'sudo')
    veth_exists.AndReturn(True)

    recorder.StubOutWithMock(ip_lib.IpLinkCommand, 'delete')
    ip_lib.IPDevice('int-tunnel_bridge_mapping').link.delete()

    # add_veth() hands back the two mock devices created above.
    recorder.StubOutClassWithMocks(ip_lib, 'IPWrapper')
    veth_pair = ip_lib.IPWrapper('sudo').add_veth(
        'int-tunnel_bridge_mapping', 'phy-tunnel_bridge_mapping')
    veth_pair.AndReturn([veth_a, veth_b])

    # The MAC lookup for the integration bridge returns a fixed value.
    recorder.StubOutWithMock(utils, 'get_interface_mac')
    mac_lookup = utils.get_interface_mac(self.INT_BRIDGE)
    mac_lookup.AndReturn('000000000001')
开发者ID:NCU-PDCLAB,项目名称:quantum,代码行数:65,代码来源:test_ovs_tunnel.py
示例11: test_get_interface_mac
def test_get_interface_mac(self):
    """get_interface_mac formats the MAC bytes of a patched ioctl reply."""
    # Fake ioctl reply: 18 NUL bytes, the six MAC bytes, then 232 NUL bytes.
    leading_pad = "\x00" * 18
    mac_bytes = "\x01\x02\x03\x04\x05\x06"
    trailing_pad = "\x00" * 232
    expected_mac = "01:02:03:04:05:06"

    with mock.patch("fcntl.ioctl") as patched_ioctl:
        patched_ioctl.return_value = leading_pad + mac_bytes + trailing_pad
        result = utils.get_interface_mac("eth0")

    self.assertEqual(result, expected_mac)
开发者ID:mygoda,项目名称:openstack,代码行数:6,代码来源:test_agent_linux_utils.py
注:本文中的quantum.agent.linux.utils.get_interface_mac函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论