本文整理汇总了Python中xstruct.pack函数的典型用法代码示例。如果您正苦于以下问题:Python pack函数的具体用法?Python pack怎么用?Python pack使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了pack函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: pack_callback
def pack_callback(arg):
    """Serialize a sequence of object descriptions into one packed string.

    Every item of ``arg`` is a ``(name, id, description, extra)`` tuple.  The
    first three fields are packed with the ``'SIS'`` format.  When the id has
    an entry in ``ObjectParamsStructDesc``, each ``(substruct, name,
    description)`` row of that entry removes the matching key from ``extra``
    and packs its value with ``substruct``.

    Args:
        arg: iterable of ``(name, id, description, extra)`` tuples; ``extra``
            may be anything ``dict()`` accepts.

    Returns:
        The concatenation of all packed pieces (``""`` for empty input).

    Raises:
        KeyError: if ``extra`` lacks a key listed for the id.
        TypeError: if ``extra`` still has entries once every listed key has
            been consumed.
    """
    pieces = []
    for name, object_id, description, extra in arg:
        # dict() yields a fresh mapping, so pop() below never touches the
        # object the caller passed in.
        remaining = dict(extra)

        pieces.append(pack('SIS', name, object_id, description))
        # ``in`` replaces the deprecated dict.has_key().
        if object_id in ObjectParamsStructDesc:
            # Distinct names here: the rows must not overwrite the outer
            # loop's ``name`` / ``description``.
            for substruct, param_name, _param_desc in ObjectParamsStructDesc[object_id]:
                pieces.append(pack(substruct, remaining.pop(param_name)))

        if remaining:
            raise TypeError("There was left over extra stuff...")
    # Joining once avoids repeated string reallocation from ``+=``.
    return "".join(pieces)
开发者ID:jmingtan,项目名称:tpclient-ogre,代码行数:12,代码来源:ObjectDesc.py
示例2: get_ack
def get_ack(self, force=False):
    """Build the acknowledgement payload, or return ``""`` when none is due.

    A payload is produced when ``force`` is true, or when the last send is
    more recent than the last receive and more than ``self.ack_delay``
    seconds have passed since that send.

    Args:
        force: build the payload regardless of the timing checks.

    Returns:
        A ``base_container.WriteBuffer`` holding ``next_sn - 1`` followed by
        every entry of ``receive_process.block_list`` (each packed as
        little-endian uint32), or ``""`` when no payload is due.
    """
    now = time.time()
    if not force:
        sent_after_receive = self.last_receive_time < self.last_send_time
        if not (sent_after_receive and now - self.last_send_time > self.ack_delay):
            return ""

    ack = base_container.WriteBuffer()
    ack.append(struct.pack("<I", self.receive_process.next_sn - 1))
    for sn in self.receive_process.block_list:
        ack.append(struct.pack("<I", sn))
    return ack
开发者ID:XLILT,项目名称:XX-Net,代码行数:13,代码来源:proxy_session.py
示例3: __str__
def __str__(self):
    """Return the packed form: the ``Processed`` prefix plus this object's fields.

    The fields ``key``, ``start``, ``amount`` and ``since`` are packed with
    ``self.struct`` and appended to whatever ``Processed.__str__`` returns.

    Raises:
        AssertionError: if the result is not ``Header.size + self.length``
            long.
    """
    output = Processed.__str__(self)
    output += pack(self.struct, self.key, self.start, self.amount, self.since)

    # The message used to be built with ``len(output, self.length)``, which
    # raised TypeError and hid the real length mismatch.
    assert len(output) == Header.size + self.length, \
        "Output length (%s) did not match out length! (%s)" % (len(output), self.length)
    return output
开发者ID:jmingtan,项目名称:tpclient-ogre,代码行数:7,代码来源:Base.py
示例4: pack
def pack(self, values):
    """Return ``values`` packed according to this structure's format.

    Delegates to ``xstruct.pack`` with ``self.xstruct`` as the format and
    ``values`` passed through as a single argument.
    """
    packed = xstruct.pack(self.xstruct, values)
    return packed
开发者ID:jmingtan,项目名称:tpclient-ogre,代码行数:7,代码来源:Structures.py
示例5: __str__
def __str__(self):
args = []
for name, type in self.names:
struct, size = ARG_STRUCTMAP[type]
attr = getattr(self, name)
if size == 1:
args.append(attr)
else:
args += list(attr)
output = Order.__str__(self)
try:
output += pack(self.substruct, *args)
return output
except TypeError, e:
s = str(e)
causedby = '%s %s' % self.names[int(s[:s.find(' ')])]
being = getattr(self, name)
traceback = sys.exc_info()[2]
while not traceback.tb_next is None:
traceback = traceback.tb_next
raise TypeError, '%s was %s\n%s' % (causedby, being, e), traceback
开发者ID:StupidIncarnate,项目名称:libtpproto-py,代码行数:26,代码来源:OrderDesc.py
示例6: __str__
def __str__(self):
    """Return the ``Processed`` prefix followed by the packed design fields.

    Side effect: ``self.length`` is set to the size of the packed fields
    before ``Processed.__str__`` is called (presumably so the prefix can
    carry that length -- confirm against ``Processed``).
    """
    body = pack(
        self.struct,
        self.id,
        self.modify_time,
        self.categories,
        self.name,
        self.desc,
        self.used,
        self.owner,
        self.components,
        self.feedback,
        self.properties,
    )
    self.length = len(body)

    prefix = Processed.__str__(self)
    return prefix + body
开发者ID:StupidIncarnate,项目名称:libtpproto-py,代码行数:7,代码来源:Design.py
示例7: transfer_ack
def transfer_ack(self, position):
    """Send a type-3 command carrying ``position`` through the session.

    While holding ``self.recv_notice`` the method does nothing if the close
    has already been transferred to the peer.  Otherwise it packs
    ``(next_recv_seq, 3, position)`` as ``"<IBQ"``, hands it to
    ``self.session.send_conn_data`` and advances ``next_recv_seq``.

    Args:
        position: value packed into the unsigned 64-bit field.
    """
    with self.recv_notice:
        if not self.transfered_close_to_peer:
            ack_cmd = struct.pack("<IBQ", self.next_recv_seq, 3, position)
            self.session.send_conn_data(self.conn_id, ack_cmd)
            self.next_recv_seq += 1
开发者ID:Suwmlee,项目名称:XX-Net,代码行数:8,代码来源:base_container.py
示例8: transfer_peer_close
def transfer_peer_close(self, reason=""):
    """Send a type-2 command with ``reason`` appended, at most once.

    While holding ``self.recv_notice`` the method returns immediately if
    ``transfered_close_to_peer`` is already set.  Otherwise it sets the flag,
    sends ``struct.pack("<IB", next_recv_seq, 2) + reason`` through the
    session and advances ``next_recv_seq``.

    Args:
        reason: text appended after the packed command header.
    """
    with self.recv_notice:
        already_sent = self.transfered_close_to_peer
        if already_sent:
            return
        self.transfered_close_to_peer = True

        header = struct.pack("<IB", self.next_recv_seq, 2)
        self.session.send_conn_data(self.conn_id, header + reason)
        self.next_recv_seq += 1
开发者ID:Suwmlee,项目名称:XX-Net,代码行数:9,代码来源:base_container.py
示例9: serialize_body
def serialize_body(self):
    """Return the frame body: two big-endian uint32 fields plus extra data.

    The first field is ``self.last_stream_id`` with its top bit cleared, the
    second is ``self.error_code``; ``self.additional_data`` is appended
    unchanged.
    """
    stream_id = self.last_stream_id & 0x7FFFFFFF
    fixed_part = struct.pack("!LL", stream_id, self.error_code)
    return fixed_part + self.additional_data
