本文整理汇总了Python中requests.utils.super_len函数的典型用法代码示例。如果您正苦于以下问题：Python super_len函数的具体用法？Python super_len怎么用？Python super_len使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了super_len函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _consume_current_data
def _consume_current_data(self, size):
    """Copy pending multipart data into ``self._buffer``.

    With no current field (``self._current_data is None``) the boundary
    line is written. Otherwise, if the field reports a positive
    ``super_len``, up to ``size`` bytes are read from it and buffered.
    Afterwards, if ``super_len`` of the field equals ``0``, a boundary
    wrapped in CRLFs is appended as well.

    :param size: value forwarded unchanged to ``self._current_data.read``
    :returns: sum of the values returned by the ``self._buffer.write``
        calls made during this invocation
    """
    field = self._current_data
    written = 0
    if field is None:
        written = self._buffer.write(self.boundary)
        written += self._buffer.write('\r\n')
    elif super_len(field) > 0:
        chunk = field.read(size)
        written = self._buffer.write(chunk)

    # Checked on every call, including when there is no current field.
    if super_len(field) == 0:
        separator = '\r\n{0}\r\n'.format(self.boundary)
        written += self._buffer.write(separator)
    return written
开发者ID:zenweasel,项目名称:requests-toolbelt,代码行数:15,代码来源:multipart.py
示例2: _consume_current_data
def _consume_current_data(self, size):
    """Consume bytes from the current field (if any) into ``_buffer``.

    If the field is finished after the read, ``_current_data`` is reset to
    ``None`` and the parts separator is appended to the buffer.

    :param size: maximum number of bytes to read from the current field;
        ``None`` is translated to ``-1`` before calling ``read``
    :returns: number of bytes written to ``_buffer``, including the
        separator, the closing ``--`` (last field only) and the trailing
        CRLF
    """
    written = 0
    # File objects need an integer size
    if size is None:
        size = -1

    if self._current_data is None:
        return written

    # Read from the field and copy into the buffer.
    written = self._buffer.write(self._current_data.read(size))
    if super_len(self._current_data) != 0:
        # Field still has data left; the separator is emitted later.
        return written

    # We have just finished the current field: forget it and emit the
    # boundary now.
    self._current_data = None
    written += self._buffer.write(
        encode_with('\r\n{0}'.format(self.boundary), self.encoding)
    )
    # If this is the last separator, add '--' before the final CRLF.
    # These writes are counted too, so the return value matches the
    # number of bytes that actually landed in the buffer.
    if self._current_field_number == len(self.fields):
        written += self._buffer.write(encode_with('--', self.encoding))
    written += self._buffer.write(encode_with('\r\n', self.encoding))
    return written
开发者ID:kery-chen,项目名称:pcs_api,代码行数:30,代码来源:utils.py
示例3: _calculate_length
def _calculate_length(self):
    """Compute the total body length and store it in ``self._len``.

    Every ``(header, data)`` pair in ``self._fields_list`` contributes the
    boundary length, the header length, ``super_len(data)`` and 4 bytes
    for two CRLFs; the trailing boundary adds the boundary length plus 4.
    """
    boundary_len = len(self.boundary)  # Length of --{boundary}
    # Fixed per-field cost: the boundary plus len('\r\n') * 2.
    per_field_overhead = boundary_len + 4

    self._len = 0
    for header, data in self._fields_list:
        self._len += per_field_overhead + len(header) + super_len(data)

    # Trailing boundary '--{boundary}--\r\n'.
    self._len += boundary_len + 4
开发者ID:Connexions,项目名称:requests-toolbelt,代码行数:8,代码来源:multipart.py
示例4: _load_bytes
def _load_bytes(self, size):
    """Top up ``self._buffer`` towards ``size`` bytes.

    After ``smart_truncate()``, chunks from ``self._get_bytes()`` are
    appended until the shortfall is covered or a falsy chunk is returned.

    :param size: target amount; the shortfall is ``size`` minus
        ``super_len(self._buffer)``
    """
    self._buffer.smart_truncate()
    missing = size - super_len(self._buffer)
    while missing > 0:
        chunk = self._get_bytes()
        missing -= self._buffer.append(chunk)
        if not chunk:
            # Source produced nothing; stop after appending it.
            break
