本文整理汇总了Python中ryu.lib.packet.icmpv6.icmpv6函数的典型用法代码示例。如果您正苦于以下问题:Python icmpv6函数的具体用法?Python icmpv6怎么用?Python icmpv6使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了icmpv6函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: control_plane_icmpv6_handler
def control_plane_icmpv6_handler(self, in_port, vlan, eth_src,
                                 ipv6_pkt, icmpv6_pkt):
    """Process an ICMPv6 packet and return the messages it gives rise to.

    Three ICMPv6 types are handled; any other type yields an empty list:

    * neighbor solicitation -- a neighbor advertisement carrying
      ``self.FAUCET_MAC`` is built and handed to ``valve_packetout``.
    * neighbor advertisement -- every entry of ``vlan.ipv6_routes`` whose
      gateway equals the advertised address is passed to
      ``add_resolved_route``.
    * echo request -- an echo reply with the same id, sequence number and
      payload is built and handed to ``valve_packetout``.

    Args:
        in_port: port passed to ``build_ethernet_pkt`` and
            ``valve_packetout``.
        vlan: object providing ``nd_cache`` and ``ipv6_routes``.
        eth_src: source Ethernet address of the received packet.
        ipv6_pkt: IPv6 header of the received packet.
        icmpv6_pkt: ICMPv6 message of the received packet.

    Returns:
        list: the collected flowmod / packet-out messages.
    """
    ofmsgs = []
    reply = self.build_ethernet_pkt(
        eth_src, in_port, vlan, ether.ETH_TYPE_IPV6)
    msg_type = icmpv6_pkt.type_

    if msg_type == icmpv6.ND_NEIGHBOR_SOLICIT:
        # Answer on behalf of the solicited target address.
        target = icmpv6_pkt.data.dst
        reply.add_protocol(ipv6.ipv6(
            src=target,
            dst=ipv6_pkt.src,
            nxt=inet.IPPROTO_ICMPV6,
            hop_limit=ipv6_pkt.hop_limit))
        advert = icmpv6.nd_neighbor(
            dst=target,
            option=icmpv6.nd_option_tla(hw_src=self.FAUCET_MAC),
            res=7)
        reply.add_protocol(icmpv6.icmpv6(
            type_=icmpv6.ND_NEIGHBOR_ADVERT, data=advert))
        reply.serialize()
        ofmsgs.append(self.valve_packetout(in_port, reply.data))

    elif msg_type == icmpv6.ND_NEIGHBOR_ADVERT:
        gw_addr = ipaddr.IPv6Address(icmpv6_pkt.data.dst)
        self.logger.info('ND response %s for %s', eth_src, gw_addr)
        # is_updated is tri-state: False when the gateway is not cached
        # yet, True when it is cached with a different MAC, None when the
        # cached MAC already matches.
        if gw_addr not in vlan.nd_cache:
            is_updated = False
        elif vlan.nd_cache[gw_addr].eth_src != eth_src:
            is_updated = True
        else:
            is_updated = None
        for ip_dst, ip_gw in vlan.ipv6_routes.iteritems():
            if ip_gw != gw_addr:
                continue
            ofmsgs.extend(self.add_resolved_route(
                ether.ETH_TYPE_IPV6, vlan, vlan.nd_cache,
                ip_gw, ip_dst, eth_src, is_updated))

    elif msg_type == icmpv6.ICMPV6_ECHO_REQUEST:
        # Reply from the address the request was sent to.
        reply.add_protocol(ipv6.ipv6(
            src=ipv6_pkt.dst,
            dst=ipv6_pkt.src,
            nxt=inet.IPPROTO_ICMPV6,
            hop_limit=ipv6_pkt.hop_limit))
        request = icmpv6_pkt.data
        echo_body = icmpv6.echo(
            id_=request.id, seq=request.seq, data=request.data)
        reply.add_protocol(icmpv6.icmpv6(
            type_=icmpv6.ICMPV6_ECHO_REPLY, data=echo_body))
        reply.serialize()
        ofmsgs.append(self.valve_packetout(in_port, reply.data))

    return ofmsgs
