• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python qiniu.BucketManager类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中qiniu.BucketManager的典型用法代码示例。如果您正苦于以下问题:Python BucketManager类的具体用法?Python BucketManager怎么用?Python BucketManager使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了BucketManager类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: QiniuUpload

class QiniuUpload():
    def __init__(self, bucket = config.PIC_BUCKET, domain = config.PIC_DOMAIN):
        self.qiniuer = qiniu.Auth(config.QINIU_ACCESS_KEY, config.QINIU_SECRET_KEY)
        self.bucket_manager = BucketManager(self.qiniuer)
        self.bucket = bucket
        self.domain = domain

    def get_token(self):
        return self.qiniuer.upload_token(self.bucket)

    def del_file(self, key):
        ret, info = self.bucket_manager.delete(self.bucket, key)
        # 错误处理
        assert ret is None
        assert info.status_code == 612
        return ret, info

    def get_upload_info(self, key):
        ret, info = self.bucket_manager.stat(self.bucket, key)
        assert 'hash' in ret
        return info

    def get_file_info(self, key):
        url = self.domain + key + '?imageInfo'
        try:
            response = urllib2.urlopen(url, timeout=5)
            resp = response.read()
            return resp
        except Exception, e:
            print e
            return None
开发者ID:awesome-archive,项目名称:PicBucket,代码行数:31,代码来源:upload.py


示例2: _list_remote_bucket

    def _list_remote_bucket(self):
        """
        traverse the buckt through the Qiniu API and return a list of keys
        and timestamp
        :except no connection is established
        :return: a dict mapping key to upload time
        """
        key_list = {}
        big_file = {}
        done = False
        marker = None

        bucket = BucketManager(self.auth)

        while not done:
            res, done, _ = bucket.list(self.bucketname, marker=marker,
                                       limit=self.BATCH_LIMIT)
            if not res:
                self.logger('ERROR',
                            'could not establish connection with cloud. Exit.')
                sys.exit(1)
            marker = res.get('marker')
            for resource in res['items']:
                key_list[resource['key']] = resource['putTime']
                big_file[resource['key']] = resource['fsize'] \
                    if resource['fsize'] > self.download_size_threshold * 2 \
                    else 0
        return key_list, big_file
开发者ID:nykh,项目名称:qiniu-backup-tool,代码行数:28,代码来源:qbackup.py


示例3: list

 def list(self, bucket_name, prefix, limit=5):
     bucket = BucketManager(self.mauth)
     delimiter = None
     marker = None
     ret, eof, info = bucket.list(bucket_name, prefix, marker, limit, delimiter)
     for item in ret.get('items'):
         print "{Name: %s Size:%s Hash:%s}" % (item['key'], item['fsize'], item['hash'])
开发者ID:Arvon2014,项目名称:arvon-scripts,代码行数:7,代码来源:7niu_upload_mange.py


示例4: teardown_class

    def teardown_class(cls):
        """Delete all files in the test bucket.
        """
        storage = QiniuPrivateStorage(
            bucket_name=get_qiniu_config('QINIU_PRIVATE_BUCKET_NAME'),
            bucket_domain=get_qiniu_config('QINIU_PRIVATE_BUCKET_DOMAIN'),
        )
        auth = storage.auth
        bucket = BucketManager(auth)

        while True:
            ret, eof, info = bucket.list(storage.bucket_name, limit=100)

            if ret is None:
                print(info)
                break

            for item in ret['items']:
                name = item['key']
                if six.PY2:
                    name = name.encode('utf-8')
                ret, info = bucket.delete(storage.bucket_name, name)
                if ret is None:
                    print(info)
            if eof:
                break
开发者ID:microdog,项目名称:django-qiniu-storage,代码行数:26,代码来源:test_private_storage.py


示例5: QiniuWrapper

class QiniuWrapper():

    policy = {'returnBody': '{"key": $(key), "type": $(mimeType), "name": $(fname), "size": $(fsize), "hash": $(etag)}'}
    bucket_name = QINIU_BUCKET_NAME
    domain = 'http://%s.qiniudn.com/' % bucket_name

    def __init__(self):
        self.q = Auth(QINIU_ACCESS_KEY, QINIU_SECRET_KEY)
        self.bucket_manager = BucketManager(self.q)

    def get_upload_token(self, key, expires=3600):
        return self.q.upload_token(self.bucket_name, key, expires, self.policy)

    def upload_file(self, key, filename, mime_type="application/octet-stream"):
        '''
        上传文件到七牛,如果指定的key对应的文件在七牛上已经存在, 会覆盖原来七牛上的文件
        '''
        ret, info = put_file(self.get_upload_token(key), key, filename, mime_type=mime_type, check_crc=True)
        if info.status_code != 200:
            return (False, info)
        return (True, info)

    def upload_stream(self, key, input_stream, data_size, mime_type="application/octet-stream"):
        '''
        上传文件到七牛,如果指定的key对应的文件在七牛上已经存在, 会覆盖原来七牛上的文件
        '''
        ret, info = put_stream(self.get_upload_token(key), key, input_stream, data_size, mime_type=mime_type, check_crc=True)
        if info.status_code != 200:
            return (False, info)
        return (True, info)

    def move(self, old_key, new_key):
        ret, info = self.bucket_manager.move(self.bucket_name, old_key, self.bucket_name, new_key)
        if info.status_code != 200:
            return (False, info)
        return (True, info)

    def delete(self, key):
        ret, info = self.bucket_manager.delete(self.bucket_name, key)
        if info.status_code != 200:
            return (False, info)
        return (True, info)

    def batch_delete(self, keys):
        '''
        keys = ['key1', 'key2', 'key3']
        '''
        ops = build_batch_delete(self.bucket_name, keys)
        ret, info = self.bucket_manager.batch(ops)
        if info.status_code != 200:
            return (False, info)
        return (True, info)

    def list(self, prefix=None, limit=1000, marker=None):
        return self.bucket_manager.list(self.bucket_name, prefix=prefix, marker=marker, limit=limit)

    @classmethod
    def get_url(cls, key):
        return cls.domain + key
开发者ID:735254599,项目名称:Learning-Note,代码行数:59,代码来源:qiniuwrapper.py


示例6: removeFile

def removeFile(key):
    q = Auth(access_key, secret_key)
    bucket_name = defaultBucketName
    bucket = BucketManager(q)

    ret, info = bucket.delete(bucket_name, key)
    logging.debug('remove qiniu file uri:[%r]' % (key))
    return key
开发者ID:LeoLy008,项目名称:Hello-World,代码行数:8,代码来源:uploadQNTools.py


示例7: bucket_list

def bucket_list():
    q = Auth(access_key, secret_key)
    bucket = BucketManager(q)
    ret, eof, info = bucket.list(bucket_name, limit=4)
    for item in ret.get('items', []):
        print item
    ret, eof, info = bucket.list(bucket_name, limit=100)
    print(info)
开发者ID:besky86,项目名称:source,代码行数:8,代码来源:test_qiniu2.py


示例8: move_file

def move_file(username,path,filename,newpath,newfilename,bucket=None):  # bucket即bucketmanager path='/dir/dir' 可用于重命名文件,移动文件,移动目录,重命名目录 # ok
    user = User.objects.filter(username=username)
    if bool(user):
        user = user[0]
    else:
        return False
    if bucket == None:
        q = Auth(settings.QINIU_ACCESS_KEY, settings.QINIU_SECRET_KEY)  # 授权
        bucket = BucketManager(q)
        if  not check_str(filename) or not check_str(path,ispath=True) or  not check_str(newfilename) or not check_str(newpath,ispath=True):
            return False

        file1 = FileInfo.objects.filter(owner=user,file_name=filename,file_path=path)
        filecheck = FileInfo.objects.filter(owner=user,file_path=newpath)  # filepath 可以不存在
        if bool(file1) and bool(filecheck):
            update_dir_size(username,path,0-file1[0].size)
            update_dir_size(username,newpath,file1[0].size)
        else:
            return False
    # 数据库操作
    file1 = FileInfo.objects.filter(owner=user,file_name=filename,file_path=path)
    if bool(file1):
        file1 = file1[0]
    else:
        return False  # 文件名或者文件夹名不存在
    filecheck = FileInfo.objects.filter(owner=user,file_name=newfilename,file_path=newpath)
    if bool(filecheck):
        return False
    file1.file_path = newpath
    file1.file_name = newfilename
    file1.save()
    if path == '/':
        path = ''
    if newpath == '/':
        newpath = ''
    if file1.file_type == 'dir': 
        subpath = ''.join([path,'/',filename])
        files = FileInfo.objects.filter(owner=user,file_path=subpath)
        subpath2 = ''.join([newpath,'/',newfilename])
        for f in files:
            if f.file_type != 'dir':
                f.file_path = subpath2
                f.save()
                key = ''.join([username, subpath , '/', f.file_name])
                key2 = ''.join([username, subpath2, '/', f.file_name])
                ret, info = bucket.move(settings.QINIU_BUCKET_NAME, key, settings.QINIU_BUCKET_NAME, key2)
                print info
                # assert ret=={}
            else :
                move_file(username,subpath,f.file_name,subpath2,f.file_name,bucket)
    else:
        key = ''.join([username, path, '/', filename])
        key2 = ''.join([username, newpath, '/', newfilename])
        ret, info = bucket.move(settings.QINIU_BUCKET_NAME, key, settings.QINIU_BUCKET_NAME, key2)
        print info
        # assert ret == {}
    return True
开发者ID:wumengqiang,项目名称:limidrive,代码行数:57,代码来源:common.py


示例9: get

 def get(self, request):
     mediaid = request.query_params["mediaid"]
     filename = str(uuid.uuid1()).replace("-", "") + ".jpg"
     q = Auth(settings.QNACCESSKEY, settings.QNSECRETKEY)
     bucket = BucketManager(q)
     token = wxutils.get_accesstoken()
     mediaurl = "http://file.api.weixin.qq.com/cgi-bin/media/get?access_token=" + token + "&media_id=" + mediaid
     ret, info = bucket.fetch(mediaurl, settings.QNBUKET, filename)
     return Response({"pic": filename})
开发者ID:roynwang,项目名称:o2gymsvr,代码行数:9,代码来源:views.py


示例10: delete_files

 def delete_files(self,source_bucket,pathlist=[]):
     """Args:
     source_bucket: 'source_bucket'
     pathlist: ['source_file_name',...]
     """
     bucket = BucketManager(self.__auth)
     ops = build_batch_delete(source_bucket, pathlist)
     ret, info = bucket.batch(ops)
     return ret,info
开发者ID:Lsgo,项目名称:easy_qiniu,代码行数:9,代码来源:easy_qiniu.py


示例11: get_file_info

 def get_file_info(self,bucket_name,keys=[]):
     """Args:
     bucket_name:'bucket_name'
     keys:  ['fileName1','fileName2']
     """
     bucket = BucketManager(self.__auth)
     ops = build_batch_stat(bucket_name, keys)
     ret, info = bucket.batch(ops)
     return ret,info
开发者ID:Lsgo,项目名称:easy_qiniu,代码行数:9,代码来源:easy_qiniu.py


示例12: qiniu_file

def qiniu_file(pre_text, limit):
    q = qiniu_q()
    bucket = BucketManager(q)
    bucket_name = 'love-travel'
    ret, eof, info = bucket.list(bucket_name, prefix=pre_text, delimiter="/", limit=limit)
    if len(ret["items"]) > 0 :
        return {'result' : True, 'files' : ret["items"]}
    else:
        return {'result' : False}
开发者ID:liangxuCHEN,项目名称:CRM_systeme,代码行数:9,代码来源:tool.py


示例13: save_to_qiniu

def save_to_qiniu(url, store_name):
    """放到七牛云存储上
    """
    bucket_name = config.QINIU['bucket']
    g = Auth(config.QINIU['accesskey'], config.QINIU['secretkey'])
    bucket = BucketManager(g)
    ret, info = bucket.fetch(url, bucket_name, store_name)
    if ret:
        return False
    return config.QINIU['cdn'] + '/' + store_name
开发者ID:robertding,项目名称:fbooks,代码行数:10,代码来源:storage.py


