• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python struct.pack_into函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中struct.pack_into函数的典型用法代码示例。如果您正苦于以下问题:Python pack_into函数的具体用法?Python pack_into怎么用?Python pack_into使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了pack_into函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _boot_pkt

def _boot_pkt(socket, host, op, a1, a2, a3, data=None, offset=0, datasize=0):
    """
    Packs the given data into the format required by the Boot ROM program that
    executes automatically when SpiNNaker is first turned on, sends it and
    then waits ``BOOT_DELAY`` seconds.

    The packet starts with an 18-byte big-endian header (protocol version,
    ``op`` and the three arguments).  When ``data`` is given, 32-bit words are
    read from it as little-endian and written after the header as big-endian.

    :param socket: stream to write the command to
    :param host: hostname of the target SpiNNaker
    :param op:   boot ROM command
    :param a1: argument 1 -- varies with ``op``
    :param a2: argument 2 -- varies with ``op``
    :param a3: argument 3 -- varies with ``op``
    :param data: optional data
    :param offset: the offset into the data to start from
    :param datasize: the maximum amount of data to write from the data

    """
    if data is None:
        # Header-only command.
        packet = struct.pack(">HLLLL", BOOT_PROT_VER, op, a1, a2, a3)
    else:
        # The packet is always sized for ``datasize`` payload bytes; whatever
        # is not filled from ``data`` stays zero.
        packet = numpy.zeros(datasize + 18, dtype=numpy.uint8)
        struct.pack_into(">HLLLL", packet, 0, BOOT_PROT_VER, op, a1, a2, a3)

        # Never read past the end of ``data``.
        readsize = min(datasize, data.size - offset)
        for off in range(0, readsize, 4):
            (the_word,) = struct.unpack_from("<I", data, offset + off)
            struct.pack_into(">I", packet, 18 + off, the_word)

    socket.sendto(packet, (host, BOOT_PORT))
    time.sleep(BOOT_DELAY)
开发者ID:jougs,项目名称:SpiNNakerManchester.github.io,代码行数:33,代码来源:boot.py


示例2: SerializeArray

  def SerializeArray(self, value, data_offset, data, handle_offset):
    """Serialize the sequence ``value`` at the current end of ``data``.

    ``data`` is first extended by the array size (header plus one
    ``sub_type`` slot per element) and its alignment padding.  Every element
    is then serialized through ``self.sub_type.Serialize``; the flattened
    results are packed right after the header, which itself records the
    array size and the element count.

    Returns:
      A ``(data_offset, handles)`` tuple, where ``handles`` gathers the
      handles returned for all elements, in order.
    """
    header = serialization.HEADER_STRUCT
    size = header.size + self.sub_type.GetByteSize() * len(value)
    array_start = len(data)
    slot_position = array_start + header.size
    data.extend(bytearray(size +
                          serialization.NeededPaddingForAlignment(size)))

    handles = []
    packed_values = []
    for element in value:
      element_data, element_handles = self.sub_type.Serialize(
          element,
          len(data) - slot_position,
          data,
          handle_offset + len(handles))
      packed_values.extend(serialization.Flatten(element_data))
      handles.extend(element_handles)
      slot_position += self.sub_type.GetByteSize()

    header.pack_into(data, array_start, size, len(value))
    # TODO(azani): Refactor so we don't have to create big formatting strings.
    # The format is the element type code repeated once per element.
    elements_format = ('%s' % self.sub_type.GetTypeCode()) * len(value)
    struct.pack_into(elements_format,
                     data,
                     array_start + header.size,
                     *packed_values)
    return (data_offset, handles)
开发者ID:chinmaygarde,项目名称:mojom-standalone,代码行数:26,代码来源:descriptor.py


