• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python packet_utils.checksum函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中ryu.lib.packet.packet_utils.checksum函数的典型用法代码示例。如果您正苦于以下问题:Python checksum函数的具体用法?Python checksum怎么用?Python checksum使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了checksum函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_default_args

    def test_default_args(self):
        prev = ipv4(proto=inet.IPPROTO_IGMP)
        g = igmpv3_report()
        prev.serialize(g, None)
        buf = g.serialize(bytearray(), prev)
        res = unpack_from(igmpv3_report._PACK_STR, six.binary_type(buf))
        buf = bytearray(buf)
        pack_into("!H", buf, 2, 0)

        eq_(res[0], IGMP_TYPE_REPORT_V3)
        eq_(res[1], checksum(buf))
        eq_(res[2], 0)

        # records without record_num
        prev = ipv4(proto=inet.IPPROTO_IGMP)
        record1 = igmpv3_report_group(MODE_IS_INCLUDE, 0, 0, "225.0.0.1")
        record2 = igmpv3_report_group(MODE_IS_INCLUDE, 0, 2, "225.0.0.2", ["172.16.10.10", "172.16.10.27"])
        record3 = igmpv3_report_group(MODE_IS_INCLUDE, 1, 0, "225.0.0.3", [], b"abc\x00")
        record4 = igmpv3_report_group(MODE_IS_INCLUDE, 1, 2, "225.0.0.4", ["172.16.10.10", "172.16.10.27"], b"abc\x00")
        records = [record1, record2, record3, record4]
        g = igmpv3_report(records=records)
        prev.serialize(g, None)
        buf = g.serialize(bytearray(), prev)
        res = unpack_from(igmpv3_report._PACK_STR, six.binary_type(buf))
        buf = bytearray(buf)
        pack_into("!H", buf, 2, 0)

        eq_(res[0], IGMP_TYPE_REPORT_V3)
        eq_(res[1], checksum(buf))
        eq_(res[2], len(records))
开发者ID:John-Lin,项目名称:ryu,代码行数:30,代码来源:test_igmp.py


示例2: test_serialize

    def test_serialize(self):
        src_ip = netaddr.IPAddress('192.168.0.1').value
        dst_ip = vrrp.VRRP_IPV4_DST_ADDRESS
        prev = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, vrrp.VRRP_IPV4_TTL,
                         inet.IPPROTO_VRRP, 0, src_ip, dst_ip)

        type_ = vrrp.VRRP_TYPE_ADVERTISEMENT
        vrid = 5
        priority = 10
        max_adver_int = 30
        ip_address = netaddr.IPAddress('192.168.0.2').value
        ip_addresses = [ip_address]

        vrrp_ = vrrp.vrrpv2.create(
            type_, vrid, priority, max_adver_int, ip_addresses)

        buf = vrrp_.serialize(bytearray(), prev)
        pack_str = vrrp.vrrpv2._PACK_STR + 'III'
        pack_len = struct.calcsize(pack_str)
        res = struct.unpack(pack_str, str(buf))
        eq_(res[0], vrrp.vrrp_to_version_type(vrrp.VRRP_VERSION_V2, type_))
        eq_(res[1], vrid)
        eq_(res[2], priority)
        eq_(res[3], len(ip_addresses))
        eq_(res[4], vrrp.VRRP_AUTH_NO_AUTH)
        eq_(res[5], max_adver_int)
        # res[6] is checksum
        eq_(res[7], ip_address)
        eq_(res[8], 0)
        eq_(res[9], 0)
        eq_(len(buf), pack_len)

        # checksum
        s = packet_utils.checksum(buf)
        eq_(0, s)
开发者ID:isomer,项目名称:ryu,代码行数:35,代码来源:test_vrrp.py