开发者ID:onfsdn,项目名称:firewall,代码行数:58,代码来源:valve.py
示例2: test_default_args
def test_default_args(self):
    """Verify the wire format of default-constructed ND options.

    Covers a stand-alone source link-layer address option, a neighbor
    advertisement carrying a default target link-layer address option,
    and a router solicitation carrying a default source link-layer
    address option.
    """
    zero_mac = addrconv.mac.text_to_bin('00:00:00:00:00:00')

    # Stand-alone source link-layer address option.
    sla = icmpv6.nd_option_sla()
    raw = sla.serialize()
    fields = struct.unpack(icmpv6.nd_option_sla._PACK_STR, str(raw))
    eq_(fields[0], icmpv6.ND_OPTION_SLA)
    eq_(fields[1], len(icmpv6.nd_option_sla()) / 8)
    eq_(fields[2], zero_mac)

    # Neighbor advertisement with a default TLA option.
    ip_hdr = ipv6(nxt=inet.IPPROTO_ICMPV6)
    msg = icmpv6.icmpv6(
        type_=icmpv6.ND_NEIGHBOR_ADVERT,
        data=icmpv6.nd_neighbor(option=icmpv6.nd_option_tla()))
    ip_hdr.serialize(msg, None)
    raw = msg.serialize(bytearray(), ip_hdr)

    fields = struct.unpack(icmpv6.icmpv6._PACK_STR, str(raw[:4]))
    eq_(fields[0], icmpv6.ND_NEIGHBOR_ADVERT)
    eq_(fields[1], 0)
    eq_(fields[2], icmpv6_csum(ip_hdr, raw))

    fields = struct.unpack(icmpv6.nd_neighbor._PACK_STR, str(raw[4:24]))
    eq_(fields[0], 0)
    eq_(fields[1], addrconv.ipv6.text_to_bin('::'))

    fields = struct.unpack(icmpv6.nd_option_tla._PACK_STR, str(raw[24:]))
    eq_(fields[0], icmpv6.ND_OPTION_TLA)
    eq_(fields[1], len(icmpv6.nd_option_tla()) / 8)
    eq_(fields[2], zero_mac)

    # Router solicitation with a default SLA option.
    ip_hdr = ipv6(nxt=inet.IPPROTO_ICMPV6)
    msg = icmpv6.icmpv6(
        type_=icmpv6.ND_ROUTER_SOLICIT,
        data=icmpv6.nd_router_solicit(option=icmpv6.nd_option_sla()))
    ip_hdr.serialize(msg, None)
    raw = msg.serialize(bytearray(), ip_hdr)

    fields = struct.unpack(icmpv6.icmpv6._PACK_STR, str(raw[:4]))
    eq_(fields[0], icmpv6.ND_ROUTER_SOLICIT)
    eq_(fields[1], 0)
    eq_(fields[2], icmpv6_csum(ip_hdr, raw))

    fields = struct.unpack(
        icmpv6.nd_router_solicit._PACK_STR, str(raw[4:8]))
    eq_(fields[0], 0)

    fields = struct.unpack(icmpv6.nd_option_sla._PACK_STR, str(raw[8:]))
    eq_(fields[0], icmpv6.ND_OPTION_SLA)
    eq_(fields[1], len(icmpv6.nd_option_sla()) / 8)
    eq_(fields[2], zero_mac)
开发者ID:09beeihaq,项目名称:Coursera-SDN-Assignments,代码行数:57,代码来源:test_icmpv6.py
示例3: _handle_icmp
def _handle_icmp(self, datapath, port, pkt_ethernet, pkt_ipv, pkt_icmp):
    """Build an ICMPv6 echo reply from the given headers and send it.

    The reply is sent for every call; there is no check on the type of
    ``pkt_icmp``.

    NOTE(review): the Ethernet and IPv6 source/destination addresses are
    copied from the received headers without being swapped -- confirm that
    this is intended.

    Args:
        datapath: passed through to ``self._send_packet``.
        port: passed through to ``self._send_packet``.
        pkt_ethernet: received Ethernet header (ethertype, dst, src used).
        pkt_ipv: received IPv6 header (dst, src used).
        pkt_icmp: received ICMP message whose ``data`` is echoed back.
    """
    eth_hdr = ethernet.ethernet(
        ethertype=pkt_ethernet.ethertype,
        dst=pkt_ethernet.dst,
        src=pkt_ethernet.src)
    # An earlier IPv4/ICMP variant of this reply was replaced by the
    # IPv6/ICMPv6 headers below.
    ip_hdr = ipv6.ipv6(dst=pkt_ipv.dst, src=pkt_ipv.src)
    echo_reply = icmpv6.icmpv6(
        type_=icmpv6.ICMPV6_ECHO_REPLY,
        csum=0,
        data=pkt_icmp.data)

    reply = packet.Packet()
    for header in (eth_hdr, ip_hdr, echo_reply):
        reply.add_protocol(header)
    self._send_packet(datapath, port, reply)