示例3: sendJoinRoomRequestOIE

 def sendJoinRoomRequestOIE(self,roomName):
     """Send a join-room request for ``roomName`` and record the new state.

     Any room other than ``ROOM_IDS.MAIN_ROOM`` is looked up in
     ``self.MovieStore`` and a 14-byte type-6 message is sent: header,
     message length, user id, the four octets of the movie IP address and
     the movie port.  For the main room an 8-byte type-7 message is sent
     (header, message length, user id).

     The 32-bit header is built as
     ``type<<28 | ack<<27 | res<<26 | ack_num<<13 | seq_num``.
     """
     self.RoomName = roomName      # original note: this used to be self.thisRoomName???
     ack = res = ack_num = 0
     if (roomName!=ROOM_IDS.MAIN_ROOM):
         msg_type, msg_length = 6, 14
         self.userStatus = userStatus['waitingfMovieRoomUserList']
         self.typemsgenvoye = "6"
         movie = self.MovieStore.getMovieByTitle(roomName)
         print(movie.movieTitle)
         movie_ip = movie.movieIpAddress
         print(movie_ip)
         movie_port = movie.moviePort
         request = ctypes.create_string_buffer(msg_length)
         header = (msg_type<<28)|(ack<<27)|(res<<26)|(ack_num<<13)|(self.seq_num)
         octets = movie_ip.split('.')
         print(octets)
         struct.pack_into(">LHHBBBBH", request, 0, header, msg_length, self.userId,
                          int(octets[0]), int(octets[1]), int(octets[2]), int(octets[3]),
                          movie_port)
         self.EnvoiMsg(request)
     else:
         msg_type, msg_length = 7, 8
         self.userStatus = userStatus['waitingMainRoom']
         self.typemsgenvoye = "7"
         request = ctypes.create_string_buffer(msg_length)
         header = (msg_type<<28)|(ack<<27)|(res<<26)|(ack_num<<13)|(self.seq_num)
         struct.pack_into(">LHH", request, 0, header, msg_length, self.userId)
         self.EnvoiMsg(request)
     print("Request join movie room envoyé")
开发者ID:delco225,项目名称:C2W,代码行数:31,代码来源:tcp_chat_client.py


示例4: pack_into

    def pack_into(self, buff, offset):
        """Serialize this message into ``buff`` beginning at ``offset``.

        Mirrors the calling convention of ``struct.pack_into``.  The first
        four bytes at ``offset`` receive the CRC32 of the message body; the
        body (magic byte, compression type, key and value, each preceded by
        its length) follows directly after.  A missing partition key is
        encoded as a key length of -1 with no key bytes.

        :param buff: The buffer to write into
        :param offset: The offset to start the write at
        """
        if self.partition_key is not None:
            key_length = len(self.partition_key)
            fmt = "!BBi%dsi%ds" % (key_length, len(self.value))
            fields = (
                self.MAGIC,
                self.compression_type,
                key_length,
                self.partition_key,
                len(self.value),
                self.value,
            )
        else:
            fmt = "!BBii%ds" % len(self.value)
            fields = (self.MAGIC, self.compression_type, -1, len(self.value), self.value)

        # The body goes in first so the checksum can be computed over the
        # bytes exactly as they were written.
        body_start = offset + 4
        struct.pack_into(fmt, buff, body_start, *fields)
        body_end = body_start + struct.calcsize(fmt)
        checksum = crc32(buffer(buff[body_start:body_end])) & 0xFFFFFFFF
        struct.pack_into("!I", buff, offset, checksum)
开发者ID:asifhj,项目名称:pykafka,代码行数:26,代码来源:protocol.py