示例3: test_serialize

    def test_serialize(self):
        src_port = 6431
        dst_port = 8080
        total_length = 0
        csum = 0

        src_ip = '192.168.10.1'
        dst_ip = '192.168.100.1'
        prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
                    inet.IPPROTO_UDP, 0, src_ip, dst_ip)

        u = udp(src_port, dst_port, total_length, csum)
        buf = u.serialize(bytearray(), prev)
        res = struct.unpack(udp._PACK_STR, buf)

        eq_(res[0], src_port)
        eq_(res[1], dst_port)
        eq_(res[2], struct.calcsize(udp._PACK_STR))

        # checksum
        ph = struct.pack('!4s4sBBH',
                         addrconv.ipv4.text_to_bin(src_ip),
                         addrconv.ipv4.text_to_bin(dst_ip), 0, 17, res[2])
        d = ph + buf + bytearray()
        s = packet_utils.checksum(d)
        eq_(0, s)
开发者ID:Aminiok,项目名称:ryu,代码行数:26,代码来源:test_udp.py


示例4: icmpv6_csum

def icmpv6_csum(prev, buf):
    ph = struct.pack('!16s16sBBH', prev.src, prev.dst, 0, prev.nxt,
                     prev.payload_length)
    h = bytearray(buf)
    struct.pack_into('!H', h, 2, 0)

    return socket.htons(packet_utils.checksum(ph + h))
开发者ID:09zwcbupt,项目名称:ryu,代码行数:7,代码来源:test_icmpv6.py


示例5: _parser

    def _parser(cls, buf):
        if len(buf) < cls._HDR_LEN:
            raise stream_parser.StreamParser.TooSmallException(
                '%d < %d' % (len(buf), cls._HDR_LEN))
        (version, type_, length, router_id, area_id, checksum, au_type,
         authentication) = struct.unpack_from(cls._HDR_PACK_STR,
                                              six.binary_type(buf))

        # Exclude checksum and authentication field for checksum validation.
        if packet_utils.checksum(buf[:12] + buf[14:16] + buf[cls._HDR_LEN:]) \
                != checksum:
            raise InvalidChecksum

        if len(buf) < length:
            raise stream_parser.StreamParser.TooSmallException(
                '%d < %d' % (len(buf), length))

        router_id = addrconv.ipv4.bin_to_text(router_id)
        area_id = addrconv.ipv4.bin_to_text(area_id)
        binmsg = buf[cls._HDR_LEN:length]
        rest = buf[length:]
        subcls = cls._lookup_type(type_)
        kwargs = subcls.parser(binmsg)
        return subcls(length, router_id, area_id, au_type, int(authentication),
                      checksum, version, **kwargs), None, rest
开发者ID:Aries-Sushi,项目名称:ryu,代码行数:25,代码来源:ospf.py


示例6: serialize_static

    def serialize_static(vrrp_, prev):
        assert not vrrp_.is_ipv6        # vrrpv2 defines only IPv4
        ip_addresses_pack_str = vrrpv2._ip_addresses_pack_str(vrrp_.count_ip)
        ip_addresses_len = struct.calcsize(ip_addresses_pack_str)
        vrrp_len = vrrpv2._MIN_LEN + ip_addresses_len + vrrpv2._AUTH_DATA_LEN

        checksum = False
        if vrrp_.checksum is None:
            checksum = True
            vrrp_.checksum = 0

        if vrrp_.auth_type is None:
            vrrp_.auth_type = VRRP_AUTH_NO_AUTH
        if vrrp_.auth_data is None:
            vrrp_.auth_data = VRRP_AUTH_DATA

        buf = bytearray(vrrp_len)
        offset = 0
        struct.pack_into(vrrpv2._PACK_STR, buf, offset,
                         vrrp_to_version_type(vrrp_.version, vrrp_.type),
                         vrrp_.vrid, vrrp_.priority,
                         vrrp_.count_ip, vrrp_.auth_type, vrrp_.max_adver_int,
                         vrrp_.checksum)
        offset += vrrpv2._MIN_LEN
        struct.pack_into(ip_addresses_pack_str, buf, offset,
                         *map(lambda x: addrconv.ipv4.text_to_bin(x),
                              vrrp_.ip_addresses))
        offset += ip_addresses_len
        struct.pack_into(vrrpv2._AUTH_DATA_PACK_STR, buf, offset,
                         *vrrp_.auth_data)
        if checksum:
            vrrp_.checksum = packet_utils.checksum(buf)
            struct.pack_into(vrrpv2._CHECKSUM_PACK_STR, buf,
                             vrrpv2._CHECKSUM_OFFSET, vrrp_.checksum)
        return buf