开发者ID:Suwmlee,项目名称:XX-Net,代码行数:9,代码来源:frame.py
示例10: create_conn
def create_conn(self, sock, host, port):
    """Allocate a connection id, announce it to the peer and register it.

    Args:
        sock: local socket handed to the new ``base_container.Conn``.
        host: target host; its length is packed as a uint16, then the host
            itself is appended.
        port: target port, packed as a little-endian uint16.

    Returns:
        The new connection id, or ``None`` when the session is not running.
    """
    if not self.running:
        xlog.debug("session not running, try to connect")
        return None

    # ``with`` releases the lock even if the block raises; the previous
    # acquire()/release() pair had no ``finally``.
    # NOTE(review): assumes self.lock supports the context-manager protocol
    # (threading.Lock / RLock do) -- confirm where it is created.
    with self.lock:
        self.last_conn_id += 1
        conn_id = self.last_conn_id

    seq = 0
    cmd_type = 0  # create connection
    sock_type = 0  # TCP
    data = struct.pack("<IBBH", seq, cmd_type, sock_type, len(host)) + host + struct.pack("<H", port)
    self.send_conn_data(conn_id, data)

    self.conn_list[conn_id] = base_container.Conn(self, conn_id, sock, host, port, g.config.windows_size,
                                                  g.config.windows_ack, True, xlog)
    return conn_id
开发者ID:XLILT,项目名称:XX-Net,代码行数:19,代码来源:proxy_session.py
示例11: __str__
def __str__(self):
    """Return the ``Description`` prefix followed by the packed fields.

    The fields packed with ``self.struct`` are, in order: ``id``, ``_name``,
    ``description``, ``arguments`` and ``modify_time``.
    """
    prefix = Description.__str__(self)
    body = pack(
        self.struct,
        self.id,
        self._name,
        self.description,
        self.arguments,
        self.modify_time,
    )
    return prefix + body
开发者ID:jmingtan,项目名称:tpclient-ogre,代码行数:10,代码来源:OrderDesc.py
示例12: __str__
def __str__(self):
    """Return the ``Processed`` prefix followed by the packed game fields.

    The fields packed with ``self.struct`` are, in order: ``name``, ``key``,
    ``tp``, ``server``, ``sertype``, ``rule``, ``rulever``, ``locations`` and
    ``_optional``.
    """
    prefix = Processed.__str__(self)
    body = pack(
        self.struct,
        self.name,
        self.key,
        self.tp,
        self.server,
        self.sertype,
        self.rule,
        self.rulever,
        self.locations,
        self._optional,
    )
    return prefix + body
开发者ID:StupidIncarnate,项目名称:libtpproto-py,代码行数:11,代码来源:Game.py
示例13: __str__
def __str__(self):
    """Return the ``Describable`` prefix followed by the packed object fields.

    The fields packed with ``self.struct`` are, in order: ``id``,
    ``_subtype``, ``name``, ``desc``, ``parent``, ``contains`` and
    ``modify_time``.
    """
    prefix = Describable.__str__(self)
    body = pack(
        self.struct,
        self.id,
        self._subtype,
        self.name,
        self.desc,
        self.parent,
        self.contains,
        self.modify_time,
    )
    return prefix + body
开发者ID:jmingtan,项目名称:tpclient-ogre,代码行数:11,代码来源:Object.py
示例14: send_conn_data
def send_conn_data(self, conn_id, data, no_delay=False):
    """Queue ``data`` for ``conn_id`` and wake the sender when appropriate.

    Nothing happens when the session is not running.  Otherwise the payload
    is prefixed with ``(conn_id, len(data))`` packed as ``"<II"`` and put on
    ``self.send_buffer``.  ``self.wait_queue`` is notified when the buffer's
    ``pool_size`` exceeds ``g.config.max_payload`` or more than
    ``self.send_delay`` seconds have passed since ``self.last_send_time``.

    Args:
        conn_id: connection id written into the packet header.
        data: payload appended after the header.
        no_delay: accepted for callers; not used by this method.
    """
    if not self.running:
        return

    packet = base_container.WriteBuffer()
    header = struct.pack("<II", conn_id, len(data))
    packet.append(header)
    packet.append(data)
    self.send_buffer.put(packet)

    buffer_full = self.send_buffer.pool_size > g.config.max_payload
    if buffer_full or time.time() - self.last_send_time > self.send_delay:
        self.wait_queue.notify()
开发者ID:XLILT,项目名称:XX-Net,代码行数:14,代码来源:proxy_session.py
示例15: pack_request
def pack_request(method, url, headers, body, timeout):
    """Wrap one HTTP request into the body/headers sent to the fetch server.

    The request line, the forwarded headers and the ``X-URLFETCH-*`` options
    are deflated into a payload.  The returned body is that payload's length
    packed as ``'!h'``, then the payload, then the (possibly deflated)
    request body.

    Args:
        method: HTTP method placed in the request line.
        url: target URL; a ``.js`` suffix selects ``JS_MAXSIZE`` instead of
            ``AUTORANGE_MAXSIZE``.
        headers: request headers; copied with ``dict()`` before modification.
        body: request body.
        timeout: sent as the ``X-URLFETCH-timeout`` option.

    Returns:
        ``(request_headers, body)``; ``request_headers`` holds only
        ``Content-Length`` for the wrapped body.
    """
    headers = dict(headers)
    if isinstance(body, basestring) and body:
        if len(body) < 10 * 1024 * 1024 and 'Content-Encoding' not in headers:
            # Eligible for compression; keep the result only if it is smaller.
            zbody = deflate(body)
            if len(zbody) < len(body):
                body = zbody
                headers['Content-Encoding'] = 'deflate'
        # NOTE(review): nesting of this check and the Content-Length update
        # was reconstructed from an unindented listing -- confirm upstream.
        if len(body) > 10 * 1024 * 1024:
            xlog.warn("body len:%d %s %s", len(body), method, url)
        headers['Content-Length'] = str(len(body))

    # GAE don't allow set `Host` header
    if 'Host' in headers:
        del headers['Host']

    kwargs = {}
    # Parameters consumed by the GAE side
    if front.config.GAE_PASSWORD:
        kwargs['password'] = front.config.GAE_PASSWORD

    # kwargs['options'] =
    kwargs['validate'] = front.config.GAE_VALIDATE
    if url.endswith(".js"):
        kwargs['maxsize'] = front.config.JS_MAXSIZE
    else:
        kwargs['maxsize'] = front.config.AUTORANGE_MAXSIZE
    kwargs['timeout'] = str(timeout)
    # End of the GAE parameters

    payload = '%s %s HTTP/1.1\r\n' % (method, url)
    payload += ''.join('%s: %s\r\n' % (k, v)
                       for k, v in headers.items() if k not in skip_headers)
    # for k, v in headers.items():
    #    xlog.debug("Send %s: %s", k, v)
    # Options with a falsy value are omitted.
    payload += ''.join('X-URLFETCH-%s: %s\r\n' % (k, v)
                       for k, v in kwargs.items() if v)

    payload = deflate(payload)
    #print("len: ", len(payload))

    # '!h' is a signed 16-bit big-endian length prefix for the payload.
    body = '%s%s%s' % (struct.pack('!h', len(payload)), payload, body)
    request_headers = {}
    request_headers['Content-Length'] = str(len(body))
    # request_headers contains only the single entry above
    return request_headers, body
开发者ID:XLILT,项目名称:XX-Net,代码行数:49,代码来源:gae_handler.py
示例16: __str__
def __str__(self):
    """Return the ``Processed`` prefix followed by the packed resource fields.

    The fields packed with ``self.struct`` are, in order: ``id``,
    ``name_singular``, ``name_plural``, ``unit_singular``, ``unit_plural``,
    ``description``, ``weight``, ``size`` and ``modify_time``.
    """
    prefix = Processed.__str__(self)
    body = pack(self.struct, self.id,
                self.name_singular, self.name_plural,
                self.unit_singular, self.unit_plural,
                self.description, self.weight, self.size,
                self.modify_time)
    return prefix + body
开发者ID:jmingtan,项目名称:tpclient-ogre,代码行数:16,代码来源:Resource.py
示例17: transfer_received_data
def transfer_received_data(self, data):
    """Forward received ``data`` through the session as a type-1 command.

    While holding ``self.recv_notice`` the method does nothing if the close
    has already been transferred to the peer.  Otherwise it builds a
    ``WriteBuffer`` from ``(next_recv_seq, 1)`` packed as ``"<IB"`` plus
    ``data``, advances ``next_recv_seq`` and ``received_position``, and sends
    the buffer.  ``no_delay`` is true while ``received_position`` is below
    16 KiB.

    Args:
        data: payload to forward; its length is added to
            ``received_position``.
    """
    with self.recv_notice:
        if not self.transfered_close_to_peer:
            packet = WriteBuffer(struct.pack("<IB", self.next_recv_seq, 1))
            packet.append(data)
            self.next_recv_seq += 1
            self.received_position += len(data)

            no_delay = self.received_position < 16 * 1024
            self.session.send_conn_data(self.conn_id, packet, no_delay)
开发者ID:Suwmlee,项目名称:XX-Net,代码行数:17,代码来源:base_container.py
示例18: __str__
def __str__(self):
    """Return the ``Describable`` prefix followed by the packed object fields.

    The fields packed with ``self.struct`` are, in order: ``id``,
    ``_subtype``, ``name``, ``size``, the three components of ``pos``, the
    three components of ``vel``, ``contains``, ``order_types``,
    ``order_number`` and ``modify_time``.
    """
    prefix = Describable.__str__(self)
    body = pack(
        self.struct,
        self.id,
        self._subtype,
        self.name,
        self.size,
        self.pos[0], self.pos[1], self.pos[2],
        self.vel[0], self.vel[1], self.vel[2],
        self.contains,
        self.order_types,
        self.order_number,
        self.modify_time,
    )
    return prefix + body
开发者ID:StupidIncarnate,项目名称:libtpproto-py,代码行数:18,代码来源:Object.py
示例19: serialize
def serialize(self):
    """Return the frame as bytes: a 9-byte header followed by the body.

    The body comes from ``self.serialize_body()`` and its length is stored in
    ``self.body_len``.  The header is packed as ``"!HBBBL"``: the 24-bit
    length split into a 16-bit high part and an 8-bit low part, the frame
    type, the OR of the bits of every defined flag present in ``self.flags``,
    and the stream id with its top bit cleared.
    """
    payload = self.serialize_body()
    self.body_len = len(payload)

    # Fold every flag that is set on this frame into one bit field.
    flag_bits = 0
    for flag_name, bit in self.defined_flags:
        if flag_name not in self.flags:
            continue
        flag_bits |= bit

    # The 24-bit length is written as a 16-bit high part and an 8-bit low part.
    length_high = (self.body_len >> 8) & 0xFFFF
    length_low = self.body_len & 0xFF
    stream_id = self.stream_id & 0x7FFFFFFF

    header = struct.pack("!HBBBL", length_high, length_low, self.type, flag_bits, stream_id)
    return header + payload
开发者ID:Suwmlee,项目名称:XX-Net,代码行数:22,代码来源:frame.py
示例20: __str__
def __str__(self):
    """Return the ``Processed`` prefix followed by the packed account fields.

    The fields packed with ``self.struct`` are, in order: ``username``,
    ``password``, ``email`` and ``comment``.
    """
    prefix = Processed.__str__(self)
    body = pack(
        self.struct,
        self.username,
        self.password,
        self.email,
        self.comment,
    )
    return prefix + body
开发者ID:StupidIncarnate,项目名称:libtpproto-py,代码行数:5,代码来源:Account.py
注:本文中的xstruct.pack函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论