示例5: updateDossierCompDescr

    def updateDossierCompDescr(self, dossierCompDescrArray, offset, size):
        """Write this block's items back into the packed descriptor array.

        ``offset`` and ``size`` locate the block's current bytes inside
        ``dossierCompDescrArray``.

        * When the block is expanded, every item is re-packed: in place if
          the byte size is unchanged, otherwise into a new array in which the
          old region is replaced.
        * Otherwise only the changed items that already exist in the stored
          region are overwritten in place, the change set is cleared, and
          items beyond the stored count are inserted right after the region.

        Returns a ``(array, newSize)`` tuple; ``array`` may or may not be the
        object that was passed in.
        """
        storedCount = size / self.__itemSize
        currentCount = len(self.__list)
        newSize = currentCount * self.__itemSize

        if self.__isExpanded:
            fullFormat = '<' + self.__itemFormat * currentCount
            allValues = []
            for entry in self.__list:
                allValues.extend(self.__itemToList(entry))

            if newSize != size:
                head = dossierCompDescrArray[:offset]
                tail = dossierCompDescrArray[offset + size:]
                return (head + array('c', struct.pack(fullFormat, *allValues)) + tail, newSize)
            struct.pack_into(fullFormat, dossierCompDescrArray, offset, *allValues)
            return (dossierCompDescrArray, newSize)

        for idx in self.__changed:
            if not idx < storedCount:
                # Not stored yet; it is handled with the appended items below.
                continue
            itemOffset = offset + idx * self.__itemSize
            struct.pack_into('<' + self.__itemFormat, dossierCompDescrArray, itemOffset, *self.__itemToList(self.__list[idx]))

        self.__changed.clear()
        appended = self.__list[storedCount:]
        if not appended:
            return (dossierCompDescrArray, newSize)

        appendedValues = []
        for entry in appended:
            appendedValues.extend(self.__itemToList(entry))
        appendedFormat = '<' + self.__itemFormat * len(appended)
        regionEnd = offset + size
        dossierCompDescrArray = dossierCompDescrArray[:regionEnd] + array('c', struct.pack(appendedFormat, *appendedValues)) + dossierCompDescrArray[regionEnd:]
        return (dossierCompDescrArray, newSize)
开发者ID:aevitas,项目名称:wotsdk,代码行数:29,代码来源:commondossierblocks.py


示例6: test_checksum

    def test_checksum(self):
        """
        Guard the checksum algorithm against accidental changes.

        The header is built by hand and injected through set_data_length and
        set_header, because the header PortAgentPacket builds itself includes
        the timestamp, which would make the checksum differ from run to run.
        """
        payload = "This tests the checksum algorithm."
        payload_length = len(payload)
        self.pap.attach_data(payload)

        # Hand-made header fields; the last two values are zero so the
        # result is reproducible.
        header_fields = (0xa3, 0x9d, 0x7a, self.pap.DATA_FROM_DRIVER,
                         payload_length + HEADER_SIZE, 0x0000,
                         0)
        self.pap.set_data_length(payload_length)

        header_format = '>BBBBHHd'
        header_buffer = ctypes.create_string_buffer(struct.calcsize(header_format))
        struct.pack_into(header_format, header_buffer, 0, *header_fields)

        # Replace the packet's own header with the one built above.
        self.pap.set_header(header_buffer.raw)

        # The checksum must match the known value for this payload/header.
        self.assertEqual(self.pap.calculate_checksum(), 2)
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:30,代码来源:test_port_agent_client.py


示例7: _pack_packet

def _pack_packet(hostname, service, state, output, timestamp):
    """Build the data packet and return it as a string.

    This is more involved than a single struct.pack() call because the
    string fields are padded with random bytes instead of zeros.  Each string
    is NUL-terminated, padded up to its fixed field width and written after
    the header.  The CRC32 is computed over the whole packet while the CRC
    field still holds zero, then stored at offset 4.
    """
    packet = array.array('c', '\0' * struct.calcsize(_data_packet_format))

    # header: version, CRC32 placeholder (0), timestamp and state
    header_format = '!hxxLLh'
    struct.pack_into(header_format, packet, 0, PACKET_VERSION, 0, timestamp, state)
    offset = struct.calcsize(header_format)

    # hostname, service description and plugin output, in that order
    fields = (
        (hostname, MAX_HOSTNAME_LENGTH),
        (service, MAX_DESCRIPTION_LENGTH),
        (output, MAX_PLUGINOUTPUT_LENGTH),
    )
    for text, width in fields:
        text = text + '\0'
        if len(text) < width:
            text += get_random_alphanumeric_bytes(width - len(text))
        field_format = '!%ds' % (width,)
        struct.pack_into(field_format, packet, offset, text)
        offset += struct.calcsize(field_format)

    # compute the CRC32 of what we have so far and drop it into its slot
    crc_val = binascii.crc32(packet) & 0xffffffff
    struct.pack_into('!L', packet, 4, crc_val)
    return packet.tostring()