开发者ID:AndreiaAB,项目名称:ryu,代码行数:35,代码来源:vrrp.py


示例7: icmpv6_csum

def icmpv6_csum(prev, buf):
    ph = struct.pack('!16s16sI3xB', prev.src, prev.dst,
                     prev.payload_length, prev.nxt)
    h = bytearray(buf)
    struct.pack_into('!H', h, 2, 0)

    return packet_utils.checksum(ph + h)
开发者ID:cardinal27513,项目名称:ryu,代码行数:7,代码来源:test_icmpv6.py


示例8: test_serialize

    def test_serialize(self):
        offset = 5
        csum = 0

        src_ip = '192.168.10.1'
        dst_ip = '192.168.100.1'
        prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
                    inet.IPPROTO_TCP, 0, src_ip, dst_ip)

        t = tcp(self.src_port, self.dst_port, self.seq, self.ack,
                offset, self.bits, self.window_size, csum, self.urgent)
        buf = t.serialize(bytearray(), prev)
        res = struct.unpack(tcp._PACK_STR, str(buf))

        eq_(res[0], self.src_port)
        eq_(res[1], self.dst_port)
        eq_(res[2], self.seq)
        eq_(res[3], self.ack)
        eq_(res[4], offset << 4)
        eq_(res[5], self.bits)
        eq_(res[6], self.window_size)
        eq_(res[8], self.urgent)

        # checksum
        ph = struct.pack('!4s4sBBH',
                         addrconv.ipv4.text_to_bin(src_ip),
                         addrconv.ipv4.text_to_bin(dst_ip), 0, 6, offset * 4)
        d = ph + buf + bytearray()
        s = packet_utils.checksum(d)
        eq_(0, s)
开发者ID:AkiraSuu,项目名称:ryu,代码行数:30,代码来源:test_tcp.py


示例9: icmpv6_csum

def icmpv6_csum(prev, buf):
    ph = struct.pack('!16s16sI3xB',
                     addrconv.ipv6.text_to_bin(prev.src),
                     addrconv.ipv6.text_to_bin(prev.dst),
                     prev.payload_length, prev.nxt)
    h = bytearray(buf)
    struct.pack_into('!H', h, 2, 0)

    return packet_utils.checksum(ph + h)
开发者ID:dshulyak,项目名称:ryu,代码行数:9,代码来源:test_icmpv6.py


示例10: test_serialize

    def test_serialize(self):
        data = bytearray()
        prev = None
        buf = self.g.serialize(data, prev)

        res = unpack_from(igmpv3_report._PACK_STR, buffer(buf))

        eq_(res[0], self.msgtype)
        eq_(res[1], checksum(self.buf))
        eq_(res[2], self.record_num)
开发者ID:AkiraSuu,项目名称:ryu,代码行数:10,代码来源:test_igmp.py


示例11: serialize

    def serialize(self, payload, prev):
        hdr = bytearray(struct.pack(self._PACK_STR, self.msgtype,
                                    trunc(self.maxresp), self.csum,
                                    addrconv.ipv4.text_to_bin(self.address)))

        if self.csum == 0:
            self.csum = packet_utils.checksum(hdr)
            struct.pack_into('!H', hdr, 2, self.csum)

        return hdr
开发者ID:5g-empower,项目名称:empower-ryu,代码行数:10,代码来源:igmp.py


