本文整理汇总了Python中six.BytesIO类的典型用法代码示例。如果您正苦于以下问题:Python BytesIO类的具体用法?Python BytesIO怎么用?Python BytesIO使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了BytesIO类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_dates
def test_dates(self):
    """Write the same date under several number formats and pin the file's MD5."""
    date_formats = [
        "M/D/YY",
        "D-MMM-YY",
        "D-MMM",
        "MMM-YY",
        "h:mm AM/PM",
        "h:mm:ss AM/PM",
        "h:mm",
        "h:mm:ss",
        "M/D/YY h:mm",
        "mm:ss",
        "[h]:mm:ss",
        "mm:ss.0",
    ]
    book = Workbook()
    sheet = book.add_sheet("Hey, Dude")

    for row, date_format in enumerate(date_formats):
        # Column 0 records the format string itself; column 4 holds the date
        # written with a style that uses that format.
        sheet.write(row, 0, date_format)
        cell_style = XFStyle()
        cell_style.num_format_str = date_format
        sheet.write(row, 4, datetime(2013, 5, 10), cell_style)

    # Save to memory and compare the digest of the produced bytes.
    saved = BytesIO()
    book.save(saved)
    digest = hashlib.md5(saved.getvalue()).hexdigest()
    self.assertEqual("82fed69d4f9ea0444d159fa30080f0a3", digest)
开发者ID:hlawrenz,项目名称:xlwt,代码行数:35,代码来源:test_examples.py
示例2: visit_immutation
def visit_immutation(self, node, children):
    """Handle an immutation node by previewing or executing the request.

    A 'preview' child echoes the command line for the configured tool
    ('httpie' or 'curl'). An 'action' child calls httpie with its stdout
    captured in memory and shows the captured output through the pager.
    The node itself is always returned unchanged.
    """
    ctx = self._final_context()
    kind = children[0].expr_name

    if kind == 'preview':
        if self.tool == 'httpie':
            parts = ['http'] + ctx.httpie_args(self.method, quote=True)
        else:
            assert self.tool == 'curl'
            parts = ['curl'] + ctx.curl_args(self.method, quote=True)
        click.echo(' '.join(parts))
        return node

    if kind == 'action':
        captured = BytesIO()
        try:
            httpie_env = Environment(stdout=captured, is_windows=False)
            httpie_main(ctx.httpie_args(self.method), env=httpie_env)
            raw = captured.getvalue()
        finally:
            captured.close()

        # XXX: Work around click.echo_via_pager(): given a bytestring it
        # converts it with str(b'abc'), producing "b'abc'", so decode to
        # text first. `unicode` is only evaluated on Python 2.
        text_type = unicode if six.PY2 else str  # noqa
        click.echo_via_pager(text_type(raw, 'utf-8'))

    return node
开发者ID:JethroTseng,项目名称:http-prompt,代码行数:31,代码来源:execution.py
示例3: OnFileViewHTML
def OnFileViewHTML(self, evt):
    """Show the rich text buffer's HTML export in a modal dialog.

    The HTML handler saves the document into an in-memory byte stream; the
    result is displayed in an HtmlWindow inside a resizable dialog. Nothing
    is shown if the handler reports that saving failed.
    """
    html_handler = rt.RichTextHTMLHandler()
    html_handler.SetFlags(rt.RICHTEXT_HANDLER_SAVE_IMAGES_TO_MEMORY)
    html_handler.SetFontSizeMapping([7, 9, 11, 12, 14, 22, 100])

    rendered = BytesIO()
    saved_ok = html_handler.SaveStream(self.rtc.GetBuffer(), rendered)
    if not saved_ok:
        return

    import wx.html
    dialog = wx.Dialog(self, title="HTML",
                       style=wx.DEFAULT_DIALOG_STYLE | wx.RESIZE_BORDER)
    viewer = wx.html.HtmlWindow(dialog, size=(500, 400),
                                style=wx.BORDER_SUNKEN)
    viewer.SetPage(rendered.getvalue())
    close_button = wx.Button(dialog, wx.ID_CANCEL)

    # Stack the viewer above the button and size the dialog to fit.
    layout = wx.BoxSizer(wx.VERTICAL)
    layout.Add(viewer, 1, wx.ALL | wx.EXPAND, 5)
    layout.Add(close_button, 0, wx.ALL | wx.CENTER, 10)
    dialog.SetSizer(layout)
    layout.Fit(dialog)

    dialog.ShowModal()

    # Clean up the images the handler saved while exporting.
    html_handler.DeleteTemporaryImages()
