本文整理汇总了Python中ryu.lib.packet.packet_utils.checksum函数的典型用法代码示例。如果您正苦于以下问题:Python checksum函数的具体用法?Python checksum怎么用?Python checksum使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了checksum函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_default_args
def test_default_args(self):
prev = ipv4(proto=inet.IPPROTO_IGMP)
g = igmpv3_report()
prev.serialize(g, None)
buf = g.serialize(bytearray(), prev)
res = unpack_from(igmpv3_report._PACK_STR, six.binary_type(buf))
buf = bytearray(buf)
pack_into("!H", buf, 2, 0)
eq_(res[0], IGMP_TYPE_REPORT_V3)
eq_(res[1], checksum(buf))
eq_(res[2], 0)
# records without record_num
prev = ipv4(proto=inet.IPPROTO_IGMP)
record1 = igmpv3_report_group(MODE_IS_INCLUDE, 0, 0, "225.0.0.1")
record2 = igmpv3_report_group(MODE_IS_INCLUDE, 0, 2, "225.0.0.2", ["172.16.10.10", "172.16.10.27"])
record3 = igmpv3_report_group(MODE_IS_INCLUDE, 1, 0, "225.0.0.3", [], b"abc\x00")
record4 = igmpv3_report_group(MODE_IS_INCLUDE, 1, 2, "225.0.0.4", ["172.16.10.10", "172.16.10.27"], b"abc\x00")
records = [record1, record2, record3, record4]
g = igmpv3_report(records=records)
prev.serialize(g, None)
buf = g.serialize(bytearray(), prev)
res = unpack_from(igmpv3_report._PACK_STR, six.binary_type(buf))
buf = bytearray(buf)
pack_into("!H", buf, 2, 0)
eq_(res[0], IGMP_TYPE_REPORT_V3)
eq_(res[1], checksum(buf))
eq_(res[2], len(records))
开发者ID:John-Lin,项目名称:ryu,代码行数:30,代码来源:test_igmp.py
示例2: test_serialize
def test_serialize(self):
src_ip = netaddr.IPAddress('192.168.0.1').value
dst_ip = vrrp.VRRP_IPV4_DST_ADDRESS
prev = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, vrrp.VRRP_IPV4_TTL,
inet.IPPROTO_VRRP, 0, src_ip, dst_ip)
type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
vrid = 5
priority = 10
max_adver_int = 30
ip_address = netaddr.IPAddress('192.168.0.2').value
ip_addresses = [ip_address]
vrrp_ = vrrp.vrrpv2.create(
type_, vrid, priority, max_adver_int, ip_addresses)
buf = vrrp_.serialize(bytearray(), prev)
pack_str = vrrp.vrrpv2._PACK_STR + 'III'
pack_len = struct.calcsize(pack_str)
res = struct.unpack(pack_str, str(buf))
eq_(res[0], vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V2, type_))
eq_(res[1], vrid)
eq_(res[2], priority)
eq_(res[3], len(ip_addresses))
eq_(res[4], vrrp.VRRP_AUTH_NO_AUTH)
eq_(res[5], max_adver_int)
# res[6] is checksum
eq_(res[7], ip_address)
eq_(res[8], 0)
eq_(res[9], 0)
eq_(len(buf), pack_len)
# checksum
s = packet_utils.checksum(buf)
eq_(0, s)
开发者ID:isomer,项目名称:ryu,代码行数:35,代码来源:test_vrrp.py
示例3: test_serialize
def test_serialize(self):
src_port = 6431
dst_port = 8080
total_length = 0
csum = 0
src_ip = '192.168.10.1'
dst_ip = '192.168.100.1'
prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
inet.IPPROTO_UDP, 0, src_ip, dst_ip)
u = udp(src_port, dst_port, total_length, csum)
buf = u.serialize(bytearray(), prev)
res = struct.unpack(udp._PACK_STR, buf)
eq_(res[0], src_port)
eq_(res[1], dst_port)
eq_(res[2], struct.calcsize(udp._PACK_STR))
# checksum
ph = struct.pack('!4s4sBBH',
addrconv.ipv4.text_to_bin(src_ip),
addrconv.ipv4.text_to_bin(dst_ip), 0, 17, res[2])
d = ph + buf + bytearray()
s = packet_utils.checksum(d)
eq_(0, s)
开发者ID:Aminiok,项目名称:ryu,代码行数:26,代码来源:test_udp.py
示例4: icmpv6_csum
def icmpv6_csum(prev, buf):
ph = struct.pack('!16s16sBBH', prev.src, prev.dst, 0, prev.nxt,
prev.payload_length)
h = bytearray(buf)
struct.pack_into('!H', h, 2, 0)
return socket.htons(packet_utils.checksum(ph + h))
开发者ID:09zwcbupt,项目名称:ryu,代码行数:7,代码来源:test_icmpv6.py
示例5: _parser
def _parser(cls, buf):
if len(buf) < cls._HDR_LEN:
raise stream_parser.StreamParser.TooSmallException(
'%d < %d' % (len(buf), cls._HDR_LEN))
(version, type_, length, router_id, area_id, checksum, au_type,
authentication) = struct.unpack_from(cls._HDR_PACK_STR,
six.binary_type(buf))
# Exclude checksum and authentication field for checksum validation.
if packet_utils.checksum(buf[:12] + buf[14:16] + buf[cls._HDR_LEN:]) \
!= checksum:
raise InvalidChecksum
if len(buf) < length:
raise stream_parser.StreamParser.TooSmallException(
'%d < %d' % (len(buf), length))
router_id = addrconv.ipv4.bin_to_text(router_id)
area_id = addrconv.ipv4.bin_to_text(area_id)
binmsg = buf[cls._HDR_LEN:length]
rest = buf[length:]
subcls = cls._lookup_type(type_)
kwargs = subcls.parser(binmsg)
return subcls(length, router_id, area_id, au_type, int(authentication),
checksum, version, **kwargs), None, rest
开发者ID:Aries-Sushi,项目名称:ryu,代码行数:25,代码来源:ospf.py
示例6: serialize_static
def serialize_static(vrrp_, prev):
assert not vrrp_.is_ipv6 # vrrpv2 defines only IPv4
ip_addresses_pack_str = vrrpv2._ip_addresses_pack_str(vrrp_.count_ip)
ip_addresses_len = struct.calcsize(ip_addresses_pack_str)
vrrp_len = vrrpv2._MIN_LEN + ip_addresses_len + vrrpv2._AUTH_DATA_LEN
checksum = False
if vrrp_.checksum is None:
checksum = True
vrrp_.checksum = 0
if vrrp_.auth_type is None:
vrrp_.auth_type = VRRP_AUTH_NO_AUTH
if vrrp_.auth_data is None:
vrrp_.auth_data = VRRP_AUTH_DATA
buf = bytearray(vrrp_len)
offset = 0
struct.pack_into(vrrpv2._PACK_STR, buf, offset,
vrrp_to_version_type(vrrp_.version, vrrp_.type),
vrrp_.vrid, vrrp_.priority,
vrrp_.count_ip, vrrp_.auth_type, vrrp_.max_adver_int,
vrrp_.checksum)
offset += vrrpv2._MIN_LEN
struct.pack_into(ip_addresses_pack_str, buf, offset,
*map(lambda x: addrconv.ipv4.text_to_bin(x),
vrrp_.ip_addresses))
offset += ip_addresses_len
struct.pack_into(vrrpv2._AUTH_DATA_PACK_STR, buf, offset,
*vrrp_.auth_data)
if checksum:
vrrp_.checksum = packet_utils.checksum(buf)
struct.pack_into(vrrpv2._CHECKSUM_PACK_STR, buf,
vrrpv2._CHECKSUM_OFFSET, vrrp_.checksum)
return buf
开发者ID:AndreiaAB,项目名称:ryu,代码行数:35,代码来源:vrrp.py
示例7: icmpv6_csum
def icmpv6_csum(prev, buf):
ph = struct.pack('!16s16sI3xB', prev.src, prev.dst,
prev.payload_length, prev.nxt)
h = bytearray(buf)
struct.pack_into('!H', h, 2, 0)
return packet_utils.checksum(ph + h)
开发者ID:cardinal27513,项目名称:ryu,代码行数:7,代码来源:test_icmpv6.py
示例8: test_serialize
def test_serialize(self):
offset = 5
csum = 0
src_ip = '192.168.10.1'
dst_ip = '192.168.100.1'
prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
inet.IPPROTO_TCP, 0, src_ip, dst_ip)
t = tcp(self.src_port, self.dst_port, self.seq, self.ack,
offset, self.bits, self.window_size, csum, self.urgent)
buf = t.serialize(bytearray(), prev)
res = struct.unpack(tcp._PACK_STR, str(buf))
eq_(res[0], self.src_port)
eq_(res[1], self.dst_port)
eq_(res[2], self.seq)
eq_(res[3], self.ack)
eq_(res[4], offset << 4)
eq_(res[5], self.bits)
eq_(res[6], self.window_size)
eq_(res[8], self.urgent)
# checksum
ph = struct.pack('!4s4sBBH',
addrconv.ipv4.text_to_bin(src_ip),
addrconv.ipv4.text_to_bin(dst_ip), 0, 6, offset * 4)
d = ph + buf + bytearray()
s = packet_utils.checksum(d)
eq_(0, s)
开发者ID:AkiraSuu,项目名称:ryu,代码行数:30,代码来源:test_tcp.py
示例9: icmpv6_csum
def icmpv6_csum(prev, buf):
ph = struct.pack('!16s16sI3xB',
addrconv.ipv6.text_to_bin(prev.src),
addrconv.ipv6.text_to_bin(prev.dst),
prev.payload_length, prev.nxt)
h = bytearray(buf)
struct.pack_into('!H', h, 2, 0)
return packet_utils.checksum(ph + h)
开发者ID:dshulyak,项目名称:ryu,代码行数:9,代码来源:test_icmpv6.py
示例10: test_serialize
def test_serialize(self):
data = bytearray()
prev = None
buf = self.g.serialize(data, prev)
res = unpack_from(igmpv3_report._PACK_STR, buffer(buf))
eq_(res[0], self.msgtype)
eq_(res[1], checksum(self.buf))
eq_(res[2], self.record_num)
开发者ID:AkiraSuu,项目名称:ryu,代码行数:10,代码来源:test_igmp.py
示例11: serialize
def serialize(self, payload, prev):
hdr = bytearray(struct.pack(self._PACK_STR, self.msgtype,
trunc(self.maxresp), self.csum,
addrconv.ipv4.text_to_bin(self.address)))
if self.csum == 0:
self.csum = packet_utils.checksum(hdr)
struct.pack_into('!H', hdr, 2, self.csum)
return hdr
开发者ID:5g-empower,项目名称:empower-ryu,代码行数:10,代码来源:igmp.py
示例12: test_default_args
def test_default_args(self):
prev = ipv4(proto=inet.IPPROTO_IGMP)
g = igmpv3_query()
prev.serialize(g, None)
buf = g.serialize(bytearray(), prev)
res = unpack_from(igmpv3_query._PACK_STR, str(buf))
buf = bytearray(buf)
pack_into('!H', buf, 2, 0)
buf = str(buf)
eq_(res[0], IGMP_TYPE_QUERY)
eq_(res[1], 100)
eq_(res[2], checksum(buf))
eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
eq_(res[4], 2)
eq_(res[5], 0)
eq_(res[6], 0)
# srcs without num
prev = ipv4(proto=inet.IPPROTO_IGMP)
srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
g = igmpv3_query(srcs=srcs)
prev.serialize(g, None)
buf = g.serialize(bytearray(), prev)
res = unpack_from(igmpv3_query._PACK_STR, str(buf))
buf = bytearray(buf)
pack_into('!H', buf, 2, 0)
buf = str(buf)
eq_(res[0], IGMP_TYPE_QUERY)
eq_(res[1], 100)
eq_(res[2], checksum(buf))
eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
eq_(res[4], 2)
eq_(res[5], 0)
eq_(res[6], len(srcs))
res = unpack_from('4s4s4s', str(buf), igmpv3_query._MIN_LEN)
eq_(res[0], addrconv.ipv4.text_to_bin(srcs[0]))
eq_(res[1], addrconv.ipv4.text_to_bin(srcs[1]))
eq_(res[2], addrconv.ipv4.text_to_bin(srcs[2]))
开发者ID:AkiraSuu,项目名称:ryu,代码行数:42,代码来源:test_igmp.py
示例13: setUp
def setUp(self):
self.type_ = icmp.ICMP_ECHO_REQUEST
self.code = 0
self.csum = 0
self.data = None
self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
self.buf = bytearray(struct.pack(icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
self.csum_calc = packet_utils.checksum(self.buf)
struct.pack_into("!H", self.buf, 2, self.csum_calc)
开发者ID:John-Lin,项目名称:ryu,代码行数:11,代码来源:test_icmp.py
示例14: serialize
def serialize(self):
tail = self.serialize_tail()
self.length = self._HDR_LEN + len(tail)
head = bytearray(struct.pack(self._HDR_PACK_STR, self.version,
self.type_, self.length,
addrconv.ipv4.text_to_bin(self.router_id),
addrconv.ipv4.text_to_bin(self.area_id), 0,
self.au_type, self.authentication))
buf = head + tail
csum = packet_utils.checksum(buf[:12]+buf[14:16]+buf[self._HDR_LEN:])
self.checksum = csum
struct.pack_into("!H", buf, 12, csum)
return buf
开发者ID:Annjana,项目名称:ryu,代码行数:13,代码来源:ospf.py
示例15: setUp_with_TimeExceeded
def setUp_with_TimeExceeded(self):
self.te_data = b"abc"
self.te_data_len = len(self.te_data)
self.data = icmp.TimeExceeded(data_len=self.te_data_len, data=self.te_data)
self.type_ = icmp.ICMP_TIME_EXCEEDED
self.code = 0
self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
self.buf = bytearray(struct.pack(icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
self.buf += self.data.serialize()
self.csum_calc = packet_utils.checksum(self.buf)
struct.pack_into("!H", self.buf, 2, self.csum_calc)
开发者ID:John-Lin,项目名称:ryu,代码行数:13,代码来源:test_icmp.py
示例16: setUp_with_dest_unreach
def setUp_with_dest_unreach(self):
self.unreach_mtu = 10
self.unreach_data = b"abc"
self.unreach_data_len = len(self.unreach_data)
self.data = icmp.dest_unreach(data_len=self.unreach_data_len, mtu=self.unreach_mtu, data=self.unreach_data)
self.type_ = icmp.ICMP_DEST_UNREACH
self.code = icmp.ICMP_HOST_UNREACH_CODE
self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)
self.buf = bytearray(struct.pack(icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
self.buf += self.data.serialize()
self.csum_calc = packet_utils.checksum(self.buf)
struct.pack_into("!H", self.buf, 2, self.csum_calc)
开发者ID:John-Lin,项目名称:ryu,代码行数:14,代码来源:test_icmp.py
示例17: test_serialize
def test_serialize(self):
data = bytearray()
prev = None
buf = self.g.serialize(data, prev)
res = unpack_from(igmpv3_query._PACK_STR, six.binary_type(buf))
eq_(res[0], self.msgtype)
eq_(res[1], self.maxresp)
eq_(res[2], checksum(self.buf))
eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
eq_(res[4], self.s_qrv)
eq_(res[5], self.qqic)
eq_(res[6], self.num)
开发者ID:StephenKing,项目名称:summerschool-2015-ryu,代码行数:14,代码来源:test_igmp.py
示例18: test_build_igmp
def test_build_igmp(self):
p = self._build_igmp()
e = self.find_protocol(p, "ethernet")
ok_(e)
eq_(e.ethertype, ether.ETH_TYPE_IP)
i = self.find_protocol(p, "ipv4")
ok_(i)
eq_(i.proto, inet.IPPROTO_IGMP)
g = self.find_protocol(p, "igmpv3_report")
ok_(g)
eq_(g.msgtype, self.msgtype)
eq_(g.csum, checksum(self.buf))
eq_(g.record_num, self.record_num)
eq_(g.records, self.records)
开发者ID:AkiraSuu,项目名称:ryu,代码行数:18,代码来源:test_igmp.py
示例19: test_serialize_with_srcs
def test_serialize_with_srcs(self):
self.setUp_with_srcs()
data = bytearray()
prev = None
buf = self.g.serialize(data, prev)
res = unpack_from(igmpv3_query._PACK_STR, six.binary_type(buf))
(src1, src2, src3) = unpack_from("4s4s4s", six.binary_type(buf), igmpv3_query._MIN_LEN)
eq_(res[0], self.msgtype)
eq_(res[1], self.maxresp)
eq_(res[2], checksum(self.buf))
eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
eq_(res[4], self.s_qrv)
eq_(res[5], self.qqic)
eq_(res[6], self.num)
eq_(src1, addrconv.ipv4.text_to_bin(self.srcs[0]))
eq_(src2, addrconv.ipv4.text_to_bin(self.srcs[1]))
eq_(src3, addrconv.ipv4.text_to_bin(self.srcs[2]))
开发者ID:John-Lin,项目名称:ryu,代码行数:19,代码来源:test_igmp.py
示例20: test_serialize
def test_serialize(self):
buf = self.ip.serialize(bytearray(), None)
res = struct.unpack_from(ipv4._PACK_STR, six.binary_type(buf))
option = buf[ipv4._MIN_LEN:ipv4._MIN_LEN + len(self.option)]
eq_(res[0], self.ver_hlen)
eq_(res[1], self.tos)
eq_(res[2], self.total_length)
eq_(res[3], self.identification)
eq_(res[4], self.flg_off)
eq_(res[5], self.ttl)
eq_(res[6], self.proto)
eq_(res[8], addrconv.ipv4.text_to_bin(self.src))
eq_(res[9], addrconv.ipv4.text_to_bin(self.dst))
eq_(option, self.option)
# checksum
csum = packet_utils.checksum(buf)
eq_(csum, 0)
开发者ID:5g-empower,项目名称:empower-ryu,代码行数:19,代码来源:test_ipv4.py
注:本文中的ryu.lib.packet.packet_utils.checksum函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论