示例12: test_default_args

    def test_default_args(self):
        prev = ipv4(proto=inet.IPPROTO_IGMP)
        g = igmpv3_query()
        prev.serialize(g, None)
        buf = g.serialize(bytearray(), prev)
        res = unpack_from(igmpv3_query._PACK_STR, str(buf))
        buf = bytearray(buf)
        pack_into('!H', buf, 2, 0)
        buf = str(buf)

        eq_(res[0], IGMP_TYPE_QUERY)
        eq_(res[1], 100)
        eq_(res[2], checksum(buf))
        eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
        eq_(res[4], 2)
        eq_(res[5], 0)
        eq_(res[6], 0)

        # srcs without num
        prev = ipv4(proto=inet.IPPROTO_IGMP)
        srcs = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
        g = igmpv3_query(srcs=srcs)
        prev.serialize(g, None)
        buf = g.serialize(bytearray(), prev)
        res = unpack_from(igmpv3_query._PACK_STR, str(buf))
        buf = bytearray(buf)
        pack_into('!H', buf, 2, 0)
        buf = str(buf)

        eq_(res[0], IGMP_TYPE_QUERY)
        eq_(res[1], 100)
        eq_(res[2], checksum(buf))
        eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
        eq_(res[4], 2)
        eq_(res[5], 0)
        eq_(res[6], len(srcs))

        res = unpack_from('4s4s4s', str(buf), igmpv3_query._MIN_LEN)

        eq_(res[0], addrconv.ipv4.text_to_bin(srcs[0]))
        eq_(res[1], addrconv.ipv4.text_to_bin(srcs[1]))
        eq_(res[2], addrconv.ipv4.text_to_bin(srcs[2]))
开发者ID:AkiraSuu,项目名称:ryu,代码行数:42,代码来源:test_igmp.py


示例13: setUp

    def setUp(self):
        self.type_ = icmp.ICMP_ECHO_REQUEST
        self.code = 0
        self.csum = 0
        self.data = None

        self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)

        self.buf = bytearray(struct.pack(icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
        self.csum_calc = packet_utils.checksum(self.buf)
        struct.pack_into("!H", self.buf, 2, self.csum_calc)
开发者ID:John-Lin,项目名称:ryu,代码行数:11,代码来源:test_icmp.py


示例14: serialize

 def serialize(self):
     tail = self.serialize_tail()
     self.length = self._HDR_LEN + len(tail)
     head = bytearray(struct.pack(self._HDR_PACK_STR, self.version,
                      self.type_, self.length,
                      addrconv.ipv4.text_to_bin(self.router_id),
                      addrconv.ipv4.text_to_bin(self.area_id), 0,
                      self.au_type, self.authentication))
     buf = head + tail
     csum = packet_utils.checksum(buf[:12]+buf[14:16]+buf[self._HDR_LEN:])
     self.checksum = csum
     struct.pack_into("!H", buf, 12, csum)
     return buf
开发者ID:Annjana,项目名称:ryu,代码行数:13,代码来源:ospf.py


示例15: setUp_with_TimeExceeded

    def setUp_with_TimeExceeded(self):
        self.te_data = b"abc"
        self.te_data_len = len(self.te_data)
        self.data = icmp.TimeExceeded(data_len=self.te_data_len, data=self.te_data)

        self.type_ = icmp.ICMP_TIME_EXCEEDED
        self.code = 0
        self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)

        self.buf = bytearray(struct.pack(icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
        self.buf += self.data.serialize()
        self.csum_calc = packet_utils.checksum(self.buf)
        struct.pack_into("!H", self.buf, 2, self.csum_calc)
开发者ID:John-Lin,项目名称:ryu,代码行数:13,代码来源:test_icmp.py


示例16: setUp_with_dest_unreach

    def setUp_with_dest_unreach(self):
        self.unreach_mtu = 10
        self.unreach_data = b"abc"
        self.unreach_data_len = len(self.unreach_data)
        self.data = icmp.dest_unreach(data_len=self.unreach_data_len, mtu=self.unreach_mtu, data=self.unreach_data)

        self.type_ = icmp.ICMP_DEST_UNREACH
        self.code = icmp.ICMP_HOST_UNREACH_CODE
        self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)

        self.buf = bytearray(struct.pack(icmp.icmp._PACK_STR, self.type_, self.code, self.csum))
        self.buf += self.data.serialize()
        self.csum_calc = packet_utils.checksum(self.buf)
        struct.pack_into("!H", self.buf, 2, self.csum_calc)
开发者ID:John-Lin,项目名称:ryu,代码行数:14,代码来源:test_icmp.py


示例17: test_serialize

    def test_serialize(self):
        data = bytearray()
        prev = None
        buf = self.g.serialize(data, prev)

        res = unpack_from(igmpv3_query._PACK_STR, six.binary_type(buf))

        eq_(res[0], self.msgtype)
        eq_(res[1], self.maxresp)
        eq_(res[2], checksum(self.buf))
        eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
        eq_(res[4], self.s_qrv)
        eq_(res[5], self.qqic)
        eq_(res[6], self.num)
开发者ID:StephenKing,项目名称:summerschool-2015-ryu,代码行数:14,代码来源:test_igmp.py


示例18: test_build_igmp

    def test_build_igmp(self):
        p = self._build_igmp()

        e = self.find_protocol(p, "ethernet")
        ok_(e)
        eq_(e.ethertype, ether.ETH_TYPE_IP)

        i = self.find_protocol(p, "ipv4")
        ok_(i)
        eq_(i.proto, inet.IPPROTO_IGMP)

        g = self.find_protocol(p, "igmpv3_report")
        ok_(g)

        eq_(g.msgtype, self.msgtype)
        eq_(g.csum, checksum(self.buf))
        eq_(g.record_num, self.record_num)
        eq_(g.records, self.records)
开发者ID:AkiraSuu,项目名称:ryu,代码行数:18,代码来源:test_igmp.py


示例19: test_serialize_with_srcs

    def test_serialize_with_srcs(self):
        self.setUp_with_srcs()
        data = bytearray()
        prev = None
        buf = self.g.serialize(data, prev)

        res = unpack_from(igmpv3_query._PACK_STR, six.binary_type(buf))
        (src1, src2, src3) = unpack_from("4s4s4s", six.binary_type(buf), igmpv3_query._MIN_LEN)

        eq_(res[0], self.msgtype)
        eq_(res[1], self.maxresp)
        eq_(res[2], checksum(self.buf))
        eq_(res[3], addrconv.ipv4.text_to_bin(self.address))
        eq_(res[4], self.s_qrv)
        eq_(res[5], self.qqic)
        eq_(res[6], self.num)
        eq_(src1, addrconv.ipv4.text_to_bin(self.srcs[0]))
        eq_(src2, addrconv.ipv4.text_to_bin(self.srcs[1]))
        eq_(src3, addrconv.ipv4.text_to_bin(self.srcs[2]))
开发者ID:John-Lin,项目名称:ryu,代码行数:19,代码来源:test_igmp.py


示例20: test_serialize

    def test_serialize(self):
        buf = self.ip.serialize(bytearray(), None)
        res = struct.unpack_from(ipv4._PACK_STR, six.binary_type(buf))
        option = buf[ipv4._MIN_LEN:ipv4._MIN_LEN + len(self.option)]

        eq_(res[0], self.ver_hlen)
        eq_(res[1], self.tos)
        eq_(res[2], self.total_length)
        eq_(res[3], self.identification)
        eq_(res[4], self.flg_off)
        eq_(res[5], self.ttl)
        eq_(res[6], self.proto)
        eq_(res[8], addrconv.ipv4.text_to_bin(self.src))
        eq_(res[9], addrconv.ipv4.text_to_bin(self.dst))
        eq_(option, self.option)

        # checksum
        csum = packet_utils.checksum(buf)
        eq_(csum, 0)
开发者ID:5g-empower,项目名称:empower-ryu,代码行数:19,代码来源:test_ipv4.py



注:本文中的ryu.lib.packet.packet_utils.checksum函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python slow.lacp函数代码示例发布时间:2022-05-27
下一篇:
Python packet.Packet类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap