本文整理汇总了Python中ryu.lib.packet.ipv4.ipv4函数的典型用法代码示例。如果您正苦于以下问题:Python ipv4函数的具体用法?Python ipv4怎么用?Python ipv4使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了ipv4函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: build_pkt
def build_pkt(pkt):
    """Build an unserialized ryu ``packet.Packet`` from a dict description.

    Keys read from ``pkt``:

    * ``eth_src`` / ``eth_dst``: always required.
    * ``arp_target_ip``: if present, an ARP layer is built (ethertype 0x806).
    * ``ipv6_src``: otherwise, if present, an IPv6 layer is built
      (ethertype 0x86DD).  ``ipv6_dst`` is used as the destination when
      given; when it is absent the source address is reused, which is what
      this helper did before.
    * ``ipv4_src`` + ``ipv4_dst``: otherwise an IPv4 layer is built from
      them, or a default ``ipv4.ipv4()`` when ``ipv4_src`` is absent
      (ethertype 0x800).
    * ``vid``: if present, a VLAN layer carrying the inner ethertype is
      added and the Ethernet ethertype becomes 0x8100.

    Returns the ``packet.Packet`` with the layers added; ``serialize()`` is
    not called here.
    """
    layers = []
    if 'arp_target_ip' in pkt:
        ethertype = 0x806
        layers.append(arp.arp(dst_ip=pkt['arp_target_ip']))
    elif 'ipv6_src' in pkt:
        ethertype = 0x86DD
        # Previously dst was always a copy of ipv6_src; honour ipv6_dst when
        # the caller supplies it and keep the old value as the fallback.
        ipv6_dst = pkt.get('ipv6_dst', pkt['ipv6_src'])
        layers.append(ipv6.ipv6(src=pkt['ipv6_src'], dst=ipv6_dst))
    else:
        ethertype = 0x800
        if 'ipv4_src' in pkt:
            net = ipv4.ipv4(src=pkt['ipv4_src'], dst=pkt['ipv4_dst'])
        else:
            net = ipv4.ipv4()
        layers.append(net)

    if 'vid' in pkt:
        tpid = 0x8100
        layers.append(vlan.vlan(vid=pkt['vid'], ethertype=ethertype))
    else:
        tpid = ethertype

    eth = ethernet.ethernet(
        dst=pkt['eth_dst'],
        src=pkt['eth_src'],
        ethertype=tpid)
    layers.append(eth)

    # Layers are added in the order collected above: network layer first,
    # then the optional VLAN tag, then Ethernet.
    # NOTE(review): ryu's Packet.serialize() appears to expect the outermost
    # header first -- confirm before serializing the returned packet.
    result = packet.Packet()
    for layer in layers:
        result.add_protocol(layer)
    return result
开发者ID:Baloc,项目名称:faucet,代码行数:29,代码来源:test_valve.py
示例2: test_default_args
def test_default_args(self):
    """Check the serialized header of IGMPv3 reports built with defaults.

    Two reports are exercised: one created with no arguments and one
    created from four group records without an explicit record count.
    """

    def check_header(report, expected_record_num):
        # Serialize under an IPv4 header whose proto is IGMP.
        prev = ipv4(proto=inet.IPPROTO_IGMP)
        prev.serialize(report, None)
        raw = report.serialize(bytearray(), prev)
        fields = unpack_from(igmpv3_report._PACK_STR, six.binary_type(raw))

        # Zero the 16-bit field at offset 2 so the checksum can be
        # recomputed over the buffer and compared with the packed value.
        zeroed = bytearray(raw)
        pack_into("!H", zeroed, 2, 0)

        eq_(fields[0], IGMP_TYPE_REPORT_V3)
        eq_(fields[1], checksum(zeroed))
        eq_(fields[2], expected_record_num)

    check_header(igmpv3_report(), 0)

    # records without record_num
    sources = ["172.16.10.10", "172.16.10.27"]
    records = [
        igmpv3_report_group(MODE_IS_INCLUDE, 0, 0, "225.0.0.1"),
        igmpv3_report_group(MODE_IS_INCLUDE, 0, 2, "225.0.0.2",
                            list(sources)),
        igmpv3_report_group(MODE_IS_INCLUDE, 1, 0, "225.0.0.3", [],
                            b"abc\x00"),
        igmpv3_report_group(MODE_IS_INCLUDE, 1, 2, "225.0.0.4",
                            list(sources), b"abc\x00"),
    ]
    check_header(igmpv3_report(records=records), len(records))
开发者ID:John-Lin,项目名称:ryu,代码行数:30,代码来源:test_igmp.py
示例3: _do_leave
def _do_leave(self, leave, in_port, msg):
    """Handle a LEAVE message received by the snooper.

    Records the LEAVE for ``leave.address`` on the receiving port, sends a
    query for that address back out of ``in_port`` and spawns
    ``_do_timeout_for_leave`` to wait for REPORT messages.  Only logs and
    returns when no querier is known for the datapath.
    """
    datapath = msg.datapath
    switch_id = datapath.id
    ofproto = datapath.ofproto
    parser = datapath.ofproto_parser

    # Nothing can be done until the querier port has been specified.
    querier = self._to_querier.get(switch_id)
    if not querier:
        self.logger.info("no querier exists.")
        return

    # Store this LEAVE message and reset the state of the port it came from.
    group_state = self._to_hosts.setdefault(switch_id, {}).setdefault(
        leave.address,
        {'replied': False, 'leave': None, 'ports': {}})
    group_state['leave'] = msg
    group_state['ports'][in_port] = {'out': False, 'in': False}

    # Build the query for the address being left.
    timeout = igmp.LAST_MEMBER_QUERY_INTERVAL
    query = igmp.igmp(
        msgtype=igmp.IGMP_TYPE_QUERY,
        maxresp=timeout * 10,
        csum=0,
        address=leave.address)
    ip_header = ipv4.ipv4(
        total_length=len(ipv4.ipv4()) + len(query),
        proto=inet.IPPROTO_IGMP, ttl=1,
        src=querier['ip'],
        dst=igmp.MULTICAST_IP_ALL_HOST)
    eth_header = ethernet.ethernet(
        dst=igmp.MULTICAST_MAC_ALL_HOST,
        src=querier['mac'],
        ethertype=ether.ETH_TYPE_IP)

    query_pkt = packet.Packet()
    for layer in (eth_header, ip_header, query):
        query_pkt.add_protocol(layer)
    query_pkt.serialize()

    # Send the query back out of the port the LEAVE arrived on.
    actions = [parser.OFPActionOutput(ofproto.OFPP_IN_PORT)]
    self._do_packet_out(datapath, query_pkt.data, in_port, actions)

    # Wait for REPORT messages.
    hub.spawn(self._do_timeout_for_leave, timeout, datapath,
              leave.address, in_port)
开发者ID:janieltec,项目名称:ryu-python,代码行数:51,代码来源:igmplib_jan.py
示例4: build_new_packet
def build_new_packet(switch):
    """Build an unserialized Ethernet/IPv4/TCP packet in the reverse direction.

    ``switch`` is a mapping with the keys ``src_eth``, ``dst_eth``,
    ``src_ip``, ``dst_ip``, ``ip_id``, ``seq``, ``ack``, ``src_port`` and
    ``dst_port``.  Every source/destination pair is swapped in the packet
    built here, ``seq``/``ack`` are exchanged and the IP identification is
    ``ip_id + 1``.

    Returns the ``packet.Packet``; ``serialize()`` is not called.
    """
    # Ethernet: the stored source becomes the destination and vice versa.
    l2 = ethernet.ethernet(dst=switch['src_eth'],
                           src=switch['dst_eth'],
                           ethertype=ether_types.ETH_TYPE_IP)

    # IPv4: addresses swapped, identification advanced by one.
    l3 = ipv4.ipv4(version=4, header_length=5,
                   tos=0, total_length=0,
                   identification=switch['ip_id'] + 1, flags=2,
                   offset=0, ttl=64,
                   proto=in_proto.IPPROTO_TCP, csum=0,
                   src=switch['dst_ip'], dst=switch['src_ip'])

    # TCP: ports swapped, seq/ack exchanged, PSH and ACK bits set.
    psh_ack = 0b011000
    l4 = tcp.tcp(src_port=switch['dst_port'], dst_port=switch['src_port'],
                 seq=switch['ack'], ack=switch['seq'], offset=0,
                 bits=psh_ack, window_size=2048,
                 csum=0, urgent=0, option=None)

    p = packet.Packet()
    for layer in (l2, l3, l4):
        p.add_protocol(layer)
    return p
开发者ID:jsjhan,项目名称:test,代码行数:32,代码来源:a.py
示例5: test_serialize_with_auth_simple
def test_serialize_with_auth_simple(self):
    """Serialize a BFD packet using SimplePassword auth and compare bytes.

    The Ethernet/IPv4/UDP/BFD stack must serialize to
    ``self.data_auth_simple``.
    """
    auth = bfd.SimplePassword(auth_key_id=2,
                              password=self.auth_keys[2])
    layers = [
        ethernet.ethernet('08:00:27:d1:95:7c', '08:00:27:ed:54:41'),
        ipv4.ipv4(src='192.168.57.2', dst='192.168.57.1', tos=192,
                  identification=3216, proto=inet.IPPROTO_UDP),
        udp.udp(49152, 3784),
        bfd.bfd(ver=1, diag=bfd.BFD_DIAG_NO_DIAG,
                flags=bfd.BFD_FLAG_AUTH_PRESENT,
                state=bfd.BFD_STATE_DOWN, detect_mult=3, my_discr=1,
                your_discr=0, desired_min_tx_interval=1000000,
                required_min_rx_interval=1000000,
                required_min_echo_rx_interval=0,
                auth_cls=auth),
    ]

    pkt = packet.Packet()
    for layer in layers:
        pkt.add_protocol(layer)

    eq_(len(pkt.protocols), 4)
    pkt.serialize()
    eq_(pkt.data, self.data_auth_simple)
开发者ID:AkiraSuu,项目名称:ryu,代码行数:30,代码来源:test_bfd.py
示例6: build_carp_ad_packet
def build_carp_ad_packet(src_mac):
    """Build an unserialized Ethernet/IPv4/VRRPv2 advertisement packet.

    ``src_mac`` is used as the Ethernet source address.  The VRID and the
    advertised IP address are read from ``CONF.CARP.VRID`` and
    ``CONF.PARAMETER.LOGICAL_IP``.

    Returns the ``packet.Packet``; ``serialize()`` is not called.
    """
    l2 = ethernet.ethernet(dst=vrrp.VRRP_IPV4_DST_MAC_ADDRESS,
                           src=src_mac,
                           ethertype=ether_types.ETH_TYPE_IP)

    # The IP identification field is 16 bits wide, so draw 16 random bits.
    ip_id = int(random.getrandbits(16))
    # Three flag bits: position 0 reserved, position 1 don't fragment,
    # position 2 more fragments.  Only "don't fragment" is set here.
    dont_fragment = 0b010
    l3 = ipv4.ipv4(identification=ip_id,
                   flags=dont_fragment,
                   ttl=vrrp.VRRP_IPV4_TTL,
                   proto=in_proto.IPPROTO_VRRP,
                   dst=vrrp.VRRP_IPV4_DST_ADDRESS)

    advert = vrrp.vrrpv2.create(type_=vrrp.VRRP_TYPE_ADVERTISEMENT,
                                vrid=CONF.CARP.VRID,
                                priority=0,
                                max_adver_int=1,
                                ip_addresses=[CONF.PARAMETER.LOGICAL_IP])

    p = packet.Packet()
    for layer in (l2, l3, advert):
        p.add_protocol(layer)
    return p
开发者ID:jsjhan,项目名称:test,代码行数:25,代码来源:load_balancer.py
示例7: test_serialize_option
def test_serialize_option(self):
    """Serialize a TCP header with options and parse it back.

    The option bytes following the fixed-size header must equal the
    expected encoding, and the parsed options must stringify the same as
    the ones that were serialized.
    """
    # prepare test data
    options = [
        tcp.TCPOptionMaximumSegmentSize(max_seg_size=1460),
        tcp.TCPOptionSACKPermitted(),
        tcp.TCPOptionTimestamps(ts_val=287454020, ts_ecr=1432778632),
        tcp.TCPOptionNoOperation(),
        tcp.TCPOptionWindowScale(shift_cnt=9),
    ]
    expected_option_bytes = (
        b"\x02\x04\x05\xb4"
        b"\x04\x02"
        b"\x08\x0a\x11\x22\x33\x44\x55\x66\x77\x88"
        b"\x01"
        b"\x03\x03\x09"
    )
    prev = ipv4(version=4, header_length=5, tos=0, total_length=0,
                identification=0, flags=0, offset=0, ttl=64,
                proto=inet.IPPROTO_TCP, csum=0,
                src="192.168.10.1", dst="192.168.100.1")

    # test serializer
    segment = tcp.tcp(
        src_port=self.src_port,
        dst_port=self.dst_port,
        seq=self.seq,
        ack=self.ack,
        offset=0,
        bits=self.bits,
        window_size=self.window_size,
        csum=0,
        urgent=self.urgent,
        option=options,
    )
    buf = segment.serialize(bytearray(), prev)
    start = tcp.tcp._MIN_LEN
    serialized_option_bytes = buf[start:start + len(expected_option_bytes)]
    eq_(expected_option_bytes, serialized_option_bytes)

    # test parser
    parsed = tcp.tcp.parser(buf)[0]
    eq_(str(options), str(parsed.option))
开发者ID:chenzheng128,项目名称:ArsenalPython,代码行数:34,代码来源:test_tcp.py
示例8: sendReply
def sendReply(self, sdonID, origMsg, action, reply):
    """Send ``action`` + ``reply`` back out of the port ``origMsg`` came in on.

    When ``action`` is exactly ``"hello,"`` the datapath, parser and ingress
    port of ``origMsg`` are stored in ``self.sdonID_to_OF[sdonID]`` first so
    the switch can be contacted later.

    The reply is an Ethernet/IPv4 packet with default Ethernet fields, fixed
    IPv4 addresses and ``str(action) + str(reply)`` as payload, sent as an
    unbuffered PacketOut.
    """
    datapath = origMsg.datapath
    ofproto = datapath.ofproto
    parser = datapath.ofproto_parser
    in_port = origMsg.match['in_port']

    if action == "hello,":
        # Save this SdonManager so the switch can be contacted later.
        # print() with a single argument behaves the same on Python 2 and 3;
        # the old print statement was a SyntaxError on Python 3.
        print("Saving " + str(sdonID) + "'s OF details.")
        self.sdonID_to_OF[sdonID] = [datapath, parser, in_port]

    # Reply to the action message.  Dividing the headers builds the packet,
    # so the separately constructed empty Packet was never used.
    e = ethernet.ethernet()
    i = ipv4.ipv4(src="10.0.0.1", dst="1.2.3.4")
    pkt = e / i / (str(action) + str(reply))
    pkt.serialize()
    data = pkt.data

    actions = [parser.OFPActionOutput(in_port)]
    out = parser.OFPPacketOut(datapath=datapath,
                              buffer_id=ofproto.OFP_NO_BUFFER,
                              in_port=ofproto.OFPP_CONTROLLER,
                              actions=actions,
                              data=data)
    datapath.send_msg(out)
开发者ID:sdon-poc,项目名称:sdon-poc,代码行数:29,代码来源:sdon.py
示例9: packet_in_handler
def packet_in_handler(self, ev):
    """Send a fixed Ethernet/IPv4/UDP test packet out of port 2.

    The packet is built and serialized first, then sent through the
    datapath of ``ev.msg`` as an unbuffered PacketOut whose ``in_port`` is
    the ingress port of the triggering message.
    """
    layers = (
        ethernet.ethernet(ethertype=0x0800,
                          dst='00:00:00:00:00:02',
                          src='00:00:00:00:00:01'),
        ipv4.ipv4(dst='10.0.0.2', src='10.0.0.1', proto=17),
        udp.udp(src_port=1000, dst_port=8000),
        b'Hellooooooooo~~~~~~~~~',  # raw payload
    )
    pkt = packet.Packet()
    for layer in layers:
        pkt.add_protocol(layer)

    # Packet serializing
    pkt.serialize()
    raw = pkt.data

    msg = ev.msg
    dp = msg.datapath
    parser = dp.ofproto_parser
    out = parser.OFPPacketOut(
        datapath=dp,
        buffer_id=dp.ofproto.OFP_NO_BUFFER,
        in_port=msg.match['in_port'],
        actions=[parser.OFPActionOutput(2)],
        data=raw)
    dp.send_msg(out)
开发者ID:TakeshiTseng,项目名称:SDN-Work,代码行数:26,代码来源:pg.py
示例10: build_packet_tcp_22
def build_packet_tcp_22():
    """
    Build an SSH-like packet for use in tests.

    Returns the serialized ``packet.Packet`` (Ethernet/IPv4/TCP with
    destination port 22) after printing the repr of its on-wire bytes.
    """
    # Reference capture this packet is modelled on:
    #ethernet(dst='08:00:27:1e:98:88',ethertype=2048,src='00:00:00:00:00:02')
    #ipv4(csum=25158,dst='192.168.57.40',flags=2,header_length=5,
    #   identification=58864,offset=0,option=None,proto=6,src='192.168.56.12',
    #   tos=0,total_length=60,ttl=64,version=4)
    #tcp(ack=0,bits=2,csum=60347,dst_port=22,offset=10,
    #   option='\x02\x04\x05\xb4\x04\x02\x08\n\x00\x08\x16\x0f\x00\x00\x00\x00\
    #   x01\x03\x03\x06'
    #   ,seq=533918719,src_port=52656,urgent=0,window_size=29200)
    e = ethernet.ethernet(dst='00:00:00:00:00:02',
                          src='00:00:00:00:00:01',
                          ethertype=2048)
    # NOTE(review): proto=0 here while the reference capture above has
    # proto=6 and a TCP layer follows -- confirm this is intended.
    i = ipv4.ipv4(version=4, header_length=5, tos=0, total_length=0,
                  identification=0, flags=0, offset=0, ttl=255, proto=0,
                  csum=0, src='10.0.0.1', dst='10.0.0.2', option=None)
    t = tcp.tcp(src_port=52656, dst_port=22, seq=533918719, ack=0, offset=10,
                bits=2, window_size=29200, csum=0, urgent=0, option=None)
    p = packet.Packet()
    p.add_protocol(e)
    p.add_protocol(i)
    p.add_protocol(t)
    p.serialize()
    # print() with one argument works on Python 2 and 3; the former print
    # statement was a SyntaxError on Python 3.
    print(repr(p.data))  # the on-wire packet
    return p