开发者ID:GadgetSteve,项目名称:Phoenix,代码行数:26,代码来源:RichTextCtrl.py
示例4: generate_glassbrain_image
def generate_glassbrain_image(image_pk):
    """Render a glass-brain JPEG for an image and save it as its thumbnail.

    Args:
        image_pk: Primary key of the ``Image`` row to render.

    Raises:
        Any exception raised while plotting or saving the figure propagates
        to the caller; all matplotlib figures are closed either way.
    """
    from neurovault.apps.statmaps.models import Image
    import neurovault
    import matplotlib as mpl
    mpl.rcParams['savefig.format'] = 'jpg'
    my_dpi = 50
    fig = plt.figure(figsize=(330.0/my_dpi, 130.0/my_dpi), dpi=my_dpi)
    img = Image.objects.get(pk=image_pk)
    f = BytesIO()
    try:
        glass_brain = plot_glass_brain(img.file.path, figure=fig)
        glass_brain.savefig(f, dpi=my_dpi)
    finally:
        # NOTE: an earlier except-branch opened a placeholder JPEG here and
        # then re-raised, so the placeholder was never used and its file
        # handle leaked. Failures still propagate; only the leak is gone.
        plt.close('all')
    content_file = ContentFile(f.getvalue())
    img.thumbnail.save("glass_brain_%s.jpg" % img.pk, content_file)
    img.save()
开发者ID:rwblair,项目名称:NeuroVault,代码行数:25,代码来源:tasks.py
示例5: make_options_body
def make_options_body(self):
    """Return the serialized string-multimap body of a fake options reply."""
    supported_options = {
        'CQL_VERSION': ['3.0.1'],
        'COMPRESSION': []
    }
    body = BytesIO()
    write_stringmultimap(body, supported_options)
    return body.getvalue()
开发者ID:mike-tr-adamson,项目名称:python-driver,代码行数:7,代码来源:test_libevreactor.py
示例6: filter
def filter(self, im):
    """Encode *im* as PNG into an in-memory buffer.

    The image is sharpened first when ``self.sharpness`` is set. With
    ``self.palette`` enabled the image is reduced to an adaptive palette;
    images with an alpha band ('RGBA' or 'LA') are composited onto
    ``self.background`` and fully transparent pixels are mapped to the
    palette index ``self.colors``, which is saved as the transparent index.
    Without a palette, the image is converted to RGB unless its mode
    already starts with "RGB".

    Returns:
        A BytesIO holding the PNG data, rewound to the start.
    """
    if self.sharpness:
        im = sharpen(im, self.sharpness)
    buf = BytesIO()
    if self.palette:
        if im.mode in ('RGBA', 'LA'):
            # The alpha band is the last one in both modes; indexing [3]
            # would raise IndexError for two-band 'LA' images.
            alpha = im.split()[-1]
            # Binarize alpha, then invert it: the mask is 255 exactly where
            # the source pixel is fully transparent.
            alpha = Image.eval(alpha, lambda a: 255 if a > 0 else 0)
            mask = Image.eval(alpha, lambda a: 255 if a == 0 else 0)
            matte = Image.new("RGBA", im.size, self.background)
            matte.paste(im, (0, 0), im)
            # Quantize to one colour fewer than requested, then paint the
            # transparent pixels with index self.colors.
            matte = matte.convert("RGB").convert(
                'P', palette=Image.ADAPTIVE, colors=self.colors - 1)
            matte.paste(self.colors, mask)
            matte.save(buf, "PNG", transparency=self.colors)
        else:
            # ('P') is just the string 'P', so the old "not in" test was a
            # substring check; compare the mode directly instead.
            if im.mode != 'P':
                im = im.convert('P', palette=Image.ADAPTIVE,
                                colors=self.colors)
            im.save(buf, 'PNG')
    else:
        if not im.mode.startswith("RGB"):
            im = im.convert('RGB')
        im.save(buf, 'PNG')
    buf.seek(0)
    return buf
开发者ID:cartlogic,项目名称:pyramid_frontend,代码行数:28,代码来源:filters.py
示例7: test_requested_compression_not_available
def test_requested_compression_not_available(self, *args):
    """Requesting lz4 when the server offers only snappy marks the connection defunct."""
    conn = self.make_connection()
    conn._callbacks = {0: conn._handle_options_response}
    conn.defunct = Mock()
    conn.compression = "lz4"  # the client asks for lz4

    # Replace any existing lz4/snappy entries with stand-in codec names.
    for codec in ('lz4', 'snappy'):
        locally_supported_compressions.pop(codec, None)
    locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
    locally_supported_compressions['snappy'] = ('snappycompress', 'snappydecompress')

    # Build a SupportedMessage reply in which the server lists only snappy.
    header = self.make_header_prefix(SupportedMessage)
    server_options = {
        'CQL_VERSION': ['3.0.3'],
        'COMPRESSION': ['snappy']
    }
    options_buf = BytesIO()
    write_stringmultimap(options_buf, server_options)
    options = options_buf.getvalue()

    message = self.make_msg(header, options)
    conn.process_msg(message, len(message) - 8)

    # defunct() must have been called exactly once, with a ProtocolError.
    conn.defunct.assert_called_once_with(ANY)
    positional, _keywords = conn.defunct.call_args
    self.assertIsInstance(positional[0], ProtocolError)
开发者ID:jeffjirsa,项目名称:python-driver,代码行数:30,代码来源:test_connection.py
示例8: gzip
def gzip(f, *args, **kwargs):
    """Call *f* and gzip-compress its result.

    If *f* returns a ``Response``, its ``data`` is replaced by the compressed
    bytes, the Content-Encoding and Content-Length headers are set, and the
    same response object is returned. Any other return value is treated as
    the payload itself and the compressed bytes are returned.
    """
    result = f(*args, **kwargs)
    wraps_response = isinstance(result, Response)
    payload = result.data if wraps_response else result

    sink = BytesIO()
    compressor = gzip2.GzipFile(mode='wb', compresslevel=4, fileobj=sink)
    compressor.write(payload)
    # Closing flushes the gzip trailer into the sink before it is read.
    compressor.close()
    compressed = sink.getvalue()

    if not wraps_response:
        return compressed

    result.data = compressed
    result.headers['Content-Encoding'] = 'gzip'
    result.headers['Content-Length'] = str(len(result.data))
    return result
开发者ID:Lucretiel,项目名称:httpbin,代码行数:29,代码来源:filters.py
示例9: serialize
def serialize(self, xid=None):
    """Pack this multi-request into a single byte string.

    The layout is: optional xid, optional opcode (skipped when falsy), then
    for every request a not-done MultiHeader followed by the request's own
    payload, and finally a done MultiHeader as footer. All struct formats
    are concatenated and packed in one call using the "!" byte order.
    """
    fmt_parts = []
    values = []

    def add(fmt, items):
        # Record one format fragment together with the values it consumes.
        fmt_parts.append(fmt)
        values.extend(items)

    if xid is not None:
        add(Int.fmt, [xid])
    if self.opcode:
        add(Int.fmt, [self.opcode])

    for request in self.requests:
        header_fmt, header_values = MultiHeader(
            type=request.opcode, done=False, error=-1
        ).render()
        add(header_fmt, header_values)

        payload_fmt, payload_values = request.render()
        add(payload_fmt, payload_values)

    footer_fmt, footer_values = MultiHeader(
        type=-1, done=True, error=-1
    ).render()
    add(footer_fmt, footer_values)

    return struct.pack("!" + "".join(fmt_parts), *values)
开发者ID:wglass,项目名称:zoonado,代码行数:30,代码来源:transaction.py
示例10: FakeResponse
class FakeResponse(object):
    """Minimal stand-in for an HTTP response object, for use in tests."""

    def __init__(self, code, body, headers=None):
        """Store the status code, headers and body.

        ``msg`` is the code rendered as a string, ``info()`` returns the
        headers, and a text body is UTF-8 encoded before being wrapped in a
        byte stream.
        """
        self.code = code
        self.msg = str(code)
        self.headers = {} if headers is None else headers
        self.info = lambda: self.headers
        payload = body.encode('utf-8') if isinstance(body, six.text_type) else body
        self.body_file = BytesIO(payload)

    def read(self):
        """Return everything that has not yet been read from the body."""
        return self.body_file.read()

    def readline(self):
        """Return the next line of the body."""
        return self.body_file.readline()

    def close(self):
        """Do nothing; there is no real connection to close."""
开发者ID:Shopify,项目名称:pyactiveresource,代码行数:25,代码来源:http_fake.py
示例11: default
def default(self, obj):
    """Encode objects the base encoder rejects as tagged dictionaries.

    The base implementation is tried first. If it fails with a TypeError
    other than "not JSON serializable", that error is re-raised. Otherwise
    datetimes, dates and one-dimensional numpy arrays get dedicated
    encodings, and anything else is reduced to its class and module names.
    """
    try:
        return super(ObjectJSONEncoder, self).default(obj)
    except TypeError as e:
        if "not JSON serializable" not in str(e):
            raise

        # datetime is checked before date because datetime subclasses date.
        if isinstance(obj, datetime.datetime):
            return {'ISO8601_datetime': obj.strftime('%Y-%m-%dT%H:%M:%S.%f%z')}
        if isinstance(obj, datetime.date):
            return {'ISO8601_date': obj.isoformat()}

        is_flat_array = (
            numpy is not None
            and isinstance(obj, numpy.ndarray)
            and obj.ndim == 1
        )
        if is_flat_array:
            # Dump in numpy's save format, then carry the raw bytes through
            # JSON as latin-1 text (one character per byte).
            dumped = BytesIO()
            numpy.save(dumped, obj)
            as_text = dumped.getvalue().decode('latin-1')
            return {'__ndarray__': json.dumps(as_text)}

        return {
            '__class__': obj.__class__.__qualname__,
            '__module__': obj.__module__,
        }
开发者ID:johnbywater,项目名称:eventsourcing,代码行数:25,代码来源:transcoding.py
示例12: deepCopy
def deepCopy(obj):
    """Return a deep copy of *obj* made by a pickle round trip (protocol 1)."""
    buf = BytesIO()
    Pickler(buf, 1).dump(obj)
    # Rewind so the unpickler reads what was just written.
    buf.seek(0)
    return Unpickler(buf).load()
开发者ID:plone,项目名称:Products.CMFEditions,代码行数:7,代码来源:ZVCStorageTool.py
示例13: is_zipstream
def is_zipstream(data):
    """
    Just like zipfile.is_zipfile, but works upon buffers and streams
    rather than filenames.

    If data supports the read method, it will be treated as a stream
    and read from to test whether it is a valid ZipFile.

    If data also supports the tell and seek methods, it will be
    rewound after being tested.

    Raw byte data (str or buffer on Python 2; bytes, bytearray or
    memoryview on Python 3) is wrapped in a BytesIO first.

    Raises TypeError when data is neither raw byte data nor an object
    with a read method.
    """
    # The `buffer` builtin only exists on Python 2; referencing it
    # unconditionally raised NameError on Python 3 for every call.
    try:
        raw_types = (str, buffer)  # noqa: F821
    except NameError:
        raw_types = (bytes, bytearray, memoryview)

    if isinstance(data, raw_types):
        data = BytesIO(data)

    if not hasattr(data, "read"):
        raise TypeError("requies str, buffer, or stream-like object")

    # Remember where the stream was so it can be restored after probing.
    position = data.tell() if hasattr(data, "tell") else 0
    try:
        result = bool(_EndRecData(data))
    except IOError:
        result = False
    if hasattr(data, "seek"):
        data.seek(position)
    return result
开发者ID:obriencj,项目名称:python-javatools,代码行数:32,代码来源:ziputils.py
示例14: test_decode_response_gzip
def test_decode_response_gzip():
    """decode_response restores a gzip body and rewrites content-length to match."""
    body = b'gzip message'

    # Compress the body in memory.
    sink = BytesIO()
    gz = gzip.GzipFile('a', fileobj=sink, mode='wb')
    gz.write(body)
    gz.close()
    compressed_body = sink.getvalue()
    sink.close()

    headers = {
        'access-control-allow-credentials': ['true'],
        'access-control-allow-origin': ['*'],
        'connection': ['keep-alive'],
        'content-encoding': ['gzip'],
        'content-length': ['177'],
        'content-type': ['application/json'],
        'date': ['Wed, 02 Dec 2015 19:44:32 GMT'],
        'server': ['nginx']
    }
    gzip_response = {
        'body': {'string': compressed_body},
        'headers': headers,
        'status': {'code': 200, 'message': 'OK'}
    }

    decoded = decode_response(gzip_response)
    assert decoded['body']['string'] == body
    assert decoded['headers']['content-length'] == [str(len(body))]
开发者ID:JanLikar,项目名称:vcrpy,代码行数:27,代码来源:test_filters.py
示例15: test_save_model_with_writable_caches
def test_save_model_with_writable_caches(self):
    """save_model() stores both arrays, in numpy.save format, in writable caches."""
    def npy_bytes(array):
        # Bytes that numpy.save produces for ``array``.
        out = BytesIO()
        # noinspection PyTypeChecker
        numpy.save(out, array)
        return out.getvalue()

    expected_mean_vec = numpy.array([1, 2, 3])
    expected_rotation = numpy.eye(3)
    expected_mean_vec_bytes = npy_bytes(expected_mean_vec)
    expected_rotation_bytes = npy_bytes(expected_rotation)

    itq = ItqFunctor()
    itq.mean_vec = expected_mean_vec
    itq.rotation = expected_rotation
    # Both cache elements are writable here.
    itq.mean_vec_cache_elem = DataMemoryElement(readonly=False)
    itq.rotation_cache_elem = DataMemoryElement(readonly=False)

    itq.save_model()

    self.assertEqual(itq.mean_vec_cache_elem.get_bytes(),
                     expected_mean_vec_bytes)
    self.assertEqual(itq.rotation_cache_elem.get_bytes(),
                     expected_rotation_bytes)
开发者ID:Kitware,项目名称:SMQTK,代码行数:26,代码来源:test_itq.py
示例16: recvbytes
def recvbytes(self, bytes_needed, sock_buf=None):
    """
    Atomic read of bytes_needed bytes.

    Keeps calling self.recv (at most 32768 bytes per call) until exactly the
    number of bytes requested has been written to the buffer, then returns
    that buffer: sock_buf when one was supplied, otherwise a new BytesIO.
    The buffer is not rewound before it is returned.

    Returns None as soon as a receive yields no data, which means the socket
    was closed by the other side. Errors raised by self.recv propagate.
    """
    target = BytesIO() if sock_buf is None else sock_buf
    received = 0
    while received < bytes_needed:
        piece = self.recv(min(bytes_needed - received, 32768))
        piece_len = len(piece)
        # Text chunks are converted to bytes before being buffered.
        if type(piece) == str:
            piece = b(piece)
        if piece_len < 1:
            return None
        received += piece_len
        target.write(piece)
    return target
开发者ID:ZJDong,项目名称:Wayfindr-Py,代码行数:26,代码来源:network.py
示例17: test_requested_compression_not_available
def test_requested_compression_not_available(self, *args):
    """Requesting lz4 when the server offers only snappy marks the connection defunct."""
    conn = self.make_connection()
    conn._requests = {0: (conn._handle_options_response, ProtocolHandler.decode_message)}
    conn.defunct = Mock()
    conn.compression = "lz4"  # the client asks for lz4

    # Replace any existing lz4/snappy entries with stand-in codec names.
    for codec in ('lz4', 'snappy'):
        locally_supported_compressions.pop(codec, None)
    locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
    locally_supported_compressions['snappy'] = ('snappycompress', 'snappydecompress')

    # Build a SupportedMessage reply in which the server lists only snappy.
    header = self.make_header_prefix(SupportedMessage)
    server_options = {
        'CQL_VERSION': ['3.0.3'],
        'COMPRESSION': ['snappy']
    }
    options_buf = BytesIO()
    write_stringmultimap(options_buf, server_options)
    options = options_buf.getvalue()

    frame = _Frame(version=4, flags=0, stream=0,
                   opcode=SupportedMessage.opcode,
                   body_offset=9, end_pos=9 + len(options))
    conn.process_msg(frame, options)

    # defunct() must have been called exactly once, with a ProtocolError.
    conn.defunct.assert_called_once_with(ANY)
    positional, _keywords = conn.defunct.call_args
    self.assertIsInstance(positional[0], ProtocolError)
开发者ID:Adirio,项目名称:python-driver,代码行数:29,代码来源:test_connection.py
示例18: test_encode_decode_empty_string
def test_encode_decode_empty_string():
    """An empty string survives an encode/decode round trip.

    Regression test for https://github.com/luispedro/jug/issues/39
    """
    sink = BytesIO()
    jug.backends.encode.encode_to('', sink)
    source = BytesIO(sink.getvalue())
    assert jug.backends.encode.decode_from(source) == ''
开发者ID:Javacym,项目名称:jug,代码行数:7,代码来源:test_file_store.py
示例19: test_use_requested_compression
def test_use_requested_compression(self, *args):
    """When the server offers snappy and lz4, the requested snappy decompressor is used."""
    conn = self.make_connection()
    conn._requests = {0: (conn._handle_options_response, ProtocolHandler.decode_message)}
    conn.defunct = Mock()
    conn.compression = "snappy"  # the client asks for snappy

    # Replace any existing lz4/snappy entries with stand-in codec names.
    for codec in ('lz4', 'snappy'):
        locally_supported_compressions.pop(codec, None)
    locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
    locally_supported_compressions['snappy'] = ('snappycompress', 'snappydecompress')

    # Build a SupportedMessage reply in which the server lists both codecs.
    header = self.make_header_prefix(SupportedMessage)
    server_options = {
        'CQL_VERSION': ['3.0.3'],
        'COMPRESSION': ['snappy', 'lz4']
    }
    options_buf = BytesIO()
    write_stringmultimap(options_buf, server_options)
    options = options_buf.getvalue()

    frame = _Frame(version=4, flags=0, stream=0,
                   opcode=SupportedMessage.opcode,
                   body_offset=9, end_pos=9 + len(options))
    conn.process_msg(frame, options)

    expected_decompressor = locally_supported_compressions['snappy'][1]
    self.assertEqual(conn.decompressor, expected_decompressor)
开发者ID:Adirio,项目名称:python-driver,代码行数:26,代码来源:test_connection.py
示例20: test_disable_compression
def test_disable_compression(self, *args):
    """With compression disabled, no decompressor is set even if the server offers codecs."""
    conn = self.make_connection()
    conn._callbacks = {0: conn._handle_options_response}
    conn.defunct = Mock()
    conn.compression = False  # the client turns compression off

    # Replace any existing lz4/snappy entries with stand-in codec names.
    for codec in ('lz4', 'snappy'):
        locally_supported_compressions.pop(codec, None)
    locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
    locally_supported_compressions['snappy'] = ('snappycompress', 'snappydecompress')

    # Build a SupportedMessage reply in which the server lists both codecs.
    header = self.make_header_prefix(SupportedMessage)
    server_options = {
        'CQL_VERSION': ['3.0.3'],
        'COMPRESSION': ['snappy', 'lz4']
    }
    options_buf = BytesIO()
    write_stringmultimap(options_buf, server_options)
    options = options_buf.getvalue()

    message = self.make_msg(header, options)
    conn.process_msg(message, len(message) - 8)

    self.assertEqual(conn.decompressor, None)
开发者ID:jeffjirsa,项目名称:python-driver,代码行数:27,代码来源:test_connection.py
注:本文中的six.BytesIO类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论