开发者ID:etscrivner,项目名称:send_nsca,代码行数:31,代码来源:nsca.py


示例8: comms_on

def comms_on():
    """Send the 13-byte "RF SW CMD" on-command to the frame receiver.

    The datagram is the 9-character command tag followed by
    ``settings.RF_SW_CMD_ON_INT`` as a little-endian unsigned 32-bit value.
    """
    sock = Udpsocket([])
    command = ctypes.create_string_buffer(13)
    command[0:9] = 'RF SW CMD'
    struct.pack_into("<I", command, 9, settings.RF_SW_CMD_ON_INT)
    payload = bytearray(command)
    receiver = (packet_settings.FRAME_RECEIVER_IP, packet_settings.FRAME_RECEIVER_PORT)
    sock.sendto(payload, receiver)
开发者ID:cshields,项目名称:satnogs-client,代码行数:7,代码来源:packet.py


示例9: pdpte_create_binary_file

    def pdpte_create_binary_file(self):
        """Write the PDPTE entries into ``self.output_buffer``.

        One entry is written for every index in ``0..self.total_pages``.  An
        index found in ``self.get_pdpte_list()`` gets the present bit plus the
        4 KiB-aligned address of its page directory; any other index is
        written as zero.  ``self.output_offset`` advances by one entry size
        per write.
        """
        # pae needs a pdpte at 32byte aligned address

        # Even though we have only 4 entries in the pdpte we need to move
        # the self.output_offset variable to the next page to start pushing
        # the pd contents
        #
        # FIXME: This wastes a ton of RAM!!
        if args.verbose:
            print("PDPTE at 0x%x" % self.pd_start_addr)

        for pdpte in range(self.total_pages + 1):
            binary_value = 0
            if pdpte in self.get_pdpte_list():
                # Page directories start one page after pd_start_addr, one
                # page each, in the order of the pdpte list.
                pd_addr = ((self.pd_start_addr + 4096) +
                           self.get_pdpte_list().index(pdpte) * 4096)
                # Clear the low 12 bits, then set the present bit (bit 0).
                aligned_pd_addr = (pd_addr >> 12) << 12
                binary_value = (1 << 0) | aligned_pd_addr

            struct.pack_into(page_entry_format,
                             self.output_buffer,
                             self.output_offset,
                             binary_value)
            self.output_offset += struct.calcsize(page_entry_format)
开发者ID:loicpoulain,项目名称:zephyr,代码行数:27,代码来源:gen_mmu_x86.py


示例10: scan_rays

def scan_rays(rays, max_distance, ray_origins=False, keep_render_setup=False, do_shading=True):
    """Cast ``rays`` through the blensor render operator and collect the hits.

    ``rays`` is a flat sequence with 3 values per ray, or 6 per ray when
    ``ray_origins`` is True.  The rays are copied into a float buffer whose
    address is handed to ``bpy.ops.render.blensor`` together with the address
    of a result buffer.

    A ray counts as a hit when its returned distance is greater than 0 and
    less than ``max_distance``.  Each hit is reported as the list
    ``[distance, x, y, z, object_id, (r, g, b), ray_index]``, where x/y/z is
    the point at the measured distance along the first three values of the
    ray.  A ``TypeError`` raised while scanning is printed as a traceback and
    the hits gathered so far are returned.
    """
    elementsPerRay = 6 if ray_origins == True else 3
    numberOfRays = int(len(rays)/elementsPerRay)
    usedElements = numberOfRays*elementsPerRay

    rays_buffer = (ctypes.c_float * numberOfRays*elementsPerRay)()
    struct.pack_into("%df"%(usedElements), rays_buffer, 0, *rays[:usedElements])

    returns_buffer = (ctypes.c_float * (numberOfRays * ELEMENTS_PER_RETURN))()

    print ("Raycount: ", numberOfRays)

    # Second view of the result buffer, used to read the object id as an
    # unsigned integer.
    returns_buffer_uint = ctypes.cast(returns_buffer, ctypes.POINTER(ctypes.c_uint))

    array_of_returns = []

    try:
        bpy.ops.render.blensor(
            raycount = numberOfRays,
            maximum_distance = max_distance,
            vector_strptr="%016X"%(ctypes.addressof(rays_buffer)),
            return_vector_strptr="%016X"%(ctypes.addressof(returns_buffer)),
            elements_per_ray = elementsPerRay,
            keep_render_setup=keep_render_setup,
            shading = do_shading)

        for idx in range(numberOfRays):
            base = idx*ELEMENTS_PER_RETURN
            measured = returns_buffer[base]
            if not 0.0 < measured < max_distance:
                continue

            # The ray may have been reflected or refracted, but the laser
            # does not know that.  The reported point is therefore the one at
            # the measured distance along the original ray direction.
            direction = [float(rays[idx*elementsPerRay + axis]) for axis in range(3)]
            veclen = math.sqrt(direction[0]**2+direction[1]**2+direction[2]**2)
            raydistance = float(measured)

            ret = [raydistance]
            ret.extend(raydistance * component/veclen for component in direction)
            ret.append(returns_buffer_uint[base+4]) #objectid
            ret.append((returns_buffer[base+5],
                        returns_buffer[base+6],
                        returns_buffer[base+7])) # RGB Value of the material
            ret.append(idx) # Store the index per return as the last element
            array_of_returns.append(ret)
    except TypeError:
        traceback.print_tb(sys.exc_info()[2])
    finally:
        del rays_buffer
        del returns_buffer

    return array_of_returns