示例14: move_files

 def move_files(self,source_bucket,target_bucket,pathdict={}):
     """Args:
     source_bucket: 'source_bucket'
     target_bucket:  'target_bucket'
     pathdict: {'source_file_name':'target_file_name',...}
     """
     bucket = BucketManager(self.__auth)
     ops = build_batch_move(source_bucket, pathdict, target_bucket)
     ret, info = bucket.batch(ops)
     return ret,info
开发者ID:Lsgo,项目名称:easy_qiniu,代码行数:10,代码来源:easy_qiniu.py


示例15: isFileExist

def isFileExist(file_name):
    q = getAuth()
    # check if file already exist
    bucket_name = os.environ['server']
    bucket = BucketManager(getAuth())
    ret, info = bucket.stat(bucket_name, file_name)
    if ret != None:
        return True
    else:
        return False
开发者ID:aptonic,项目名称:dropzone3-actions,代码行数:10,代码来源:action.py


示例16: bucket_list

def bucket_list():
    bm = BucketManager(qa())
    ret = bm.list('image', None, None, 10, None)
    if ret and ret[0] and ret[0].get('items') and len(ret[0].get('items')) > 0:
        imgs = ret[0].get('items')
        marker = ret[0].get('marker')
        print(marker)
        if imgs:
            for x in imgs:
                print('ll.append("%s")' % x['key'])
    print(ret)
开发者ID:tianshan20081,项目名称:dev,代码行数:11,代码来源:qn.py


示例17: upload_by_fetch

 def upload_by_fetch(self, remote_file_path, key):
     """
     通过远程url上传文件,但是可能报错,请使用upload()方法
     :param remote_file_path:
     :param key:
     :return:
     """
     bucket = BucketManager(self.auth)
     ret, info = bucket.fetch(remote_file_path, QINIU_DEFAULT_BUCKET, key)
     print('ret: %s' % ret)
     print('info: %s' % info)
开发者ID:allhu,项目名称:scrapy_in_practice,代码行数:11,代码来源:qiniu_cloud.py


示例18: save_image

def save_image(url, bookname):
    """抓取图片放到七牛云存储上
    """
    bucket_name = config.QINIU['bucket']
    g = Auth(config.QINIU['accesskey'], config.QINIU['secretkey'])
    bucket = BucketManager(g)
    save_to = 'IMAGE|{}'.format(bookname)
    ret, info = bucket.fetch(url, bucket_name, save_to)
    if ret:
        return False
    return config.QINIU['cdn'] + '/' + save_to
开发者ID:robertding,项目名称:fbooks,代码行数:11,代码来源:storage.py


示例19: multi_delete

    def multi_delete(self, key_list):
        # key_list: [key1, key2, key3, ...]
        from qiniu import build_batch_delete
        bucket = BucketManager(self.q)
        ops = build_batch_delete(self.bucket_name, key_list)
        ret, info = bucket.batch(ops)
        # print('QiniuStorage - multi_delete: %s' % info)

        json_info = json.loads(info.text_body)
        for m_info in json_info:
            # "code":612,"data":{"error":"no such file or directory"
            assert m_info[u'code'] == 200 or m_info[u'code'] == 612
开发者ID:carlo-z,项目名称:carloz-lib-web,代码行数:12,代码来源:qiniu_cloud.py


示例20: fetch_files_from_net_to_qiniu

 def fetch_files_from_net_to_qiniu(self,bucket_name,pathdict={}):
     """Args:
     bucket_name: 'bucket_name'
     pathdict: {'source_file_name':'target_file_name',...}
     """
     bucket = BucketManager(self.__auth)
     rets=[]
     infos=[]
     for p in pathdict.keys():
         ret, info = bucket.fetch(pathdict[p], bucket_name,p)
         rets.append(ret)
         infos.append(info)
     return rets,infos
开发者ID:Lsgo,项目名称:easy_qiniu,代码行数:13,代码来源:easy_qiniu.py



注:本文中的qiniu.BucketManager类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python config.get_default函数代码示例发布时间:2022-05-26
下一篇:
Python qiniu.Auth类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap