本文整理汇总了Python中qiniu.BucketManager类的典型用法代码示例。如果您正苦于以下问题:Python BucketManager类的具体用法?Python BucketManager怎么用?Python BucketManager使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了BucketManager类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: QiniuUpload
class QiniuUpload():
def __init__(self, bucket = config.PIC_BUCKET, domain = config.PIC_DOMAIN):
self.qiniuer = qiniu.Auth(config.QINIU_ACCESS_KEY, config.QINIU_SECRET_KEY)
self.bucket_manager = BucketManager(self.qiniuer)
self.bucket = bucket
self.domain = domain
def get_token(self):
return self.qiniuer.upload_token(self.bucket)
def del_file(self, key):
ret, info = self.bucket_manager.delete(self.bucket, key)
# 错误处理
assert ret is None
assert info.status_code == 612
return ret, info
def get_upload_info(self, key):
ret, info = self.bucket_manager.stat(self.bucket, key)
assert 'hash' in ret
return info
def get_file_info(self, key):
url = self.domain + key + '?imageInfo'
try:
response = urllib2.urlopen(url, timeout=5)
resp = response.read()
return resp
except Exception, e:
print e
return None
开发者ID:awesome-archive,项目名称:PicBucket,代码行数:31,代码来源:upload.py
示例2: _list_remote_bucket
def _list_remote_bucket(self):
"""
traverse the buckt through the Qiniu API and return a list of keys
and timestamp
:except no connection is established
:return: a dict mapping key to upload time
"""
key_list = {}
big_file = {}
done = False
marker = None
bucket = BucketManager(self.auth)
while not done:
res, done, _ = bucket.list(self.bucketname, marker=marker,
limit=self.BATCH_LIMIT)
if not res:
self.logger('ERROR',
'could not establish connection with cloud. Exit.')
sys.exit(1)
marker = res.get('marker')
for resource in res['items']:
key_list[resource['key']] = resource['putTime']
big_file[resource['key']] = resource['fsize'] \
if resource['fsize'] > self.download_size_threshold * 2 \
else 0
return key_list, big_file
开发者ID:nykh,项目名称:qiniu-backup-tool,代码行数:28,代码来源:qbackup.py
示例3: list
def list(self, bucket_name, prefix, limit=5):
bucket = BucketManager(self.mauth)
delimiter = None
marker = None
ret, eof, info = bucket.list(bucket_name, prefix, marker, limit, delimiter)
for item in ret.get('items'):
print "{Name: %s Size:%s Hash:%s}" % (item['key'], item['fsize'], item['hash'])
开发者ID:Arvon2014,项目名称:arvon-scripts,代码行数:7,代码来源:7niu_upload_mange.py
示例4: teardown_class
def teardown_class(cls):
"""Delete all files in the test bucket.
"""
storage = QiniuPrivateStorage(
bucket_name=get_qiniu_config('QINIU_PRIVATE_BUCKET_NAME'),
bucket_domain=get_qiniu_config('QINIU_PRIVATE_BUCKET_DOMAIN'),
)
auth = storage.auth
bucket = BucketManager(auth)
while True:
ret, eof, info = bucket.list(storage.bucket_name, limit=100)
if ret is None:
print(info)
break
for item in ret['items']:
name = item['key']
if six.PY2:
name = name.encode('utf-8')
ret, info = bucket.delete(storage.bucket_name, name)
if ret is None:
print(info)
if eof:
break
开发者ID:microdog,项目名称:django-qiniu-storage,代码行数:26,代码来源:test_private_storage.py
示例5: QiniuWrapper
class QiniuWrapper():
policy = {'returnBody': '{"key": $(key), "type": $(mimeType), "name": $(fname), "size": $(fsize), "hash": $(etag)}'}
bucket_name = QINIU_BUCKET_NAME
domain = 'http://%s.qiniudn.com/' % bucket_name
def __init__(self):
self.q = Auth(QINIU_ACCESS_KEY, QINIU_SECRET_KEY)
self.bucket_manager = BucketManager(self.q)
def get_upload_token(self, key, expires=3600):
return self.q.upload_token(self.bucket_name, key, expires, self.policy)
def upload_file(self, key, filename, mime_type="application/octet-stream"):
'''
上传文件到七牛,如果指定的key对应的文件在七牛上已经存在, 会覆盖原来七牛上的文件
'''
ret, info = put_file(self.get_upload_token(key), key, filename, mime_type=mime_type, check_crc=True)
if info.status_code != 200:
return (False, info)
return (True, info)
def upload_stream(self, key, input_stream, data_size, mime_type="application/octet-stream"):
'''
上传文件到七牛,如果指定的key对应的文件在七牛上已经存在, 会覆盖原来七牛上的文件
'''
ret, info = put_stream(self.get_upload_token(key), key, input_stream, data_size, mime_type=mime_type, check_crc=True)
if info.status_code != 200:
return (False, info)
return (True, info)
def move(self, old_key, new_key):
ret, info = self.bucket_manager.move(self.bucket_name, old_key, self.bucket_name, new_key)
if info.status_code != 200:
return (False, info)
return (True, info)
def delete(self, key):
ret, info = self.bucket_manager.delete(self.bucket_name, key)
if info.status_code != 200:
return (False, info)
return (True, info)
def batch_delete(self, keys):
'''
keys = ['key1', 'key2', 'key3']
'''
ops = build_batch_delete(self.bucket_name, keys)
ret, info = self.bucket_manager.batch(ops)
if info.status_code != 200:
return (False, info)
return (True, info)
def list(self, prefix=None, limit=1000, marker=None):
return self.bucket_manager.list(self.bucket_name, prefix=prefix, marker=marker, limit=limit)
@classmethod
def get_url(cls, key):
return cls.domain + key
开发者ID:735254599,项目名称:Learning-Note,代码行数:59,代码来源:qiniuwrapper.py
示例6: removeFile
def removeFile(key):
q = Auth(access_key, secret_key)
bucket_name = defaultBucketName
bucket = BucketManager(q)
ret, info = bucket.delete(bucket_name, key)
logging.debug('remove qiniu file uri:[%r]' % (key))
return key
开发者ID:LeoLy008,项目名称:Hello-World,代码行数:8,代码来源:uploadQNTools.py
示例7: bucket_list
def bucket_list():
q = Auth(access_key, secret_key)
bucket = BucketManager(q)
ret, eof, info = bucket.list(bucket_name, limit=4)
for item in ret.get('items', []):
print item
ret, eof, info = bucket.list(bucket_name, limit=100)
print(info)
开发者ID:besky86,项目名称:source,代码行数:8,代码来源:test_qiniu2.py
示例8: move_file
def move_file(username,path,filename,newpath,newfilename,bucket=None): # bucket即bucketmanager path='/dir/dir' 可用于重命名文件,移动文件,移动目录,重命名目录 # ok
user = User.objects.filter(username=username)
if bool(user):
user = user[0]
else:
return False
if bucket == None:
q = Auth(settings.QINIU_ACCESS_KEY, settings.QINIU_SECRET_KEY) # 授权
bucket = BucketManager(q)
if not check_str(filename) or not check_str(path,ispath=True) or not check_str(newfilename) or not check_str(newpath,ispath=True):
return False
file1 = FileInfo.objects.filter(owner=user,file_name=filename,file_path=path)
filecheck = FileInfo.objects.filter(owner=user,file_path=newpath) # filepath 可以不存在
if bool(file1) and bool(filecheck):
update_dir_size(username,path,0-file1[0].size)
update_dir_size(username,newpath,file1[0].size)
else:
return False
# 数据库操作
file1 = FileInfo.objects.filter(owner=user,file_name=filename,file_path=path)
if bool(file1):
file1 = file1[0]
else:
return False # 文件名或者文件夹名不存在
filecheck = FileInfo.objects.filter(owner=user,file_name=newfilename,file_path=newpath)
if bool(filecheck):
return False
file1.file_path = newpath
file1.file_name = newfilename
file1.save()
if path == '/':
path = ''
if newpath == '/':
newpath = ''
if file1.file_type == 'dir':
subpath = ''.join([path,'/',filename])
files = FileInfo.objects.filter(owner=user,file_path=subpath)
subpath2 = ''.join([newpath,'/',newfilename])
for f in files:
if f.file_type != 'dir':
f.file_path = subpath2
f.save()
key = ''.join([username, subpath , '/', f.file_name])
key2 = ''.join([username, subpath2, '/', f.file_name])
ret, info = bucket.move(settings.QINIU_BUCKET_NAME, key, settings.QINIU_BUCKET_NAME, key2)
print info
# assert ret=={}
else :
move_file(username,subpath,f.file_name,subpath2,f.file_name,bucket)
else:
key = ''.join([username, path, '/', filename])
key2 = ''.join([username, newpath, '/', newfilename])
ret, info = bucket.move(settings.QINIU_BUCKET_NAME, key, settings.QINIU_BUCKET_NAME, key2)
print info
# assert ret == {}
return True
开发者ID:wumengqiang,项目名称:limidrive,代码行数:57,代码来源:common.py
示例9: get
def get(self, request):
mediaid = request.query_params["mediaid"]
filename = str(uuid.uuid1()).replace("-", "") + ".jpg"
q = Auth(settings.QNACCESSKEY, settings.QNSECRETKEY)
bucket = BucketManager(q)
token = wxutils.get_accesstoken()
mediaurl = "http://file.api.weixin.qq.com/cgi-bin/media/get?access_token=" + token + "&media_id=" + mediaid
ret, info = bucket.fetch(mediaurl, settings.QNBUKET, filename)
return Response({"pic": filename})
开发者ID:roynwang,项目名称:o2gymsvr,代码行数:9,代码来源:views.py
示例10: delete_files
def delete_files(self,source_bucket,pathlist=[]):
"""Args:
source_bucket: 'source_bucket'
pathlist: ['source_file_name',...]
"""
bucket = BucketManager(self.__auth)
ops = build_batch_delete(source_bucket, pathlist)
ret, info = bucket.batch(ops)
return ret,info
开发者ID:Lsgo,项目名称:easy_qiniu,代码行数:9,代码来源:easy_qiniu.py
示例11: get_file_info
def get_file_info(self,bucket_name,keys=[]):
"""Args:
bucket_name:'bucket_name'
keys: ['fileName1','fileName2']
"""
bucket = BucketManager(self.__auth)
ops = build_batch_stat(bucket_name, keys)
ret, info = bucket.batch(ops)
return ret,info
开发者ID:Lsgo,项目名称:easy_qiniu,代码行数:9,代码来源:easy_qiniu.py
示例12: qiniu_file
def qiniu_file(pre_text, limit):
q = qiniu_q()
bucket = BucketManager(q)
bucket_name = 'love-travel'
ret, eof, info = bucket.list(bucket_name, prefix=pre_text, delimiter="/", limit=limit)
if len(ret["items"]) > 0 :
return {'result' : True, 'files' : ret["items"]}
else:
return {'result' : False}
开发者ID:liangxuCHEN,项目名称:CRM_systeme,代码行数:9,代码来源:tool.py
示例13: save_to_qiniu
def save_to_qiniu(url, store_name):
"""放到七牛云存储上
"""
bucket_name = config.QINIU['bucket']
g = Auth(config.QINIU['accesskey'], config.QINIU['secretkey'])
bucket = BucketManager(g)
ret, info = bucket.fetch(url, bucket_name, store_name)
if ret:
return False
return config.QINIU['cdn'] + '/' + store_name
开发者ID:robertding,项目名称:fbooks,代码行数:10,代码来源:storage.py
示例14: move_files
def move_files(self,source_bucket,target_bucket,pathdict={}):
"""Args:
source_bucket: 'source_bucket'
target_bucket: 'target_bucket'
pathdict: {'source_file_name':'target_file_name',...}
"""
bucket = BucketManager(self.__auth)
ops = build_batch_move(source_bucket, pathdict, target_bucket)
ret, info = bucket.batch(ops)
return ret,info
开发者ID:Lsgo,项目名称:easy_qiniu,代码行数:10,代码来源:easy_qiniu.py
示例15: isFileExist
def isFileExist(file_name):
q = getAuth()
# check if file already exist
bucket_name = os.environ['server']
bucket = BucketManager(getAuth())
ret, info = bucket.stat(bucket_name, file_name)
if ret != None:
return True
else:
return False
开发者ID:aptonic,项目名称:dropzone3-actions,代码行数:10,代码来源:action.py
示例16: bucket_list
def bucket_list():
bm = BucketManager(qa())
ret = bm.list('image', None, None, 10, None)
if ret and ret[0] and ret[0].get('items') and len(ret[0].get('items')) > 0:
imgs = ret[0].get('items')
marker = ret[0].get('marker')
print(marker)
if imgs:
for x in imgs:
print('ll.append("%s")' % x['key'])
print(ret)
开发者ID:tianshan20081,项目名称:dev,代码行数:11,代码来源:qn.py
示例17: upload_by_fetch
def upload_by_fetch(self, remote_file_path, key):
"""
通过远程url上传文件,但是可能报错,请使用upload()方法
:param remote_file_path:
:param key:
:return:
"""
bucket = BucketManager(self.auth)
ret, info = bucket.fetch(remote_file_path, QINIU_DEFAULT_BUCKET, key)
print('ret: %s' % ret)
print('info: %s' % info)
开发者ID:allhu,项目名称:scrapy_in_practice,代码行数:11,代码来源:qiniu_cloud.py
示例18: save_image
def save_image(url, bookname):
"""抓取图片放到七牛云存储上
"""
bucket_name = config.QINIU['bucket']
g = Auth(config.QINIU['accesskey'], config.QINIU['secretkey'])
bucket = BucketManager(g)
save_to = 'IMAGE|{}'.format(bookname)
ret, info = bucket.fetch(url, bucket_name, save_to)
if ret:
return False
return config.QINIU['cdn'] + '/' + save_to
开发者ID:robertding,项目名称:fbooks,代码行数:11,代码来源:storage.py
示例19: multi_delete
def multi_delete(self, key_list):
# key_list: [key1, key2, key3, ...]
from qiniu import build_batch_delete
bucket = BucketManager(self.q)
ops = build_batch_delete(self.bucket_name, key_list)
ret, info = bucket.batch(ops)
# print('QiniuStorage - multi_delete: %s' % info)
json_info = json.loads(info.text_body)
for m_info in json_info:
# "code":612,"data":{"error":"no such file or directory"
assert m_info[u'code'] == 200 or m_info[u'code'] == 612
开发者ID:carlo-z,项目名称:carloz-lib-web,代码行数:12,代码来源:qiniu_cloud.py
示例20: fetch_files_from_net_to_qiniu
def fetch_files_from_net_to_qiniu(self,bucket_name,pathdict={}):
"""Args:
bucket_name: 'bucket_name'
pathdict: {'source_file_name':'target_file_name',...}
"""
bucket = BucketManager(self.__auth)
rets=[]
infos=[]
for p in pathdict.keys():
ret, info = bucket.fetch(pathdict[p], bucket_name,p)
rets.append(ret)
infos.append(info)
return rets,infos
开发者ID:Lsgo,项目名称:easy_qiniu,代码行数:13,代码来源:easy_qiniu.py
注:本文中的qiniu.BucketManager类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论