本文整理汇总了Python中storages.compat.BytesIO类的典型用法代码示例。如果您正苦于以下问题:Python BytesIO类的具体用法?Python BytesIO怎么用?Python BytesIO使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了BytesIO类的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: SFTPStorageFile
class SFTPStorageFile(File):
def __init__(self, name, storage, mode):
self._name = name
self._storage = storage
self._mode = mode
self._is_dirty = False
self.file = BytesIO()
self._is_read = False
@property
def size(self):
if not hasattr(self, '_size'):
self._size = self._storage.size(self._name)
return self._size
def read(self, num_bytes=None):
if not self._is_read:
self.file = self._storage._read(self._name)
self._is_read = True
return self.file.read(num_bytes)
def write(self, content):
if 'w' not in self._mode:
raise AttributeError("File was opened for read-only access.")
self.file = BytesIO(content)
self._is_dirty = True
self._is_read = True
def close(self):
if self._is_dirty:
self._storage._save(self._name, self.file.getvalue())
self.file.close()
开发者ID:sam-at-github,项目名称:django-storages,代码行数:32,代码来源:sftpstorage.py
示例2: _compress_content
def _compress_content(self, content):
"""Gzip a given string content."""
zbuf = BytesIO()
zfile = GzipFile(mode="wb", compresslevel=6, fileobj=zbuf)
try:
zfile.write(force_bytes(content.read()))
finally:
zfile.close()
zbuf.seek(0)
content.file = zbuf
content.seek(0)
return content
开发者ID:thenewguy,项目名称:django-storages,代码行数:12,代码来源:s3boto.py
示例3: _read
def _read(self, name):
memory_file = BytesIO()
try:
pwd = self._connection.pwd()
self._connection.cwd(os.path.dirname(name))
self._connection.retrbinary('RETR ' + os.path.basename(name),
memory_file.write)
self._connection.cwd(pwd)
memory_file.seek(0)
return memory_file
except ftplib.all_errors:
raise FTPStorageException('Error reading file %s' % name)
开发者ID:BBKolton,项目名称:ml4,代码行数:12,代码来源:ftp.py
示例4: _compress_content
def _compress_content(self, content):
"""Gzip a given string content."""
zbuf = BytesIO()
zfile = GzipFile(mode='wb', compresslevel=6, fileobj=zbuf)
try:
zfile.write(force_bytes(content.read()))
finally:
zfile.close()
zbuf.seek(0)
# Boto 2 returned the InMemoryUploadedFile with the file pointer replaced,
# but Boto 3 seems to have issues with that. No need for fp.name in Boto3
# so just returning the BytesIO directly
return zbuf
开发者ID:BBKolton,项目名称:ml4,代码行数:13,代码来源:s3boto3.py
示例5: _compress_content
def _compress_content(self, content):
"""Gzip a given string content."""
zbuf = BytesIO()
# The GZIP header has a modification time attribute (see http://www.zlib.org/rfc-gzip.html)
# This means each time a file is compressed it changes even if the other contents don't change
# For S3 this defeats detection of changes using MD5 sums on gzipped files
# Fixing the mtime at 0.0 at compression time avoids this problem
zfile = GzipFile(mode="wb", compresslevel=6, fileobj=zbuf, mtime=0.0)
try:
zfile.write(force_bytes(content.read()))
finally:
zfile.close()
zbuf.seek(0)
content.file = zbuf
content.seek(0)
return content
开发者ID:tellybug,项目名称:django-storages-1,代码行数:16,代码来源:s3boto.py
示例6: __init__
def __init__(self, name, storage, mode):
self._name = name
self._storage = storage
self._mode = mode
self._is_dirty = False
self.file = BytesIO()
self.start_range = 0
开发者ID:meshy,项目名称:django-storages-redux,代码行数:7,代码来源:apache_libcloud.py
示例7: __init__
def __init__(self, name, storage, mode):
self.name = name
self._storage = storage
self._mode = mode
self._is_dirty = False
self.file = BytesIO()
self._is_read = False
开发者ID:BBKolton,项目名称:ml4,代码行数:7,代码来源:ftp.py
示例8: read
def read(self, num_bytes=None):
if num_bytes is None:
args = []
self.start_range = 0
else:
args = [self.start_range, self.start_range + num_bytes - 1]
data = self._storage._read(self._name, *args)
self.file = BytesIO(data)
return self.file.getvalue()
开发者ID:meshy,项目名称:django-storages-redux,代码行数:9,代码来源:apache_libcloud.py
示例9: _open
def _open(self, name, mode='rb'):
"""Open a file from database.
@param name filename or relative path to file based on base_url. path should contain only "/", but not "\". Apache sends pathes with "/".
If there is no such file in the db, returs None
"""
assert mode == 'rb', "You've tried to open binary file without specifying binary mode! You specified: %s"%mode
row = self.cursor.execute("SELECT %s from %s where %s = '%s'"%(self.blob_column,self.db_table,self.fname_column,name) ).fetchone()
if row is None:
return None
inMemFile = BytesIO(row[0])
inMemFile.name = name
inMemFile.mode = mode
retFile = File(inMemFile)
return retFile
开发者ID:42reports,项目名称:django-storages,代码行数:18,代码来源:database.py
示例10: CouchDBFile
class CouchDBFile(File):
"""
CouchDBFile - a Django File-like class for CouchDB documents.
"""
def __init__(self, name, storage, mode):
self._name = name
self._storage = storage
self._mode = mode
self._is_dirty = False
try:
self._doc = self._storage.get_document(name)
tmp, ext = os.path.split(name)
if ext:
filename = "content." + ext
else:
filename = "content"
attachment = self._storage.db.get_attachment(self._doc, filename=filename)
self.file = BytesIO(attachment)
except couchdb.client.ResourceNotFound:
if 'r' in self._mode:
raise ValueError("The file cannot be reopened.")
else:
self.file = BytesIO()
self._is_dirty = True
@property
def size(self):
return self._doc['size']
def write(self, content):
if 'w' not in self._mode:
raise AttributeError("File was opened for read-only access.")
self.file = BytesIO(content)
self._is_dirty = True
def close(self):
if self._is_dirty:
self._storage._put_file(self._name, self.file.getvalue())
self.file.close()
开发者ID:Artivest,项目名称:django-storages,代码行数:42,代码来源:couchdb.py
示例11: __init__
def __init__(self, name, storage, mode):
self._name = name
self._storage = storage
self._mode = mode
self._is_dirty = False
try:
self._doc = self._storage.get_document(name)
tmp, ext = os.path.split(name)
if ext:
filename = "content." + ext
else:
filename = "content"
attachment = self._storage.db.get_attachment(self._doc, filename=filename)
self.file = BytesIO(attachment)
except couchdb.client.ResourceNotFound:
if 'r' in self._mode:
raise ValueError("The file cannot be reopened.")
else:
self.file = BytesIO()
self._is_dirty = True
开发者ID:Artivest,项目名称:django-storages,代码行数:22,代码来源:couchdb.py
示例12: LibCloudFile
class LibCloudFile(File):
"""File inherited class for libcloud storage objects read and write"""
def __init__(self, name, storage, mode):
self.name = name
self._storage = storage
self._mode = mode
self._is_dirty = False
self._file = None
def _get_file(self):
if self._file is None:
data = self._storage._read(self.name)
self._file = BytesIO(data)
return self._file
def _set_file(self, value):
self._file = value
file = property(_get_file, _set_file)
@property
def size(self):
if not hasattr(self, '_size'):
self._size = self._storage.size(self.name)
return self._size
def read(self, num_bytes=None):
return self.file.read(num_bytes)
def write(self, content):
if 'w' not in self._mode:
raise AttributeError("File was opened for read-only access.")
self.file = BytesIO(content)
self._is_dirty = True
def close(self):
if self._is_dirty:
self._storage._save(self.name, self.file)
self.file.close()
开发者ID:Artivest,项目名称:django-storages,代码行数:39,代码来源:apache_libcloud.py
示例13: LibCloudFile
class LibCloudFile(File):
"""File inherited class for libcloud storage objects read and write"""
def __init__(self, name, storage, mode):
self._name = name
self._storage = storage
self._mode = mode
self._is_dirty = False
self.file = BytesIO()
self.start_range = 0
@property
def size(self):
if not hasattr(self, "_size"):
self._size = self._storage.size(self._name)
return self._size
def read(self, num_bytes=None):
if num_bytes is None:
args = []
self.start_range = 0
else:
args = [self.start_range, self.start_range + num_bytes - 1]
data = self._storage._read(self._name, *args)
self.file = BytesIO(data)
return self.file.getvalue()
def write(self, content):
if "w" not in self._mode:
raise AttributeError("File was opened for read-only access.")
self.file = BytesIO(content)
self._is_dirty = True
def close(self):
if self._is_dirty:
self._storage._save(self._name, self.file)
self.file.close()
开发者ID:marcelchastain,项目名称:django-storages,代码行数:37,代码来源:apache_libcloud.py
示例14: write
def write(self, content):
if 'w' not in self._mode:
raise AttributeError("File was opened for read-only access.")
self.file = BytesIO(content)
self._is_dirty = True
开发者ID:Artivest,项目名称:django-storages,代码行数:5,代码来源:couchdb.py
注:本文中的storages.compat.BytesIO类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论