开发者ID:takeuchi17401,项目名称:performance_test,代码行数:28,代码来源:sendMLD.py
示例4: nd_advert
def nd_advert(vid, eth_src, eth_dst, src_ip, dst_ip):
    """Return a serialized IPv6 neighbor advertisement packet.

    The advertised target address is ``src_ip`` and the target link-layer
    address option carries ``eth_src``.

    Args:
        vid (int or None): VLAN VID to use (or None).
        eth_src (str): source Ethernet MAC address.
        eth_dst (str): destination Ethernet MAC address.
        src_ip (ipaddress.IPv6Address): source IPv6 address.
        dst_ip (ipaddress.IPv6Address): destination IPv6 address.
    Returns:
        ryu.lib.packet.ethernet: Serialized IPv6 neighbor discovery packet.
    """
    advert_pkt = build_pkt_header(
        vid, eth_src, eth_dst, valve_of.ether.ETH_TYPE_IPV6)
    advert_pkt.add_protocol(ipv6.ipv6(
        src=src_ip,
        dst=dst_ip,
        nxt=valve_of.inet.IPPROTO_ICMPV6,
        hop_limit=IPV6_MAX_HOP_LIM))
    target_lla = icmpv6.nd_option_tla(hw_src=eth_src)
    neighbor = icmpv6.nd_neighbor(dst=src_ip, option=target_lla, res=7)
    advert_pkt.add_protocol(icmpv6.icmpv6(
        type_=icmpv6.ND_NEIGHBOR_ADVERT, data=neighbor))
    advert_pkt.serialize()
    return advert_pkt
开发者ID:trungdtbk,项目名称:faucet,代码行数:28,代码来源:valve_packet.py
示例5: __init__
def __init__(self, mac_src, mac_dst, ipv6_src, ipv6_dst, ipv6_tgt):
    """Build a Neighbor Solicitation packet and store it in ``self.pkt``.

    The packet is assembled as Ethernet / IPv6 / ICMPv6; it is not
    serialized here.

    Args:
        mac_src (str): source MAC address, also used in the source
            link-layer address option.
        mac_dst (str): destination MAC address.
        ipv6_src (str): source IPv6 address.
        ipv6_dst (str): destination IPv6 address.
        ipv6_tgt (str): IPv6 address being solicited.

    Attributes:
        pkt: the generated Neighbor Solicitation packet.
    """
    self.pkt = packet.Packet()

    eth_hdr = ethernet.ethernet(mac_dst, mac_src, ether.ETH_TYPE_IPV6)
    ip_hdr = ipv6.ipv6(
        src=ipv6_src, dst=ipv6_dst, nxt=inet.IPPROTO_ICMPV6)
    source_lla = icmpv6.nd_option_sla(hw_src=mac_src)
    solicit = icmpv6.icmpv6(
        type_=icmpv6.ND_NEIGHBOR_SOLICIT,
        data=icmpv6.nd_neighbor(dst=ipv6_tgt, option=source_lla))

    for header in (eth_hdr, ip_hdr, solicit):
        self.pkt.add_protocol(header)
开发者ID:wuyouke,项目名称:openflow-dmm,代码行数:29,代码来源:ndisc.py
示例6: test_to_string
def test_to_string(self):
    """Check str()/repr() of an ICMPv6 router solicitation with an option.

    The expected text is built inside-out: option, then router
    solicitation, then the enclosing ICMPv6 message. Field order follows
    ``inspect.getmembers``.
    """
    def render(obj, cls, field_text):
        # "Name(k=v,...)" limited to the members listed in field_text.
        joined = ','.join('%s=%s' % (name, field_text[name])
                          for name, _ in inspect.getmembers(obj)
                          if name in field_text)
        return '%s(%s)' % (cls.__name__, joined)

    nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
    rs = icmpv6.nd_router_solicit(self.res, nd_opt)
    ic = icmpv6.icmpv6(self.type_, self.code, self.csum, rs)

    nd_opt_str = render(nd_opt, icmpv6.nd_option_sla, {
        'length': repr(self.nd_length),
        'hw_src': repr(self.nd_hw_src),
        'data': repr(None),
    })
    rs_str = render(rs, icmpv6.nd_router_solicit, {
        'res': repr(rs.res),
        'option': nd_opt_str,
    })
    ic_str = render(ic, icmpv6.icmpv6, {
        'type_': repr(self.type_),
        'code': repr(self.code),
        'csum': repr(self.csum),
        'data': rs_str,
    })

    eq_(str(ic), ic_str)
    eq_(repr(ic), ic_str)
开发者ID:09beeihaq,项目名称:Coursera-SDN-Assignments,代码行数:31,代码来源:test_icmpv6.py
示例7: nd_reply
def nd_reply(eth_src, eth_dst, vid, src_ip, dst_ip, hop_limit):
    """Return a serialized IPv6 neighbor discovery reply packet.

    The reply is a neighbor advertisement whose target address is
    ``src_ip`` and whose target link-layer address option is ``eth_src``.

    Args:
        eth_src (str): source Ethernet MAC address.
        eth_dst (str): destination Ethernet MAC address.
        vid (int or None): VLAN VID to use (or None).
        src_ip (ipaddr.IPv6Address): source IPv6 address.
        dst_ip (ipaddr.IPv6Address): destination IPv6 address.
        hop_limit (int): IPv6 hop limit.
    Returns:
        ryu.lib.packet.ethernet: Serialized IPv6 neighbor discovery packet.
    """
    reply_pkt = build_pkt_header(
        eth_src, eth_dst, vid, ether.ETH_TYPE_IPV6)
    reply_pkt.add_protocol(ipv6.ipv6(
        src=src_ip,
        dst=dst_ip,
        nxt=inet.IPPROTO_ICMPV6,
        hop_limit=hop_limit))
    target_lla = icmpv6.nd_option_tla(hw_src=eth_src)
    neighbor = icmpv6.nd_neighbor(dst=src_ip, option=target_lla, res=7)
    reply_pkt.add_protocol(icmpv6.icmpv6(
        type_=icmpv6.ND_NEIGHBOR_ADVERT, data=neighbor))
    reply_pkt.serialize()
    return reply_pkt
开发者ID:Baloc,项目名称:faucet,代码行数:29,代码来源:valve_packet.py
示例8: nd_request
def nd_request(eth_src, vid, src_ip, dst_ip):
    """Return a serialized IPv6 neighbor discovery request packet.

    The Ethernet destination comes from ``ipv6_link_eth_mcast(dst_ip)``
    and the IPv6 destination from
    ``ipv6_solicited_node_from_ucast(dst_ip)``.

    Args:
        eth_src (str): source Ethernet MAC address.
        vid (int or None): VLAN VID to use (or None).
        src_ip (ipaddr.IPv6Address): source IPv6 address.
        dst_ip (ipaddr.IPv6Address): requested IPv6 address.
    Returns:
        ryu.lib.packet.ethernet: Serialized IPv6 neighbor discovery packet.
    """
    mcast_mac = ipv6_link_eth_mcast(dst_ip)
    solicited_node_ip = ipv6_solicited_node_from_ucast(dst_ip)

    request_pkt = build_pkt_header(
        eth_src, mcast_mac, vid, ether.ETH_TYPE_IPV6)
    request_pkt.add_protocol(ipv6.ipv6(
        src=str(src_ip),
        dst=solicited_node_ip,
        nxt=inet.IPPROTO_ICMPV6))
    source_lla = icmpv6.nd_option_sla(hw_src=eth_src)
    request_pkt.add_protocol(icmpv6.icmpv6(
        type_=icmpv6.ND_NEIGHBOR_SOLICIT,
        data=icmpv6.nd_neighbor(dst=dst_ip, option=source_lla)))
    request_pkt.serialize()
    return request_pkt
开发者ID:Baloc,项目名称:faucet,代码行数:25,代码来源:valve_packet.py
示例9: test_to_string
def test_to_string(self):
    """Check str()/repr() of an ICMPv6 neighbor message with an option.

    The expected text is built inside-out: link-layer address option,
    then neighbor message, then the enclosing ICMPv6 message. Field order
    follows ``inspect.getmembers``.
    """
    def render(obj, cls, field_text):
        # "Name(k=v,...)" limited to the members listed in field_text.
        joined = ','.join('%s=%s' % (name, field_text[name])
                          for name, _ in inspect.getmembers(obj)
                          if name in field_text)
        return '%s(%s)' % (cls.__name__, joined)

    nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
    nd = icmpv6.nd_neighbor(
        self.res, self.dst, self.nd_type, self.nd_length, nd_opt)
    ic = icmpv6.icmpv6(self.type_, self.code, self.csum, nd)

    nd_opt_str = render(nd_opt, icmpv6.nd_option_la, {
        'hw_src': repr(self.nd_hw_src),
        'data': repr(None),
    })
    nd_str = render(nd, icmpv6.nd_neighbor, {
        'res': repr(nd.res),
        'dst': repr(self.dst),
        'type_': repr(self.nd_type),
        'length': repr(self.nd_length),
        'data': nd_opt_str,
    })
    ic_str = render(ic, icmpv6.icmpv6, {
        'type_': repr(self.type_),
        'code': repr(self.code),
        'csum': repr(self.csum),
        'data': nd_str,
    })

    eq_(str(ic), ic_str)
    eq_(repr(ic), ic_str)
开发者ID:leehomjan,项目名称:ryu,代码行数:34,代码来源:test_icmpv6.py
示例10: icmpv6_echo_reply
def icmpv6_echo_reply(eth_src, eth_dst, vid, src_ip, dst_ip, hop_limit,
                      id_, seq, data):
    """Return a serialized IPv6 ICMP echo reply packet.

    Args:
        eth_src (str): source Ethernet MAC address.
        eth_dst (str): destination Ethernet MAC address.
        vid (int or None): VLAN VID to use (or None).
        src_ip (ipaddr.IPv6Address): source IPv6 address.
        dst_ip (ipaddr.IPv6Address): destination IPv6 address.
        hop_limit (int): IPv6 hop limit.
        id_ (int): identifier for echo reply.
        seq (int): sequence number for echo reply.
        data (str): payload for echo reply.
    Returns:
        ryu.lib.packet.ethernet: Serialized IPv6 ICMP echo reply packet.
    """
    reply_pkt = build_pkt_header(
        eth_src, eth_dst, vid, ether.ETH_TYPE_IPV6)
    reply_pkt.add_protocol(ipv6.ipv6(
        src=src_ip,
        dst=dst_ip,
        nxt=inet.IPPROTO_ICMPV6,
        hop_limit=hop_limit))
    echo_body = icmpv6.echo(id_=id_, seq=seq, data=data)
    reply_pkt.add_protocol(icmpv6.icmpv6(
        type_=icmpv6.ICMPV6_ECHO_REPLY, data=echo_body))
    reply_pkt.serialize()
    return reply_pkt
开发者ID:Baloc,项目名称:faucet,代码行数:31,代码来源:valve_packet.py
示例11: createPacket
def createPacket(self, src, dst, srcip, dstip):
    """Return a serialized MLDv2 query packet.

    Layering is Ethernet / VLAN (vid 100) / IPv6 / ICMPv6, with an MLDv2
    query for address '::' as the ICMPv6 payload.

    Args:
        src: source Ethernet address.
        dst: destination Ethernet address.
        srcip: source IPv6 address.
        dstip: destination IPv6 address.

    Returns:
        The serialized ``packet.Packet``.
    """
    eth_hdr = ethernet.ethernet(
        ethertype=ether.ETH_TYPE_8021Q, dst=dst, src=src)
    vlan_hdr = vlan.vlan(
        pcp=0, cfi=0, vid=100, ethertype=ether.ETH_TYPE_IPV6)
    ip_hdr = ipv6.ipv6(src=srcip, dst=dstip, nxt=inet.IPPROTO_ICMPV6)
    # An MLDv2 listener report with two group records was previously
    # sketched here as an alternative payload; only the query is sent.
    query = icmpv6.icmpv6(
        type_=icmpv6.ICMPV6_MEMBERSHIP_QUERY,
        data=icmpv6.mldv2_query(address='::'))

    sendpkt = packet.Packet()
    for header in (eth_hdr, vlan_hdr, ip_hdr, query):
        sendpkt.add_protocol(header)
    sendpkt.serialize()
    return sendpkt
开发者ID:t-koshiba,项目名称:trial,代码行数:25,代码来源:send_packet_to_asyncoresrv.py
示例12: sendDIO
def sendDIO(self, out_port=[], dst_ip=[], dst_mac=[], dodag_shutdown=False):
    """Send a DIO message.

    The message carries one DODAG Configuration option followed by one
    Prefix Information option per entry in ``self.advertised_prefixes``.
    When ``out_port``, ``dst_ip`` and ``dst_mac`` are all non-empty the
    message is sent via ``_send_rpl_pkt``; otherwise it is broadcast via
    ``_broadcast_rpl_pkt``. Afterwards the DAO timer is armed unless this
    node is the DODAG root or ``dodag_shutdown`` is set.

    Called by: DIS message handler (if the DIS is unicast), poison, and
    setDIOtime.

    Args:
        out_port: port to send on; must support ``len()``.
        dst_ip: destination IPv6 address; must support ``len()``.
        dst_mac: destination MAC address; must support ``len()``.
        dodag_shutdown (bool): when true, the DAO timer is not armed.
    """
    # NOTE: the list defaults are kept for interface compatibility; they
    # are only read here, never mutated.

    # DODAG Configuration option. length=0 is a placeholder: per the
    # original author's note, the RPL serialize method works out the
    # real length.
    dodag_conf = icmpv6_rpl.rpl_option_dodag_conf(
        length=0,
        flags=0, a=self.authenticated, pcs=self.PCS,
        diointdouble=self.DIOIntDoublings, diointmin=self.DIOIntMin,
        dioredun=self.DIORedundancyConst,
        maxrankinc=self.MaxRankIncrease,
        minhoprankinc=self.MinHopRankIncrease,
        ocp=self.OCP, reserved=0,
        def_lifetime=self.DftLft, lifetime_unit=self.LftUnit)
    rploption = [dodag_conf]

    # One Prefix Information option for every advertised prefix.
    for prefix in self.advertised_prefixes:
        rploption.append(icmpv6_rpl.rpl_option_pi(
            length=0, pre_length=64,
            l=0, a=1, r=0,
            reserved1=0,
            valid_lifetime=RPL_PREFIX_TIME_INF,
            pref_lifetime=RPL_PREFIX_TIME_INF,
            reserved2=0, prefix=prefix))

    # RPL DIO message wrapped in an RPL control message.
    diomsg = icmpv6_rpl.rpl_dio(
        rplinstanceid=self.instanceID,
        version=self.version.get_val(),
        rank=self.rank, g=self.G, o=0, mop=self.MOP,
        prf=self.Prf, dtsn=self.DTSN.get_val(),
        flags=0, reserved=0,
        dodagid=self.dodagID, option=rploption)
    rplmsg = icmpv6_rpl.rpl_control(code=icmpv6_rpl.RPL_DIO, rplctrl=diomsg)

    # ICMPv6 envelope.
    rpl = icmpv6.icmpv6(type_=icmpv6_rpl.ICMPv6_RPL,
                        code=icmpv6_rpl.RPL_DIO,
                        csum=0, data=rplmsg)

    if len(out_port) and len(dst_ip) and len(dst_mac):
        self._send_rpl_pkt(out_port, dst_ip, dst_mac, rpl)
    else:
        self._broadcast_rpl_pkt(rpl)

    # DAO messages are sent after a short interval when DIO messages are
    # sent.
    if not self.is_dodagRoot and not dodag_shutdown:
        self.setDAOtimer()
    # The former trailing `del DIO_message` was removed: that name is never
    # bound in this function, so the statement raised UnboundLocalError on
    # every call.
开发者ID:abdelwas,项目名称:Ryu-RPL,代码行数:60,代码来源:dodag.py
示例13: _send_icmp_NS
def _send_icmp_NS(self, datapath, outport_no, dst_ip):
    """Send an ICMPv6 neighbor solicitation for ``dst_ip``.

    Source MAC and IPv6 addresses are taken from the switch port
    ``outport_no`` of ``datapath``; the destination MAC and multicast
    IPv6 address come from ``self._generate_dst_for_NS(dst_ip)``. The
    packet is emitted as a packet-out on ``outport_no``.

    Args:
        datapath: datapath whose ``id`` selects the switch and which
            sends the packet-out.
        outport_no: number of the port to send from.
        dst_ip: IPv6 address being solicited.
    """
    port = self.dpid_to_switch[datapath.id].ports[outport_no]
    src_mac_addr = str(port.hw_addr)
    src_ip = str(port.gateway.gw_ipv6)

    dst_mac, dst_ip_multicast = self._generate_dst_for_NS(dst_ip)
    dst_mac = str(dst_mac)
    dst_ip_multicast = str(dst_ip_multicast)
    dst_ip = str(dst_ip)

    eth_hdr = ethernet.ethernet(
        dst=dst_mac, src=src_mac_addr, ethertype=ether.ETH_TYPE_IPV6)
    # payload_length 32 = 4-byte ICMP header + 4-byte reserved field
    # + 16-byte target address + 8-byte source link-layer address option.
    # Next-header value 58 is ICMPv6.
    ip_hdr = ipv6.ipv6(
        version=6, traffic_class=0, flow_label=0,
        payload_length=32, nxt=58, hop_limit=255,
        src=src_ip, dst=dst_ip_multicast)
    # Source link-layer address option.
    source_lla = icmpv6.nd_option_sla(hw_src=src_mac_addr)
    # `res` is named "reserved" but, per the original author's note, it
    # actually acts as a flag field.
    solicit = icmpv6.nd_neighbor(res=4, dst=dst_ip, data=source_lla)
    # csum=0: per the original author's note, ryu then computes the
    # checksum.
    icmp_msg = icmpv6.icmpv6(
        type_=icmpv6.ND_NEIGHBOR_SOLICIT, code=0, csum=0, data=solicit)

    p = packet.Packet()
    for header in (eth_hdr, ip_hdr, icmp_msg):
        p.add_protocol(header)
    p.serialize()

    output = datapath.ofproto_parser.OFPActionOutput(outport_no)
    datapath.send_packet_out(
        in_port=ofproto_v1_0.OFPP_NONE,
        actions=[output],
        data=p.data)
开发者ID:cannium,项目名称:openflow_routing_framework,代码行数:33,代码来源:routing.py
示例14: createPacket
def createPacket(self, src, dst, srcip, dstip):
    """Return a serialized MLDv2 query packet for group 'ff38::1'.

    Layering is Ethernet / VLAN (vid 100) / IPv6 / ICMPv6.

    Args:
        src: source Ethernet address.
        dst: destination Ethernet address.
        srcip: source IPv6 address.
        dstip: destination IPv6 address.

    Returns:
        The serialized ``packet.Packet``.
    """
    headers = (
        ethernet.ethernet(
            ethertype=ether.ETH_TYPE_8021Q, dst=dst, src=src),
        vlan.vlan(pcp=0, cfi=0, vid=100, ethertype=ether.ETH_TYPE_IPV6),
        ipv6.ipv6(src=srcip, dst=dstip, nxt=inet.IPPROTO_ICMPV6),
        icmpv6.icmpv6(
            type_=icmpv6.ICMPV6_MEMBERSHIP_QUERY,
            data=icmpv6.mldv2_query(address='ff38::1')),
    )
    sendpkt = packet.Packet()
    for header in headers:
        sendpkt.add_protocol(header)
    sendpkt.serialize()
    return sendpkt
开发者ID:takeuchi17401,项目名称:performance_test,代码行数:10,代码来源:send_mldv2_persec_ipv4.py
示例15: test_send_packet_to_ryu
def test_send_packet_to_ryu(self):
    """Build and serialize a default Ethernet / IPv6 / ICMPv6 packet.

    Only packet construction and serialization are exercised; nothing is
    sent and nothing is asserted yet.
    """
    logger.debug("test_send_packet_to_ryu")
    # Disabled ZeroMQ send, kept for reference:
    #   self.send_sock.send(cPickle.dumps(ryu_packet, protocol=0))
    eth = ethernet.ethernet()
    ip6 = ipv6.ipv6()
    icmp6 = icmpv6.icmpv6()
    pkt = eth / ip6 / icmp6
    pkt.serialize()
    # TODO: mock send_sock.send()
    # The dangling triple-quote that followed this TODO was removed; it
    # opened a string literal that was never closed.
开发者ID:t-koshiba,项目名称:trial,代码行数:15,代码来源:test_mld_process.py
示例16: test_send_packet_to_sw
def test_send_packet_to_sw(self):
    """Check that send_packet_to_sw hands the packet to sendrecv.sendp.

    A default Ethernet / IPv6 / ICMPv6 packet is serialized, and
    ``sendrecv.sendp`` is stubbed so the expected call can be recorded
    and verified.
    """
    pkt = ethernet.ethernet() / ipv6.ipv6() / icmpv6.icmpv6()
    pkt.serialize()

    # Mock sendrecv.sendp() and record the expected call.
    expected = scapy_packet.Packet(pkt.data)
    self.mocker.StubOutWithMock(sendrecv, "sendp")
    sendrecv.sendp(expected).AndReturn(0)
    self.mocker.ReplayAll()

    self.mld_proc.send_packet_to_sw(pkt)

    self.mocker.UnsetStubs()
    self.mocker.VerifyAll()
开发者ID:t-koshiba,项目名称:trial,代码行数:16,代码来源:test_mld_process.py
示例17: _test_serialize
def _test_serialize(self, nd_data=None):
    """Serialize an ICMPv6 message and compare it with the expectation.

    The expected checksum is computed over ``self.buf`` plus the payload
    using a fixed IPv6 pseudo-header.

    Args:
        nd_data: optional payload; a falsy value is treated as "".
    """
    nd_data = str(nd_data or "")
    expected_bytes = self.buf + nd_data

    src_ipv6 = netaddr.IPAddress("fe80::102d:a5ff:fe6d:bc0f").packed
    dst_ipv6 = netaddr.IPAddress("ff02::2").packed
    prev = ipv6(6, 0, 0, len(expected_bytes), 58, 255, src_ipv6, dst_ipv6)
    nd_csum = icmpv6_csum(prev, expected_bytes)

    icmp = icmpv6.icmpv6(self.type_, self.code, 0, nd_data)
    wire = buffer(icmp.serialize(bytearray(), prev))
    (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, wire, 0)
    payload = wire[icmp._MIN_LEN:]

    eq_(type_, self.type_)
    eq_(code, self.code)
    eq_(csum, nd_csum)
    eq_(payload, nd_data)
开发者ID:routeflow,项目名称:ryu,代码行数:17,代码来源:test_icmpv6.py
示例18: test_serialize_without_data
def test_serialize_without_data(self):
    """Serialize a router solicitation without options and check it.

    Verifies the ICMPv6 header fields, the checksum against
    ``icmpv6_csum`` over ``self.buf``, the ``res`` field, and that no
    bytes follow the fixed-size part.
    """
    solicit = icmpv6.nd_router_solicit(self.res)
    prev = ipv6(6, 0, 0, 8, 64, 255, self.src_ipv6, self.dst_ipv6)
    rs_csum = icmpv6_csum(prev, self.buf)

    icmp = icmpv6.icmpv6(self.type_, self.code, 0, solicit)
    wire = buffer(icmp.serialize(bytearray(), prev))

    (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, wire, 0)
    res = struct.unpack_from(solicit._PACK_STR, wire, icmp._MIN_LEN)
    fixed_len = icmp._MIN_LEN + solicit._MIN_LEN
    trailing = wire[fixed_len:]

    eq_(type_, self.type_)
    eq_(code, self.code)
    eq_(csum, rs_csum)
    eq_(res[0], self.res)
    eq_(trailing, '')
开发者ID:09beeihaq,项目名称:Coursera-SDN-Assignments,代码行数:17,代码来源:test_icmpv6.py
示例19: test_serialize_without_data
def test_serialize_without_data(self):
    """Serialize a neighbor message without options and check it.

    Verifies the ICMPv6 header fields, the checksum against
    ``icmpv6_csum`` over ``self.buf``, the top three bits of the first
    word (compared with ``self.res``), the target address, and that no
    bytes follow the fixed-size part.
    """
    neighbor = icmpv6.nd_neighbor(self.res, self.dst)
    prev = ipv6(6, 0, 0, 24, 64, 255, self.src_ipv6, self.dst_ipv6)
    nd_csum = icmpv6_csum(prev, self.buf)

    icmp = icmpv6.icmpv6(self.type_, self.code, 0, neighbor)
    wire = buffer(icmp.serialize(bytearray(), prev))

    (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, wire, 0)
    (res, dst) = struct.unpack_from(
        neighbor._PACK_STR, wire, icmp._MIN_LEN)
    fixed_len = icmp._MIN_LEN + neighbor._MIN_LEN
    trailing = wire[fixed_len:]

    eq_(type_, self.type_)
    eq_(code, self.code)
    eq_(csum, nd_csum)
    eq_(res >> 29, self.res)
    eq_(dst, self.dst)
    eq_(trailing, "")
开发者ID:routeflow,项目名称:ryu,代码行数:18,代码来源:test_icmpv6.py
示例20: _handle_icmp
def _handle_icmp(self, datapath, port, pkt_ethernet, pkt_ipv6, pkt_icmpv6):
    """Answer an ICMPv6 echo request with an echo reply.

    Messages of any other ICMPv6 type are ignored. Progress is traced
    with two ``print`` calls.

    NOTE(review): the Ethernet and IPv6 source/destination addresses are
    copied from the request without being swapped -- confirm that this is
    intended.

    Args:
        datapath: passed through to ``self._send_packet``.
        port: passed through to ``self._send_packet``.
        pkt_ethernet: received Ethernet header (ethertype, dst, src used).
        pkt_ipv6: received IPv6 header (dst, src used).
        pkt_icmpv6: received ICMPv6 message whose ``data`` is echoed back.
    """
    print('_handle_icmp STR')
    is_echo_request = pkt_icmpv6.type_ == icmpv6.ICMPV6_ECHO_REQUEST
    if not is_echo_request:
        return

    eth_hdr = ethernet.ethernet(
        ethertype=pkt_ethernet.ethertype,
        dst=pkt_ethernet.dst,
        src=pkt_ethernet.src)
    ip_hdr = ipv6.ipv6(dst=pkt_ipv6.dst, src=pkt_ipv6.src)
    echo_reply = icmpv6.icmpv6(
        type_=icmpv6.ICMPV6_ECHO_REPLY,
        csum=0,
        data=pkt_icmpv6.data)

    reply = packet.Packet()
    for header in (eth_hdr, ip_hdr, echo_reply):
        reply.add_protocol(header)
    print('_handle_icmp CALL')
    self._send_packet(datapath, port, reply)
开发者ID:takeuchi17401,项目名称:performance_test,代码行数:18,代码来源:sendMLD_old.py
注:本文中的ryu.lib.packet.icmpv6.icmpv6函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论