开发者ID:nttputus,项目名称:blensor,代码行数:60,代码来源:scan_interface.py


示例11: initial_connection

    async def initial_connection(self, data):
        """Run the first voice exchange and select the protocol.

        Stores the ssrc and voice port from ``data``, sends a 70-byte packet
        that starts with the ssrc to the voice endpoint, and reads our own IP
        and port out of the reply.  The first mode in ``data['modes']`` that
        is also in the connection's supported modes is then selected, and
        ``client_connect`` is awaited.
        """
        state = self._connection
        state.ssrc = data['ssrc']
        state.voice_port = data['port']

        discovery = bytearray(70)
        struct.pack_into('>I', discovery, 0, state.ssrc)
        state.socket.sendto(discovery, (state.endpoint_ip, state.voice_port))
        recv = await self.loop.sock_recv(state.socket, 70)
        log.debug('received packet in initial_connection: %s', recv)

        # the ip is ascii starting at the 4th byte and ending at the first null
        ip_start = 4
        ip_end = recv.index(0, ip_start)
        state.ip = recv[ip_start:ip_end].decode('ascii')

        # the port is a little endian unsigned short in the last two bytes
        # yes, this is different endianness from everything else
        (state.port,) = struct.unpack_from('<H', recv, len(recv) - 2)
        log.debug('detected ip: %s port: %s', state.ip, state.port)

        # there *should* always be at least one supported mode (xsalsa20_poly1305)
        modes = []
        for candidate in data['modes']:
            if candidate in self._connection.supported_modes:
                modes.append(candidate)
        log.debug('received supported encryption modes: %s', ", ".join(modes))

        mode = modes[0]
        await self.select_protocol(state.ip, state.port, mode)
        log.info('selected the voice protocol for use (%s)', mode)

        await self.client_connect()
开发者ID:imayhaveborkedit,项目名称:discord.py,代码行数:30,代码来源:gateway.py


示例12: throttle

 def throttle(self,speed,direction):
   '''
   Control the train's throttle.

   Parameters:
     speed      0 - 127, where 0 = stop (values above 120 are sent as 120)
     direction  FORWARD or REVERSE

   For information about the message sent by this method refer to the
   XpressNet protocol: 'Locomotive speed and direction operations'.

   Example:
     t1.throttle(15,FORWARD)  # move train forward with a speed of 15 steps
     t1.throttle(0,FORWARD)   # stop train
   '''
   message = bytearray('E400000000'.decode('hex'))
   message[1] = 0x13   #128 speed steps
   struct.pack_into(">H",message,2,self.address)
   # NB If speed is set to max, the command has to be done differently
   # else the elink bombs, so it is just easier to block it.
   speed = min(speed, 120)
   message[4] = speed
   # The top bit of the speed byte carries the direction.
   if direction == FORWARD:
     message[4] |= 0x80
   elif direction == REVERSE:
     message[4] &= 0x7F
   parity(message)
   send (message)
开发者ID:Cavey,项目名称:py-rail,代码行数:25,代码来源:hornby.py


示例13: icmpv6_csum

def icmpv6_csum(prev, buf):
    """Return the expected checksum for the ICMPv6 message ``buf``.

    The checksum is taken over a pseudo header built from ``prev`` (source,
    destination, a zero byte, next header and payload length) followed by a
    copy of ``buf`` whose 16-bit field at offset 2 has been zeroed.  The
    result is passed through ``socket.htons``.
    """
    pseudo_header = struct.pack('!16s16sBBH', prev.src, prev.dst, 0, prev.nxt,
                                prev.payload_length)
    # Work on a copy so the caller's buffer is left untouched.
    message = bytearray(buf)
    struct.pack_into('!H', message, 2, 0)

    raw_checksum = packet_utils.checksum(pseudo_header + message)
    return socket.htons(raw_checksum)
开发者ID:09zwcbupt,项目名称:ryu,代码行数:7,代码来源:test_icmpv6.py


示例14: set

    def set(self, value):
        """
        Assign ``value`` to this primitive.

        The value is packed with the format ``self.pack`` into ``self.buf``
        at ``self.offset``.  Complex objects should raise a ValueError.
        """
        fmt, target, position = self.pack, self.buf, self.offset
        struct.pack_into(fmt, target, position, value)
开发者ID:norbert-jiang,项目名称:tinyos-2.x-contrib,代码行数:7,代码来源:probe.py


示例15: test3

def test3():
    """Round-trip (ubyte, float) records through a packed bytearray.

    Each record is written with ``pack_into`` at its own offset and read back
    with ``unpack_from``; the decoded records must equal the originals, with
    the float compared to within 1e-6.
    """
    class X(object):
        def __init__(self, a, b):
            # a: ubyte, b: float
            self.a = a
            self.b = b

        def __eq__(self, other):
            return self.a == other.a and abs(self.b - other.b) < 1e-6

    records = [X(1, .1), X(2, .2)]
    fmt = "=Bf"

    record_size = struct.calcsize(fmt)
    assert record_size == 5

    buf = bytearray(len(records) * record_size)
    for index, record in enumerate(records):
        struct.pack_into(fmt, buf, index * record_size, record.a, record.b)

    count = int(len(buf) / record_size)
    decoded = [X(*struct.unpack_from(fmt, buf, index * record_size))
               for index in range(count)]

    assert decoded == records
开发者ID:tiggerntatie,项目名称:brython,代码行数:25,代码来源:test_struct.py


示例16: toBytes

    def toBytes(self, destBuf, newOffset=None, imgDataOffset=None, destOffset=0, order="="):
        """Pack this entry into ``destBuf`` at ``destOffset``; returns 12.

        The entry is written as tag, type and count followed by its value,
        and is padded with zero bytes when that is shorter than 12 bytes.

        * If the entry is an offset, the value written is ``newOffset``, or
          ``self.val`` when ``newOffset`` is None.
        * Else, if it is an image-data-offset entry and ``imgDataOffset`` is
          truthy, ``imgDataOffset`` is written as a single long.
        * Otherwise ``self.val`` is written using the format of the entry's
          tag type repeated ``self.count`` times.

        ``order`` is the struct byte-order prefix.
        """
        if self.isOffset:
            valFmt = 'L'
            val = self.val if newOffset is None else newOffset
        elif self.isImageDataOffsetEntry() and imgDataOffset:
            # we are writing image data offsets within this entry; they are not themselves offset
            # for this to not be offset, there must be only one, since it's a long
            valFmt = 'L'
            val = imgDataOffset
        else:
            valFmt = lookupTagType(self.type).fmt * self.count
            val = self.val

        fmt = order + "HHI" + valFmt
        # A non-positive repeat count yields an empty string, so this only
        # pads formats shorter than 12 bytes.
        fmt += 'x' * (12 - struct.calcsize(fmt))

        fields = [self.tag, self.type, self.count]
        if isinstance(val, tuple):
            fields.extend(val)
        else:
            fields.append(val)

        struct.pack_into(fmt, destBuf, destOffset, *fields)
        return 12