开发者ID:EnjoyHacking,项目名称:nmeta,代码行数:28,代码来源:tests_integration.py
示例11: match_to_packet
def match_to_packet(self, match):
    """Build and serialize an Ethernet[/VLAN]/IPv4/TCP packet from ``match``.

    Missing ``eth_dst``, ``eth_src``, ``eth_type``, ``ipv4_src``,
    ``ipv4_dst``, ``tcp_src`` and ``tcp_dst`` entries are filled in with
    defaults via ``setdefault``, so ``match`` itself is updated.  When
    ``vlan_vid`` is present the Ethernet ethertype is set to 0x8100 and a
    VLAN layer carrying ``match['eth_type']`` is inserted.

    Returns the serialized ``packet.Packet``.
    """
    pkt = packet.Packet()

    eth_dst = match.setdefault('eth_dst', "00:00:00:00:00:02")
    eth_src = match.setdefault('eth_src', "00:00:00:00:00:01")
    eth_type = match.setdefault('eth_type', 0x800)
    eth_layer = ethernet.ethernet(dst=eth_dst, src=eth_src,
                                  ethertype=eth_type)
    pkt.add_protocol(eth_layer)

    if 'vlan_vid' in match:
        # The outer header announces the tag; the tag carries the real type.
        eth_layer.ethertype = 0x8100
        pkt.add_protocol(vlan.vlan(pcp=0,
                                   cfi=0,
                                   vid=match['vlan_vid'],
                                   ethertype=match['eth_type']))

    ip_src = match.setdefault('ipv4_src', "192.168.1.1")
    ip_dst = match.setdefault('ipv4_dst', "192.168.1.2")
    pkt.add_protocol(ipv4.ipv4(src=ip_src, dst=ip_dst))

    sport = match.setdefault('tcp_src', 12345)
    dport = match.setdefault('tcp_dst', 80)
    pkt.add_protocol(tcp.tcp(src_port=sport, dst_port=dport))

    pkt.serialize()
    return pkt
开发者ID:KitL,项目名称:ofexam,代码行数:30,代码来源:exam.py
示例12: send_ls_ack
def send_ls_ack(self, ospf_pkt_ls_up):
    """Acknowledge every LSA of an LS Update and refresh the forwarding table.

    Builds an ``OSPFLSAck`` from the header of each entry in
    ``ospf_pkt_ls_up.lsas``, sends it out of port 1 of
    ``self.routers.datapath`` inside an Ethernet/IPv4 packet with fixed
    addresses, then recomputes the shortest paths and updates the
    forwarding table.
    """
    # print() with one argument works on Python 2 and 3; the former print
    # statement was a SyntaxError on Python 3.
    print(ospf_pkt_ls_up.lsas)
    lsa_headers = [lsa.header for lsa in ospf_pkt_ls_up.lsas]
    msg = ospf.OSPFLSAck(router_id=self.router_id,
                         lsa_headers=lsa_headers)

    rtr = self.routers
    datapath = rtr.datapath

    p = packet.Packet()
    e = ethernet.ethernet(dst='01:00:5e:00:00:05',
                          src='00:0c:29:d4:10:d7',
                          ethertype=ether.ETH_TYPE_IP)
    f = ipv4.ipv4(dst='224.0.0.5',
                  src='172.16.33.191',
                  proto=inet.IPPROTO_OSPF)
    p.add_protocol(e)
    p.add_protocol(f)
    p.add_protocol(msg)
    p.serialize()

    actions = [datapath.ofproto_parser.OFPActionOutput(1)]
    out = datapath.ofproto_parser.OFPPacketOut(
        datapath=datapath,
        buffer_id=datapath.ofproto.OFP_NO_BUFFER,
        in_port=datapath.ofproto.OFPP_CONTROLLER,
        actions=actions,
        data=p.data)
    datapath.send_msg(out)

    dist, previous = self.shortest_path()
    self.update_fwd_table(dist, previous)
开发者ID:mehdi149,项目名称:L3_MODULES,代码行数:33,代码来源:ospf_mod.py
示例13: test_serialize_with_auth_sha1
def test_serialize_with_auth_sha1(self):
    """Serialize a BFD packet using KeyedSHA1 auth and compare the bytes.

    The Ethernet/IPv4/UDP/BFD stack must serialize to
    ``self.data_auth_sha1``.
    """
    pkt = packet.Packet()
    pkt.add_protocol(
        ethernet.ethernet("08:00:27:d1:95:7c", "08:00:27:ed:54:41"))
    pkt.add_protocol(
        ipv4.ipv4(src="192.168.57.2", dst="192.168.57.1", tos=192,
                  identification=2960, proto=inet.IPPROTO_UDP))
    pkt.add_protocol(udp.udp(49152, 3784))

    bfd_fields = dict(
        ver=1,
        diag=bfd.BFD_DIAG_NO_DIAG,
        flags=bfd.BFD_FLAG_AUTH_PRESENT,
        state=bfd.BFD_STATE_DOWN,
        detect_mult=3,
        my_discr=1,
        your_discr=0,
        desired_min_tx_interval=1000000,
        required_min_rx_interval=1000000,
        required_min_echo_rx_interval=0,
        auth_cls=bfd.KeyedSHA1(auth_key_id=2, seq=16817,
                               auth_key=self.auth_keys[2]),
    )
    pkt.add_protocol(bfd.bfd(**bfd_fields))

    eq_(len(pkt.protocols), 4)
    pkt.serialize()
    eq_(pkt.data, self.data_auth_sha1)
开发者ID:chenleji,项目名称:ryu,代码行数:34,代码来源:test_bfd.py
示例14: assemble_offer
def assemble_offer(self, pkt):
    """Build a DHCP reply packet from a received DHCP request packet.

    ``pkt`` is the received ``packet.Packet``; its fourth element
    (``pkt[3]``) is parsed as the DHCP message.  Options with tags 55, 53
    and 12 are dropped from the parsed option list when present, and
    options 54, 53, 12, 6, 3 and 1 are placed at the front (in that order)
    using this server's settings.

    Returns the unserialized Ethernet/IPv4/UDP/DHCP ``packet.Packet``.
    """
    disc_eth = pkt.get_protocol(ethernet.ethernet)
    disc_ipv4 = pkt.get_protocol(ipv4.ipv4)
    disc = dhcp.dhcp.parser(pkt[3])
    option_list = disc[0].options.option_list

    # Drop the first option carrying each of these tags.  A request that
    # lacks one of them (e.g. no tag 12) used to raise StopIteration here.
    for tag in (55, 53, 12):
        stale = next((opt for opt in option_list if opt.tag == tag), None)
        if stale is not None:
            option_list.remove(stale)

    # Each insert goes to the front, so the final order is the reverse of
    # the order listed here.
    option_list.insert(0, dhcp.option(tag=1, value=self.bin_netmask))
    option_list.insert(0, dhcp.option(tag=3, value=self.bin_server))
    option_list.insert(0, dhcp.option(tag=6, value=self.bin_dns))
    option_list.insert(0, dhcp.option(tag=12, value=self.hostname))
    # b'\x02' equals the former '02'.decode('hex'), which only worked on
    # Python 2.
    option_list.insert(0, dhcp.option(tag=53, value=b'\x02'))
    option_list.insert(0, dhcp.option(tag=54, value=self.bin_server))

    offer_pkt = packet.Packet()
    offer_pkt.add_protocol(ethernet.ethernet(ethertype=disc_eth.ethertype,
                                             dst=disc_eth.src,
                                             src=self.hw_addr))
    offer_pkt.add_protocol(ipv4.ipv4(dst=disc_ipv4.dst,
                                     src=self.dhcp_server,
                                     proto=disc_ipv4.proto))
    offer_pkt.add_protocol(udp.udp(src_port=67, dst_port=68))
    offer_pkt.add_protocol(dhcp.dhcp(op=2, chaddr=disc_eth.src,
                                     siaddr=self.dhcp_server,
                                     boot_file=disc[0].boot_file,
                                     yiaddr=self.ip_addr,
                                     xid=disc[0].xid,
                                     options=disc[0].options))
    # Let the logger format the message only if the record is emitted.
    self.logger.info("ASSEMBLED OFFER: %s", offer_pkt)
    return offer_pkt
开发者ID:wuyouke,项目名称:ryu-dhcp,代码行数:27,代码来源:dhcp.py
示例15: test_serialize
def test_serialize(self):
    """Serialize a UDP header under an IPv4 header and verify its fields.

    Checks the ports, that the length field equals the packed header size,
    and that the checksum over pseudo-header plus datagram is 0.
    """
    src_port, dst_port = 6431, 8080
    src_ip, dst_ip = '192.168.10.1', '192.168.100.1'

    prev = ipv4(version=4, header_length=5, tos=0, total_length=0,
                identification=0, flags=0, offset=0, ttl=64,
                proto=inet.IPPROTO_UDP, csum=0, src=src_ip, dst=dst_ip)
    # total_length and csum are both passed as 0.
    datagram = udp(src_port, dst_port, 0, 0)
    buf = datagram.serialize(bytearray(), prev)

    fields = struct.unpack(udp._PACK_STR, buf)
    eq_(fields[0], src_port)
    eq_(fields[1], dst_port)
    eq_(fields[2], struct.calcsize(udp._PACK_STR))

    # checksum: prepend src, dst, a zero byte, 17 and the UDP length.
    pseudo_header = struct.pack('!4s4sBBH',
                                addrconv.ipv4.text_to_bin(src_ip),
                                addrconv.ipv4.text_to_bin(dst_ip),
                                0, 17, fields[2])
    eq_(0, packet_utils.checksum(pseudo_header + buf + bytearray()))
开发者ID:Aminiok,项目名称:ryu,代码行数:26,代码来源:test_udp.py
示例16: test_serialize
def test_serialize(self):
    """Serialize a BFD packet without authentication and compare the bytes.

    The Ethernet/IPv4/UDP/BFD stack must serialize to ``self.data``.
    """
    bfd_layer = bfd.bfd(
        ver=1,
        diag=bfd.BFD_DIAG_CTRL_DETECT_TIME_EXPIRED,
        state=bfd.BFD_STATE_UP,
        detect_mult=3,
        my_discr=6,
        your_discr=7,
        desired_min_tx_interval=60000,
        required_min_rx_interval=60000,
        required_min_echo_rx_interval=0,
    )
    stack = (
        ethernet.ethernet("b0:a8:6e:18:b8:08", "64:87:88:e9:cb:c8"),
        ipv4.ipv4(src="172.28.3.1", dst="172.28.3.2", tos=192,
                  identification=26697, proto=inet.IPPROTO_UDP),
        udp.udp(49152, 3784),
        bfd_layer,
    )

    pkt = packet.Packet()
    for layer in stack:
        pkt.add_protocol(layer)

    eq_(len(pkt.protocols), 4)
    pkt.serialize()
    eq_(pkt.data, self.data)
开发者ID:chenleji,项目名称:ryu,代码行数:29,代码来源:test_bfd.py
示例17: _build_vlan
def _build_vlan(self):
    """Build and serialize an Ethernet / ``self.v`` / IPv4 test packet.

    Returns the serialized ``Packet``.
    """
    eth_layer = ethernet(dst=mac.haddr_to_bin('00:00:00:00:00:00'),
                         src=mac.haddr_to_bin('00:07:0d:af:f4:54'),
                         ethertype=ether.ETH_TYPE_8021Q)

    ip_layer = ipv4(version=4,
                    header_length=20,
                    tos=0,
                    total_length=24,
                    identification=0x8a5d,
                    flags=0,
                    offset=1480,
                    ttl=64,
                    proto=inet.IPPROTO_ICMP,
                    csum=0xa7f2,
                    src=int(netaddr.IPAddress('131.151.32.21')),
                    dst=int(netaddr.IPAddress('131.151.32.129')),
                    option='TEST')

    p = Packet()
    for layer in (eth_layer, self.v, ip_layer):
        p.add_protocol(layer)
    p.serialize()
    return p
开发者ID:09zwcbupt,项目名称:ryu,代码行数:30,代码来源:test_vlan.py
示例18: new_udp_pkt
def new_udp_pkt(eth_dst, eth_src, ip_dst, ip_src, src_port, dst_port, size=0):
    """Return the serialized bytes of an Ethernet/IPv4/UDP packet.

    ``size`` random payload bytes are appended; the payload is capped so
    that the serialized headers plus payload do not exceed 1500 bytes.  A
    resulting size of 0 adds no payload.
    """
    # Start from an empty Packet instance and stack the headers.
    pkt = packet.Packet()
    headers = (
        ethernet.ethernet(ethertype=0x0800, dst=eth_dst, src=eth_src),
        ipv4.ipv4(dst=ip_dst, src=ip_src, proto=17),
        udp.udp(src_port=src_port, dst_port=dst_port),
    )
    for header in headers:
        pkt.add_protocol(header)

    # Serialize a copy to learn how many bytes the headers occupy.
    probe = copy.deepcopy(pkt)
    probe.serialize()
    room = 1500 - len(probe.data)

    # Cap the payload at whatever still fits in 1500 bytes.
    size = min(size, room)
    if size != 0:
        pkt.add_protocol(os.urandom(size))

    # Packet serializing
    pkt.serialize()
    return pkt.data
开发者ID:John-Lin,项目名称:sdn-testing-tool,代码行数:33,代码来源:pktgen.py
示例19: send_ospf_hello
def send_ospf_hello(self, signum, frame):
    """Send an OSPF Hello out of port 6 and then invoke ``self.call()``.

    ``signum`` and ``frame`` are accepted but not used.  The Hello is built
    from ``self.router_id`` and ``self.neighbors``, serialized, wrapped in
    an Ethernet/IPv4 packet with fixed addresses and sent through
    ``self.routers.datapath`` as an unbuffered PacketOut.
    """
    ospf_hello = ospf.OSPFHello(router_id=self.router_id,
                                neighbors=self.neighbors)
    rtr = self.routers
    datapath = rtr.datapath
    hello = ospf_hello.serialize()

    p = packet.Packet()
    e = ethernet.ethernet(dst='00:0c:29:9c:a6:82',
                          src='00:0c:29:52:9b:c4',
                          ethertype=ether.ETH_TYPE_IP)
    f = ipv4.ipv4(dst='192.168.66.195',
                  src='192.168.66.135',
                  proto=inet.IPPROTO_OSPF)
    p.add_protocol(e)
    p.add_protocol(f)
    p.add_protocol(hello)
    p.serialize()

    # print() with one argument works on Python 2 and 3; the former print
    # statements were a SyntaxError on Python 3.
    print(" ================= P.DATA =========== ")
    print(p.data)
    print(" ================= P.DATA =========== ")

    actions = [datapath.ofproto_parser.OFPActionOutput(6)]
    out = datapath.ofproto_parser.OFPPacketOut(
        datapath=datapath,
        buffer_id=datapath.ofproto.OFP_NO_BUFFER,
        in_port=datapath.ofproto.OFPP_CONTROLLER,
        actions=actions,
        data=p.data)
    datapath.send_msg(out)
    self.call()
开发者ID:mehdi149,项目名称:L3_MODULES,代码行数:26,代码来源:ospf_mod.py
示例20: test_serialize
def test_serialize(self):
    """Serialize a VRRPv2 advertisement and verify every packed field.

    The message is serialized under an IPv4 header, unpacked with the
    VRRPv2 pack string extended by three 32-bit words, and checked field by
    field.  The checksum over the whole buffer must be 0.
    """
    src_ip = netaddr.IPAddress('192.168.0.1').value
    dst_ip = vrrp.VRRP_IPV4_DST_ADDRESS
    prev = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, vrrp.VRRP_IPV4_TTL,
                     inet.IPPROTO_VRRP, 0, src_ip, dst_ip)

    type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
    vrid = 5
    priority = 10
    max_adver_int = 30
    ip_address = netaddr.IPAddress('192.168.0.2').value
    ip_addresses = [ip_address]

    vrrp_ = vrrp.vrrpv2.create(
        type_, vrid, priority, max_adver_int, ip_addresses)

    buf = vrrp_.serialize(bytearray(), prev)
    pack_str = vrrp.vrrpv2._PACK_STR + 'III'
    pack_len = struct.calcsize(pack_str)
    # bytes(buf) equals the former str(buf) on Python 2 and, unlike str(),
    # also yields the raw bytes on Python 3.
    res = struct.unpack(pack_str, bytes(buf))
    eq_(res[0], vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V2, type_))
    eq_(res[1], vrid)
    eq_(res[2], priority)
    eq_(res[3], len(ip_addresses))
    eq_(res[4], vrrp.VRRP_AUTH_NO_AUTH)
    eq_(res[5], max_adver_int)
    # res[6] is checksum
    eq_(res[7], ip_address)
    eq_(res[8], 0)
    eq_(res[9], 0)
    eq_(len(buf), pack_len)

    # checksum
    s = packet_utils.checksum(buf)
    eq_(0, s)
开发者ID:isomer,项目名称:ryu,代码行数:35,代码来源:test_vrrp.py
注:本文中的ryu.lib.packet.ipv4.ipv4函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论