本文整理汇总了Python中pyroute2.IPRoute类的典型用法代码示例。如果您正苦于以下问题:Python IPRoute类的具体用法?Python IPRoute怎么用?Python IPRoute使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了IPRoute类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: set_dns_systemd_resolved
def set_dns_systemd_resolved(lease):
# NOTE: if systemd-resolved is not already running, we might not want to
# run it in case there's specific system configuration for other resolvers
ipr = IPRoute()
index = ipr.link_lookup(ifname=lease.interface)[0]
# Construct the argument to pass to DBUS.
# the equivalent argument for:
# busctl call org.freedesktop.resolve1 /org/freedesktop/resolve1 \
# org.freedesktop.resolve1.Manager SetLinkDNS 'ia(iay)' 2 1 2 4 1 2 3 4
# is SetLinkDNS(2, [(2, [8, 8, 8, 8])]_
iay = [(2, [int(b) for b in ns.split('.')])
for ns in lease.name_server.split()]
# if '.' in ns
# else (10, [ord(x) for x in
# socket.inet_pton(socket.AF_INET6, ns)])
bus = SystemBus()
resolved = bus.get_object('org.freedesktop.resolve1',
'/org/freedesktop/resolve1')
manager = Interface(resolved,
dbus_interface='org.freedesktop.resolve1.Manager')
try:
manager.SetLinkDNS(index, iay)
return True
except DBusException as e:
logger.error(e)
return False
开发者ID:juga0,项目名称:dhcpcanon,代码行数:26,代码来源:netutils.py
示例2: start_ue
def start_ue () :
#print 'Enter your commands below.\r\nInsert "exit" to leave the application.'
timeout=60 #timeout in seconds
send_command('AT', 'OK' , timeout)
send_command('AT+CFUN=1' , 'OK' , timeout)
#send_command('AT+CGATT=0' , 'OK' , timeout)
send_command('AT+CGATT=1','OK', 300)
#os.system('wvdial -C ' + bandrich_ppd_config + ' &' )
thread_ppp = pppThread(1, "ppp_thread", 1)
thread_ppp.start()
iface='ppp0'
while 1:
time.sleep ( 2)
#Now we check if ppp0 interface is up and running
try:
if exit_flag == 1:
break
ip = IPRoute()
idx = ip.link_lookup(ifname=iface)[0]
os.system ('route add 192.172.0.1 ppp0')
os.system ('ping -c 5 192.172.0.1')
break
except Exception, e:
error = ' Interface ' + iface + 'does not exist...'
error = error + ' In function: ' + sys._getframe().f_code.co_name + ': *** Caught exception: ' + str(e.__class__) + " : " + str( e)
error = error + traceback.format_exc()
print error
开发者ID:debashish216,项目名称:lte-testbed-news,代码行数:30,代码来源:configure_cots_bandrich_ue.py
示例3: scan_netdevs
def scan_netdevs():
scan = []
ipr = IPRoute()
try:
for part in ipr.get_links():
new_link = {}
new_link["netlink_msg"] = part
new_link["index"] = part["index"]
new_link["name"] = part.get_attr("IFLA_IFNAME")
hwaddr = part.get_attr("IFLA_ADDRESS")
if hwaddr:
new_link["hwaddr"] = normalize_hwaddr(hwaddr)
else:
new_link["hwaddr"] = None
addrs = ipr.get_addr(index=new_link["index"])
new_link["ip_addrs"] = addrs
scan.append(new_link)
except:
raise
finally:
ipr.close()
return scan
开发者ID:idosch,项目名称:lnst,代码行数:26,代码来源:NetUtils.py
示例4: create_veth_pair
def create_veth_pair(name):
ip = IPRoute()
peers = ('{}0'.format(name), '{}1'.format(name))
LOG.info('creating veth pair {}'.format(peers))
ip.link('add', kind='veth', ifname=peers[0], peer=peers[1])
link_up(peers[0])
link_up(peers[1])
开发者ID:durgesh-rane,项目名称:simple-ostinato,代码行数:7,代码来源:utils.py
示例5: set_net
def set_net(lease):
ipr = IPRoute()
try:
index = ipr.link_lookup(ifname=lease.interface)[0]
except IndexError as e:
logger.error('Interface %s not found, can not set IP.',
lease.interface)
try:
ipr.addr('add', index, address=lease.address,
mask=int(lease.subnet_mask_cidr))
except NetlinkError as e:
if ipr.get_addr(index=index)[0].\
get_attrs('IFA_ADDRESS')[0] == lease.address:
logger.debug('Interface %s is already set to IP %s' %
(lease.interface, lease.address))
else:
logger.error(e)
else:
logger.debug('Interface %s set to IP %s' %
(lease.interface, lease.address))
try:
ipr.route('add', dst='0.0.0.0', gateway=lease.router, oif=index)
except NetlinkError as e:
if ipr.get_routes(table=254)[0].\
get_attrs('RTA_GATEWAY')[0] == lease.router:
logger.debug('Default gateway is already set to %s' %
(lease.router))
else:
logger.error(e)
else:
logger.debug('Default gateway set to %s', lease.router)
ipr.close()
set_dns(lease)
开发者ID:juga0,项目名称:dhcpcanon,代码行数:33,代码来源:netutils.py
示例6: create_interface
def create_interface(self):
'''
Create veth-pair
Creates veth-pair in the host-os first and then one end of the pair is
moved inside container.
Name of interface inside container can be same for multiple containers.
So, the veth-pair cannot be created with name of interface inside
cotnainer. The veth-pair is created with a temporary name and will be
renamed later
'''
# Check if interface already created
iproute = IPRoute()
iface = iproute.link_lookup(ifname=self.host_ifname)
if len(iface) != 0:
return
# Create veth pairs. One end of pair is named host_ifname and the
# other end of pair is set a temporary name. It will be overridden
# once interface is moved inside container
try:
cn_name = self.host_ifname + '-ns'
iproute.link_create(ifname=self.host_ifname, peer=cn_name,
kind='veth')
except NetlinkError as e:
if e.code != errno.EEXIST:
raise CniError(CNI_ERROR_ADD_VETH,
'Error creating veth device ' +
self.host_ifname + ' code ' + str(e.code) +
' message ' + e.message)
# Move one end of pair inside container
self.move_link(cn_name)
return
开发者ID:Juniper,项目名称:contrail-controller,代码行数:34,代码来源:cni.py
示例7: TestProxyData
class TestProxyData(TestData):
def setup(self):
create_link('dummyX', 'dummy')
t_url = 'unix://\0%s' % (uuid.uuid4())
p_url = 'unix://\0%s' % (uuid.uuid4())
self.uplink = IPRoute()
self.uplink.serve(t_url)
self.proxy = IPRoute(host=t_url)
self.proxy.serve(p_url)
self.ip = IPRoute(host=p_url)
service = self.ip.discover(self.ip.default_target,
addr=self.proxy.default_peer)
self.ip.default_peer = self.proxy.default_peer
self.ip.default_dport = service
self.dev = self.ip.link_lookup(ifname='dummyX')
def teardown(self):
TestData.teardown(self)
self.proxy.release()
self.uplink.release()
开发者ID:hegusung,项目名称:pyroute2,代码行数:26,代码来源:test_ipr.py
示例8: rv_manager
class rv_manager(object):
def __init__(self):
self.ipr = IPRoute()
self.dataplane = BPF(src_file="core/rv_manager/rv_manager.c")
# Loading Tables from dp
self.next = self.dataplane.get_table("next_hop")
self.ifc2vi = self.dataplane.get_table("rvm_ifc2vi")
self.vi2ifc = self.dataplane.get_table("rvm_vi2ifc")
# Loading Functions from db
self.func_phy2virt = self.dataplane.load_func(
"rvm_function_p2v", BPF.SCHED_CLS)
self.func_virt2phy = self.dataplane.load_func(
"rvm_function_v2p", BPF.SCHED_CLS)
def set_next_hop(self, next_vnf):
self.next[self.next.Key(0)] = self.next.Leaf(next_vnf)
def get_fd(self):
return self.func_virt2phy.fd
def set_bpf_ingress(self, ifc_index, func):
self.ipr.tc("add", "ingress", ifc_index, "ffff:")
self.ipr.tc("add-filter", "bpf", ifc_index, ":1", fd=func.fd,
name=func.name, parent="ffff:", action="drop", classid=1)
def add_new_workload(self, phy_iface_index, virt_iface_index):
self.ifc2vi[self.ifc2vi.Key(phy_iface_index)] = self.ifc2vi.Leaf(
virt_iface_index)
self.vi2ifc[self.vi2ifc.Key(virt_iface_index)] = self.vi2ifc.Leaf(
phy_iface_index)
self.set_bpf_ingress(phy_iface_index, self.func_phy2virt)
开发者ID:mbertrone,项目名称:ebpf_turtle,代码行数:33,代码来源:helper_rvm.py
示例9: scan_netdevs
def scan_netdevs():
scan = []
ipr = IPRoute()
try:
for part in ipr.get_links():
new_link = {}
new_link["netlink_msg"] = part
new_link["index"] = part["index"]
new_link["name"] = part.get_attr("IFLA_IFNAME")
#
# FIXME:
#
# nlmsg.get_attr() returns None if there is no
# such attribute in the NLA chain; if hwaddr is None,
# normalize_hwaddr(hwaddr) will raise AttributeError(),
# since None has no upper(). The issue is that the
# AttributeError() will be a bit unrelated to the
# root cause, and since that it will be confusing.
#
hwaddr = part.get_attr("IFLA_ADDRESS")
new_link["hwaddr"] = normalize_hwaddr(hwaddr)
scan.append(new_link)
except:
raise
finally:
ipr.close()
return scan
开发者ID:aloughlam,项目名称:lnst,代码行数:28,代码来源:NetUtils.py
示例10: delete_interface
def delete_interface(self):
'''
Delete the interface.
Deletes both VLAN Tag interface and MACVlan interface
'''
# Find the VLAN interface interface from the MACVlan interface
link = self.get_link()
if link is None:
return
vlan_idx = None
for i in link[0]['attrs']:
if (i[0] == 'IFLA_LINK'):
vlan_idx = i[1]
break
if vlan_idx is None:
raise Error(CNI_ERROR_DEL_VLAN_INTF,
'Error finding vlan interface. Interface inside ' +
' container ' + self.cni.container_ifname)
# Delete the VLAN Tag interface.
# It will delete the interface inside container also
try:
iproute = IPRoute()
iproute.link('del', index=vlan_idx)
except NetlinkError as e:
raise Error(CNI_ERROR_DEL_VLAN_INTF,
'Error deleting VLAN interface. Parent interface ' +
self.host_ifname + ' vlan-tag ' + self.vlan_tag +
' vlan-ifindex ' + str(vlan_idx) +
' code ' + str(e.code) + ' message ' + e.message)
return
开发者ID:Juniper,项目名称:contrail-controller,代码行数:33,代码来源:macvlan.py
示例11: delete_veth_pair
def delete_veth_pair(name):
ip = IPRoute()
peers = ('{}0'.format(name), '{}1'.format(name))
LOG.info('deleting veth pair {}'.format(peers))
link_down(peers[0])
link_down(peers[1])
ip.link('del', index=ip.link_lookup(ifname=peers[0])[0])
开发者ID:durgesh-rane,项目名称:simple-ostinato,代码行数:7,代码来源:utils.py
示例12: test_iproute
def test_iproute(self):
ip = IPRoute()
try:
assert len(ip.get_links()) > 1
except:
raise
finally:
ip.close()
开发者ID:0xD3ADB33F,项目名称:pyroute2,代码行数:8,代码来源:test_eventlet.py
示例13: mroute
def mroute():
ip = IPRoute() # family = NETLINK_ROUTE "Receives routing and link
# updates and may be used to modify the routing tables"
ip.monitor()
while True:
for raw in ip.get():
print '{event:10} {attrs}'.format(**raw)
开发者ID:shavenwarthog,项目名称:johntellsall,代码行数:8,代码来源:umon.py
示例14: _run_remote_uplink
def _run_remote_uplink(url, connect, release,
key=None, cert=None, ca=None):
ip = IPRoute()
ip.serve(url, key=key, cert=cert, ca=ca)
ip.iothread.secret = 'bala'
connect.set()
release.wait()
ip.release()
开发者ID:chantra,项目名称:pyroute2,代码行数:8,代码来源:test_ipr.py
示例15: setup
def setup(self):
create_link('dummyX', 'dummy')
url = 'unix://\0%s' % (uuid.uuid4())
self.uplink = IPRoute()
self.uplink.serve(url)
self.ip = IPRoute(host=url)
self.dev = self.ip.link_lookup(ifname='dummyX')
开发者ID:hegusung,项目名称:pyroute2,代码行数:9,代码来源:test_ipr.py
示例16: testServer
def testServer(self):
url = 'unix://\0%s' % (uuid.uuid4())
ip = IPRoute()
ip.serve(url)
target = Process(target=_run_remote_client,
args=(url, 'get_links'))
target.start()
target.join()
ip.release()
开发者ID:chantra,项目名称:pyroute2,代码行数:9,代码来源:test_ipr.py
示例17: getInterfaceState
def getInterfaceState(ifname):
try:
ip = IPRoute()
state = ip.get_links(ip.link_lookup(ifname=ifname))[0].get_attr('IFLA_OPERSTATE')
ip.close()
except Exception as e:
raise Exception("getInterfaceState: Collecting interface status for %s failed: %s" % (ifname,str(e)))
else:
if state == "UP":
return True
return False
开发者ID:piconsole,项目名称:picon,代码行数:11,代码来源:utils.py
示例18: _test_remote
def _test_remote(self, url):
connect = Event()
release = Event()
target = Process(target=_run_remote_uplink,
args=(url, connect, release))
target.daemon = True
target.start()
connect.wait()
ip = IPRoute(host=url)
ip.release()
release.set()
开发者ID:chantra,项目名称:pyroute2,代码行数:11,代码来源:test_ipr.py
示例19: get_link
def get_link(self):
'''
Get link information for the interface inside the container
'''
link = None
with CniNamespace(self.container_netns, logger):
iproute = IPRoute()
iface = iproute.link_lookup(ifname=self.container_ifname)
if len(iface) != 0:
idx = iface[0]
link = iproute.link("get", index=idx)
return link
开发者ID:Juniper,项目名称:contrail-controller,代码行数:12,代码来源:cni.py
示例20: TestMisc
class TestMisc(object):
def setup(self):
self.ip = IPRoute()
def teardown(self):
self.ip.release()
def test_addrpool_expand(self):
# see coverage
for i in range(100):
self.ip.get_addr()
开发者ID:chantra,项目名称:pyroute2,代码行数:12,代码来源:test_ipr.py
注:本文中的pyroute2.IPRoute类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论