开发者ID:EricSchles,项目名称:thunder,代码行数:29,代码来源:multitif.py


示例17: serialize

    def serialize(self, payload=None, prev=None):
        """Return the GRE header as a bytearray.

        The optional checksum, key and sequence-number fields are appended,
        in that order, for each attribute that is not None, and the matching
        flag is set in the fixed header.  When ``self.checksum`` is truthy it
        is recomputed over the finished header, stored back on the instance
        and written at offset ``self._MIN_LEN``.
        """
        flags = 0
        optional = bytearray()

        if self.checksum is not None:
            flags |= GRE_CHECKSUM_FLG

            # For purposes of computing the checksum,
            # the value of the checksum field is zero.
            # Also, because Reserved1 is always 0x00 of 2 bytes,
            # Set in conjunction with checksum.
            optional += b'\x00' * self._CHECKSUM_LEN

        if self._key is not None:
            flags |= GRE_KEY_FLG
            optional += struct.pack(self._KEY_PACK_STR, self._key)

        if self.seq_number is not None:
            flags |= GRE_SEQUENCE_NUM_FLG
            optional += struct.pack(self._SEQNUM_PACK_STR, self.seq_number)

        header = bytearray()
        msg_pack_into(self._PACK_STR, header, 0, flags, self.version,
                      self.protocol)
        header += optional

        if self.checksum:
            computed = packet_utils.checksum(header)
            self.checksum = computed
            struct.pack_into(self._CHECKSUM_PACK_STR, header, self._MIN_LEN,
                             computed)

        return header
开发者ID:5g-empower,项目名称:empower-ryu,代码行数:33,代码来源:gre.py


示例18: pack_into_format

def pack_into_format(frg,ack,type_field,rt,seq_number, u_id, d_id, data_length):
    """Return a buffer of ``HEADER_L + data_length`` bytes with the header set.

    The first header byte is assembled from ``rt``, ``type_field`` shifted
    left by two bits, the fragment flag (0x80) and the ack flag (0x40).  It is
    followed by the sequence number, user id, destination id and
    ``data_length``, packed according to ``HEADER_FORMAT``.  The data area
    after the header is left untouched.

    The process exits through ``sys.exit`` when ``rt`` or ``type_field`` need
    more bits than ``RT_L`` / ``TYPE_FIELD_L``, or when the sequence number,
    user id or destination id is greater than 255.
    """
    buff = ctypes.create_string_buffer(HEADER_L+data_length)

    # in case the rt or type_field are higher than normal
    rt_bits = len(bin(rt))-2
    if rt_bits > RT_L:
        sys.exit('ERROR :  RT = '+format(rt)+' - * RT (Room Type) field is too big * -')
    type_bits = len(bin(type_field))-2
    if type_bits > TYPE_FIELD_L:
        sys.exit('ERROR :  Type = '+format(type_field)+' - * Type field is too big * -')

    first_byte = 0b00000000
    first_byte += rt
    first_byte += type_field << 2
    # frg and ack fields can be either True (1) or False (0)
    first_byte += 0b10000000 if frg else 0
    first_byte += 0b01000000 if ack else 0

    if seq_number > 255 or u_id > 255 or d_id > 255:
        sys.exit('Error :  Sequence Number, User Id or Destination Id field is too big')

    # data_length is the length of the data in bytes
    struct.pack_into(HEADER_FORMAT, buff, 0, first_byte, seq_number, u_id, d_id, data_length)
    return buff
开发者ID:nidhog,项目名称:chat-while-watching,代码行数:27,代码来源:formatHandler.py