开发者ID:vanstoner,项目名称:xld-mule-mc-plugin,代码行数:8,代码来源:streaming_iterator.py
示例5: test_super_len_io_streams
def test_super_len_io_streams(self):
    """Check ``super_len`` against several kinds of in-memory IO streams."""
    # uses StringIO or io.StringIO (see import above)
    from io import BytesIO
    from requests.utils import super_len

    cases = [
        (StringIO.StringIO(), 0),
        (StringIO.StringIO("with so much drama in the LBC"), 29),
        (BytesIO(), 0),
        (BytesIO(b"it's kinda hard bein' snoop d-o-double-g"), 40),
    ]
    for stream, expected in cases:
        assert super_len(stream) == expected

    # cStringIO is optional; skip its check when it cannot be imported.
    try:
        import cStringIO
    except ImportError:
        return
    assert super_len(cStringIO.StringIO("but some how, some way...")) == 25
开发者ID:RRedwards,项目名称:requests,代码行数:18,代码来源:test_requests.py
示例6: _calculate_length
def _calculate_length(self):
    """Compute the total body length and store it in ``self._len``.

    Every ``(header, data)`` pair in ``self._fields_list`` contributes the
    boundary length, the length of the encoded header,
    ``super_len(data)`` and 4 bytes for two CRLFs; the trailing boundary
    adds the boundary length plus 4.
    """
    boundary_len = len(self.boundary)  # Length of --{boundary}
    # Fixed per-field cost: the boundary plus len('\r\n') * 2.
    per_field_overhead = boundary_len + 4

    self._len = 0
    for header, data in self._fields_list:
        # The header may contain non-ASCII characters, so its length is
        # measured on the encoded value.
        encoded_header = encode_with(header, self.encoding)
        self._len += per_field_overhead + len(encoded_header) + super_len(data)

    # Trailing boundary '--{boundary}--\r\n'.
    self._len += boundary_len + 4
开发者ID:kery-chen,项目名称:pcs_api,代码行数:9,代码来源:utils.py
示例7: smart_truncate
def smart_truncate(self):
    """Rewrite the buffer from offset 0 once enough of it has been read.

    ``super_len(self)`` is taken as the amount still to be read and
    ``self._get_end()`` minus that as the amount already read. When the
    already-read amount is at least the to-be-read amount, the result of
    ``self.read()`` is written back at offset 0 after truncating there,
    and the position is rewound to the start.
    """
    unread = super_len(self)
    consumed = self._get_end() - unread
    if consumed < unread:
        return

    remainder = self.read()
    self.seek(0, 0)
    self.truncate()
    self.write(remainder)
    # We want to be at the beginning
    self.seek(0, 0)
开发者ID:eludom,项目名称:acd_cli,代码行数:10,代码来源:encoder.py
示例8: _calculate_length
def _calculate_length(self):
    """Calculate the length of the body from ``self.parts``.

    The result is stored in ``self._len`` and also returned, so that
    ``__len__`` can be lazy.

    :returns: the calculated length
    """
    boundary_len = len(self.boundary)  # Length of --{boundary}

    total = 0
    for part in self.parts:
        # boundary length + super_len of the part + len('\r\n') * 2
        total += boundary_len + super_len(part) + 4

    # The closing boundary adds the boundary length plus 4.
    self._len = total + boundary_len + 4
    return self._len
开发者ID:eludom,项目名称:acd_cli,代码行数:10,代码来源:encoder.py
示例9: test_super_len_handles_files_raising_weird_errors_in_tell
def test_super_len_handles_files_raising_weird_errors_in_tell(self, error):
    """``super_len`` returns 0 for a sized object whose ``tell()`` raises.

    :param error: exception class that the fake file raises from ``tell``
    """
    class _ExplodingTell(object):
        def __len__(self):
            return 5

        def tell(self):
            raise error()

    fake_file = _ExplodingTell()
    assert super_len(fake_file) == 0
开发者ID:PoNote,项目名称:requests,代码行数:10,代码来源:test_utils.py
示例10: test_super_len_tell_ioerror
def test_super_len_tell_ioerror(self, error):
    """``super_len`` returns 0 instead of failing when ``tell()`` raises.

    The fake file has no ``__len__``; it only offers a raising ``tell``
    and a no-op ``seek``.

    :param error: exception class that the fake file raises from ``tell``
    """
    class _UnsizedExplodingTell(object):
        def tell(self):
            raise error()

        def seek(self, offset, whence):
            pass

    fake_file = _UnsizedExplodingTell()
    assert super_len(fake_file) == 0
开发者ID:PoNote,项目名称:requests,代码行数:10,代码来源:test_utils.py
示例11: _consume_current_data
def _consume_current_data(self, size):
    """Copy pending multipart data into ``self._buffer``.

    With no current field the encoded boundary line is written and
    returned immediately. Otherwise, if the field reports a positive
    ``super_len``, up to ``size`` bytes are read from it and buffered;
    then, when ``super_len`` minus ``tell()`` is zero and the encoder is
    not finished, a CRLF-wrapped boundary is appended.

    :param size: value forwarded unchanged to ``self._current_data.read``
    :returns: sum of the values returned by the ``self._buffer.write``
        calls made during this invocation
    """
    field = self._current_data
    if field is None:
        # No field in progress: only the opening boundary line is emitted.
        written = self._buffer.write(self.boundary.encode())
        written += self._buffer.write('\r\n'.encode())
        return written

    written = 0
    if super_len(field) > 0:
        written = self._buffer.write(field.read(size))

    field_drained = super_len(field) - field.tell() == 0
    if field_drained and not self.finished:
        separator = '\r\n{0}\r\n'.format(self.boundary).encode()
        written += self._buffer.write(separator)
    return written
开发者ID:konomae,项目名称:requests-toolbelt,代码行数:19,代码来源:multipart.py
示例12: bytes_left_to_write
def bytes_left_to_write(self):
    """Determine if there are bytes left to write.

    Unread headers count with their full length; the body counts with
    ``super_len(self.body)``.

    :returns: bool -- ``True`` if there are bytes left to write, otherwise
        ``False``
    """
    pending_headers = len(self.headers) if self.headers_unread else 0
    return pending_headers + super_len(self.body) > 0
开发者ID:eludom,项目名称:acd_cli,代码行数:11,代码来源:encoder.py
示例13: test_stringio
def test_stringio(self):
    """Test ``SubsetIO`` wrapped around an in-memory ``io.BytesIO``."""
    source = io.BytesIO(b"0123456789" * 1024)
    source.seek(0)

    # Arguments (1, 10) are expected to yield the ten bytes b"1234567890".
    with SubsetIO(source, 1, 10) as subset:
        self.assertEqual(super_len(subset), 10)
        self.assertEqual(subset.read(), b"1234567890")
开发者ID:MediaFire,项目名称:mediafire-python-open-sdk,代码行数:11,代码来源:test_subsetio.py
示例14: _consume_current_data
def _consume_current_data(self, size):
    """Copy pending multipart data into ``self._buffer``.

    With no current field the UTF-8 encoded boundary line is written.
    Otherwise, if the field reports a positive ``super_len``, up to
    ``size`` bytes are read from it and buffered. Afterwards, if
    ``super_len`` of the field equals ``0`` and the encoder is not
    finished, a CRLF-wrapped boundary is appended.

    :param size: number of bytes to read from the current field; ``None``
        is translated to ``-1`` before calling ``read``
    :returns: sum of the values returned by the ``self._buffer.write``
        calls made during this invocation
    """
    # File objects need an integer size
    if size is None:
        size = -1

    field = self._current_data
    written = 0
    if field is None:
        written = self._buffer.write(self.boundary.encode('utf-8'))
        written += self._buffer.write('\r\n'.encode('utf-8'))
    elif super_len(field) > 0:
        written = self._buffer.write(field.read(size))

    # Checked on every call, including when there is no current field.
    if super_len(field) == 0 and not self.finished:
        separator = '\r\n{0}\r\n'.format(self.boundary).encode('utf-8')
        written += self._buffer.write(separator)
    return written
开发者ID:Connexions,项目名称:requests-toolbelt,代码行数:21,代码来源:multipart.py
示例15: _calculate_load_amount
def _calculate_load_amount(self, read_size):
    """Calculate how many bytes need to be added to the buffer.

    When a consumer reads ``x`` bytes from the buffer there are two cases:

    1. Enough data is in the buffer to return the requested amount
    2. Not enough data is in the buffer

    The shortfall is ``read_size`` minus ``super_len(self._buffer)``,
    clamped so that it never goes below zero.

    :param int read_size: the number of bytes the consumer requests
    :returns: int -- the number of bytes that must be loaded into the
        buffer before the read can be satisfied; never negative
    """
    shortfall = read_size - super_len(self._buffer)
    return max(0, shortfall)
开发者ID:eludom,项目名称:acd_cli,代码行数:20,代码来源:encoder.py
示例16: write_to
def write_to(self, buffer, size):
    """Write the requested amount of bytes to the buffer provided.

    Unread headers are appended in full regardless of ``size``, so the
    number of bytes written may exceed ``size`` on the first call.

    :param CustomBytesIO buffer: buffer we want to write bytes to
    :param int size: number of bytes requested to be written to the
        buffer; ``-1`` removes the limit
    :returns: int -- number of bytes actually written
    """
    written = 0
    if self.headers_unread:
        written += buffer.append(self.headers)
        self.headers_unread = False

    unbounded = size == -1
    while super_len(self.body) > 0 and (unbounded or written < size):
        # With no limit, -1 is passed straight through to read().
        chunk_size = size if unbounded else size - written
        written += buffer.append(self.body.read(chunk_size))
    return written
开发者ID:eludom,项目名称:acd_cli,代码行数:22,代码来源:encoder.py
示例17: len
def len(self):
    """Return ``super_len(self.fd)`` minus the current ``self.fd.tell()``."""
    total = super_len(self.fd)
    position = self.fd.tell()
    return total - position
开发者ID:eludom,项目名称:acd_cli,代码行数:2,代码来源:encoder.py
示例18: test_super_len_with_no__len__
def test_super_len_with_no__len__(self):
    """``super_len`` uses a plain ``len`` attribute when ``__len__`` is absent."""
    class _HasLenAttribute(object):
        def __init__(self):
            self.len = 5

    subject = _HasLenAttribute()
    assert super_len(subject) == 5
开发者ID:PoNote,项目名称:requests,代码行数:6,代码来源:test_utils.py
示例19: test_super_len_with_tell
def test_super_len_with_tell(self):
    """``super_len`` drops from 5 to 3 after two characters are read."""
    stream = StringIO.StringIO('12345')
    length_before_read = super_len(stream)
    assert length_before_read == 5

    stream.read(2)
    length_after_read = super_len(stream)
    assert length_after_read == 3
开发者ID:PoNote,项目名称:requests,代码行数:5,代码来源:test_utils.py
示例20: test_file
def test_file(self, tmpdir, mode, warnings_num, recwarn):
    """``super_len`` of a 4-character file is 4 when opened with ``mode``.

    :param tmpdir: directory object in which ``test.txt`` is created
    :param mode: mode string passed to ``open``
    :param warnings_num: expected length of ``recwarn`` afterwards
    :param recwarn: collection whose length is compared to ``warnings_num``
    """
    target = tmpdir.join('test.txt')
    target.write('Test')

    with target.open(mode) as handle:
        measured = super_len(handle)
        assert measured == 4

    recorded = len(recwarn)
    assert recorded == warnings_num
开发者ID:PoNote,项目名称:requests,代码行数:6,代码来源:test_utils.py
注：本文中的requests.utils.super_len函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论