示例19: changeTone

 def changeTone(self, v_to, v_from='C'):
     """Create the raw sample for note ``v_to`` by resampling note ``v_from``.

     Reads ``<samples_folder>/<instrument>/<v_from>.raw`` and writes
     ``<v_to>.raw`` into the same folder (names are lower-cased and '#'
     becomes 's').  The source is read as 16-bit samples and stepped through
     at ``notes[v_to] / notes[v_from]`` times the output rate, so the output
     is the source length divided by that ratio.  Output positions for which
     the source has no more data stay zero.

     :param v_to: name of the note to generate (key of ``self.notes``)
     :param v_from: name of the note used as source (key of ``self.notes``)
     """
     folder_name = self.samples_folder+'/'+self.instrument.lower()
     to_filename = v_to.lower().replace('#', 's')+'.raw'
     from_filename = v_from.lower().replace('#', 's')+'.raw'
     with open(folder_name+'/'+from_filename, 'rb') as f:
         spam = f.read()

     ratio = float(self.notes[v_to])/self.notes[v_from]
     out_size = int(float(len(spam))/ratio)
     out = ctypes.create_string_buffer(out_size)

     # Stop while a full 2-byte sample still fits: with an odd out_size the
     # last position would otherwise be written past the end of the buffer.
     for i in range(0, out_size - 1, 2):
         j = int(float(ratio)*i)
         # Give j the same position inside its 4-byte group as i has
         # (presumably 4 bytes = one frame of two 16-bit channels -- confirm).
         j += i%4 - j%4
         if j+6 > len(spam):
             continue
         tmp1 = struct.unpack('h', spam[j:j+2])[0]
         tmp2 = struct.unpack('h', spam[j+4:j+6])[0]
         tmp = int(tmp1+(tmp2-tmp1)*(ratio-1))
         # Clamp to what format 'h' can store; the former +/-256**2 limit
         # let values through that made struct.pack_into raise struct.error.
         tmp = max(-32768, min(32767, tmp))
         struct.pack_into('h', out, i, tmp)

     with open(folder_name+'/'+to_filename, 'wb') as f:
         f.write(out)
开发者ID:asafonov,项目名称:sunrise,代码行数:26,代码来源:sampler.py


示例20: serialize

    def serialize(self):
        """Return the OPEN message body as a bytearray.

        The fixed part is packed with ``bgp4_open._PACK_STR``.  When
        ``self.data`` is not empty, a (type, length) pair is appended,
        followed by every parameter whose code is a key of
        ``self._CAPABILITY_ADVERTISEMENT``; ``self.para_len`` grows by each
        such parameter's ``_MIN_LEN``.  Finally the optional-parameter length
        (``para_len + 2``) is written at offset 9 and ``para_len`` at
        offset 11.

        NOTE(review): ``self.para_len`` is only ever added to here, and the
        write at offset 11 needs bytes beyond the fixed part -- confirm the
        behaviour for repeated calls and for an empty ``self.data``.
        """
        self.opt_para_len = 0
        # use a 2-octets AS number placeholder here, and the
        # real(maybe 4 octets) AS number is in support_4_octets_as_num
        # capability.
        # Note that the "23456" isn't just a random number, it's
        # defined in RFC4893
        if self.my_as > 65535:
            two_octet_as = 23456
        else:
            two_octet_as = self.my_as
        fixed_part = struct.pack(bgp4_open._PACK_STR, self.version,
                                 two_octet_as, self.hold_time,
                                 self.bgp_identifier.value, self.opt_para_len)
        hdr = bytearray(fixed_part)

        if self.data != []:
            hdr += bytearray(struct.pack('!BB', self.type_, self.para_len))
            for para in self.data:
                # Parameters with an unknown capability code are skipped.
                if not self._CAPABILITY_ADVERTISEMENT.get(para.code):
                    continue
                hdr += para.serialize()
                self.para_len += para._MIN_LEN

        # Patch in the lengths now that every parameter has been counted.
        self.opt_para_len = self.para_len + 2
        struct.pack_into('!B', hdr, 9, self.opt_para_len)
        struct.pack_into('!B', hdr, 11, self.para_len)
        return hdr
开发者ID:cannium,项目名称:openflow_routing_framework,代码行数:26,代码来源:BGP4.py



注:本文中的struct.pack_into函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python struct.struct_pack函数代码示例发布时间:2022-05-27
下一篇:
Python struct.pack函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap