本文整理汇总了Python中qiniu.etag函数的典型用法代码示例。如果您正苦于以下问题：Python etag函数的具体用法？Python etag怎么用？Python etag使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了etag函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: upload
def upload(self, upload_dir, upload_bucket):
    """Upload one file, or every file under a directory, to a Qiniu bucket.

    Each file is uploaded under a key equal to its local path, and the
    response is checked against that key and the locally computed etag.

    Args:
        upload_dir: Path of a directory (walked recursively) or of a
            single regular file.
        upload_bucket: Name of the destination bucket.

    Returns:
        None. Nothing is uploaded when ``upload_dir`` is neither an
        existing directory nor an existing file.

    Raises:
        AssertionError: If the returned key or hash does not match.
    """
    if os.path.isdir(upload_dir):
        # Collect the full file list before uploading anything.
        paths = [
            os.path.join(root, name)
            for root, _dirs, names in os.walk(upload_dir, topdown=True)
            for name in names
        ]
    elif os.path.isfile(upload_dir):
        paths = [upload_dir]
    else:
        return

    for file_name in paths:
        token = self.mauth.upload_token(upload_bucket, file_name)
        ret, info = put_file(token, file_name, file_name)
        assert ret['key'] == file_name
        assert ret['hash'] == etag(file_name)
        print(ret)
开发者ID:Arvon2014,项目名称:arvon-scripts,代码行数:27,代码来源:7niu_upload_mange.py
示例2: upload
def upload(origin_file_path):
    """Convert a local file with ``conwebp.convert`` and upload the result.

    The file is stored in the ``md-doc`` bucket under a name made of a
    ``%Y%m%d%H%M%S`` timestamp, an underscore, and the base name of the
    converted file.

    Args:
        origin_file_path: Path handed to ``conwebp.convert``.

    Raises:
        AssertionError: If the upload returned no result, or the returned
            key/hash does not match the uploaded file.
    """
    # Build the auth object.
    q = Auth(config.access_key, config.secret_key)
    # Bucket to upload into.
    bucket_name = 'md-doc'
    localfile = conwebp.convert(origin_file_path)
    # File name under which the upload is stored on Qiniu.
    dest_prefix = time.strftime("%Y%m%d%H%M%S", time.localtime())
    dest_name = dest_prefix + "_" + os.path.basename(localfile)
    # After the upload, Qiniu calls back the business server with the
    # file name and file size.
    policy = {
        'callbackBody': 'filename=$(fname)&filesize=$(fsize)'
    }
    token = q.upload_token(bucket_name, dest_name, 3600, policy)
    ret, info = put_file(token, dest_name, localfile)
    if ret is not None:
        print("Upload Success,url=", config.domin + dest_name)
    else:
        print("info=", info)
        print("ret=", ret)
    # Fail with a readable message instead of "NoneType is not subscriptable".
    assert ret is not None, "upload of %s failed: %s" % (localfile, info)
    assert ret['key'] == dest_name
    assert ret['hash'] == etag(localfile)
开发者ID:Songziyuan,项目名称:QiniuUpload,代码行数:27,代码来源:qiniuUpload.py
示例3: sync
def sync(conf, demon):
    """Download remote files that are missing or differ locally.

    Loads the configuration with ``load_config``, UTF-8 encodes every
    value, then walks ``qiniuClient.list()``. An item is downloaded when
    its local path does not exist or its local etag differs from the
    remote hash; a desktop notification reports each result.

    Args:
        conf: Passed to ``load_config``; an IOError raised there propagates.
        demon: When truthy, watch ``data['root']`` for create, delete and
            modify events afterwards and block in the notifier loop.
    """
    data = load_config(conf)
    for name in list(data):
        data[name] = data[name].encode('utf-8')

    qc = qiniuClient(
        data['access_key'],
        data['secret_key'],
        data['bucket_name'],
        data['domain'],
        data['root'],
    )

    for item in qc.list():
        path = os.path.join(data['root'], item['key'])
        if os.path.exists(path) and qiniu.etag(path) == item['hash']:
            # The local copy already matches the remote one.
            continue
        if qc.down_file(item['key'], path):
            msg = "下载{}成功".format(path)
        else:
            msg = "下载{}失败".format(path)
        pynotify.Notification("qiniuClund", msg).show()

    if not demon:
        return

    wm = pyinotify.WatchManager()
    event_flags = (
        pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_MODIFY
    )
    wm.add_watch(data['root'], event_flags, rec=True, auto_add=True)
    notifier = pyinotify.Notifier(wm, qiniudEventHandler(qc))
    notifier.loop()
开发者ID:uxlsl,项目名称:qiniubed,代码行数:34,代码来源:qiniubed.py
示例4: user_icon
def user_icon(request):
    """Serve the avatar form on GET; store and upload the avatar on POST.

    On POST the uploaded ``avatar`` file is written to a fixed path under
    ``BASE_DIR``, pushed to the ``goodbuy`` bucket, and the user's ``icon``
    URL is saved. Other methods fall through and return None.

    Returns:
        The rendered template for GET, or a JsonResponse carrying the key
        and the upload token for POST.
    """
    if request.method == 'GET':
        return render(request, 'home/user/icon.html')
    if request.method != 'POST':
        return None

    # NOTE(review): every request writes to this same path, which looks
    # unsafe under concurrent uploads. Confirm before relying on it.
    localfile = BASE_DIR+'/static/images/icon.jpg'
    avatar = request.FILES['avatar']
    with open(localfile, 'wb') as pic:
        for chunk in avatar.chunks():
            pic.write(chunk)

    user = get_user(request)
    # Build the auth object.
    auth = qiniu.Auth(ACCESS_KEY, QINIU_SECRET_KEY)
    # File name under which the upload is stored on Qiniu.
    key = 'static/icon/'+user.username+'3.jpg'
    # Generate the upload token for the target bucket.
    token = auth.upload_token('goodbuy', key)
    ret, info = qiniu.put_file(token, key=key, file_path=localfile)
    print(info, ret)
    assert ret['key'] == key
    assert ret['hash'] == etag(localfile)

    user.icon = 'http://pbu0s2z3p.bkt.clouddn.com/'+key
    user.save()
    return JsonResponse({'key':key,'token':token})
开发者ID:SongJiaxin95,项目名称:GoodBuy,代码行数:25,代码来源:user_api.py
示例5: upload_file
def upload_file(localfile):
    """Upload ``localfile`` to ``myselfres`` under a timestamped .png key.

    After a verified upload, ``myselfres_dom + key + ')'`` is handed to
    ``setText``.

    Args:
        localfile: Local path of the file to upload.

    Raises:
        AssertionError: If the returned key or hash does not match.
    """
    import os

    # SECURITY: credentials must not live in source control. The
    # QINIU_ACCESS_KEY / QINIU_SECRET_KEY environment variables take
    # precedence. The literals remain only as a backward-compatible
    # fallback and should be rotated and removed.
    access_key = os.environ.get(
        'QINIU_ACCESS_KEY', 'aRzVj18_VFA_p4EZ9z8ClWkVwYvAOWuoMStYnJhi')
    secret_key = os.environ.get(
        'QINIU_SECRET_KEY', 'bh_hBotfph77wcmHHIgCwExqKEvQN_eyT5m1r9_c')
    # Build the auth object.
    q = Auth(access_key, secret_key)
    # Bucket to upload into.
    bucket_name = 'myselfres'
    # File name under which the upload is stored on Qiniu.
    key = time.strftime("%Y-%m-%d-%H-%M-%S.png", time.localtime())
    print(key)
    # Generate the upload token with a 3600 expiry argument.
    token = q.upload_token(bucket_name, key, 3600)
    # localfile is the local path of the file being uploaded.
    ret, info = put_file(token, key, localfile)
    print(info)
    assert ret['key'] == key
    assert ret['hash'] == etag(localfile)
    pngOnlinePath = myselfres_dom + key + ')'
    setText(pngOnlinePath)
开发者ID:axidaka,项目名称:use-opensource,代码行数:28,代码来源:upload_file.py
示例6: upload
def upload(args):
    """Upload images to qiniu cloud.

    Every entry in the directory ``args.input_file`` is read as a text
    file listing one relative image path per line. Each listed image is
    read from ``args.local_dir`` and uploaded to ``args.bucket_name``
    under ``args.remote_dir``.

    Args:
        args: Object with ``bucket_name``, ``remote_dir``, ``input_file``
            and ``local_dir`` attributes, none of which may be None.

    Raises:
        AssertionError: If an argument is None, or an upload does not
            return status 200 with the expected key and hash.
    """
    assert args.bucket_name is not None
    assert args.remote_dir is not None
    assert args.input_file is not None
    assert args.local_dir is not None
    bucket_name = args.bucket_name
    mime_type = "image/jpg"
    img_num = 0
    for video in os.listdir(args.input_file):
        with open(os.path.join(args.input_file, video)) as fh:
            # Strip the newline first: a raw "\n" line is truthy, so blank
            # lines would otherwise be uploaded as a bare directory key.
            paths = [line.strip('\n') for line in fh]
        for path in paths:
            if not path:
                continue
            key = os.path.join(args.remote_dir, path)
            print('upload  %s  as  %s' % (path, key))
            localfile = os.path.join(args.local_dir, path)
            print(localfile)
            token = q.upload_token(bucket_name, key)
            ret, info = put_file(token, key, localfile, mime_type=mime_type, check_crc=True)
            assert info.status_code == 200
            assert ret['key'] == key
            assert ret['hash'] == etag(localfile)
            # Count images actually uploaded, matching the summary below.
            img_num += 1
    print('%d images uploaded' % img_num)
开发者ID:moushuai,项目名称:FlaskWebTool,代码行数:34,代码来源:tupu.py
示例7: putfile
def putfile(file_url):
    """Upload ``file_url`` under a key built from its etag and file type.

    The key is ``etag(file_url) + "." + file_type``, where the MIME type
    and ``file_type`` both come from ``get_mime_type``. The upload result
    is printed.

    Args:
        file_url: Local path of the file to upload.
    """
    q = Auth(access_key, secret_key)
    mime_type, file_type = get_mime_type(file_url)
    key = etag(file_url) + "." + file_type
    token = q.upload_token(bucket_name, key)
    ret, info = put_file(token, key, file_url, mime_type=mime_type, check_crc=True)
    print(ret)
开发者ID:besky86,项目名称:source,代码行数:8,代码来源:test_qiniu2.py
示例8: test_putfile
def test_putfile(self):
    """Upload this source file as 'test_file' and check the key and hash."""
    source = __file__
    remote_key = 'test_file'
    up_token = self.q.upload_token(bucket_name, remote_key)
    result = putfile(
        up_token,
        remote_key,
        source,
        mime_type=self.mime_type,
        check_crc=True,
    )
    assert result['key'] == remote_key
    assert result['hash'] == etag(source)
开发者ID:atomd,项目名称:qiniu-python,代码行数:8,代码来源:test_qiniu.py
示例9: upload
def upload(self, src, dest, check=True):
    """Upload ``src`` to the configured bucket under the key ``dest``.

    Args:
        src: Local path of the file to upload.
        dest: Key to store the file under.
        check: When true, assert that the returned key equals ``dest``
            and the returned hash equals ``etag(src)``.

    Returns:
        True.
    """
    up_token = self.q.upload_token(self.bucket_name, dest, self.expire)
    ret, _info = put_file(up_token, dest, src)
    if not check:
        return True
    assert ret['key'] == dest, "ret['key'] <{}> unmatch dest <{}>".format(ret['key'], dest)
    assert ret['hash'] == etag(src), "ret['hash'] <{}> unmatch etag(src), <{}>".format(ret['hash'], etag(src))
    return True
开发者ID:greatwhole,项目名称:dock_hx,代码行数:8,代码来源:qiniu_upload.py
示例10: sync2qiniu
def sync2qiniu(filename, bkt, key, secret):
    """Upload ``filename`` to bucket ``bkt``, using the path as the key.

    Prints 'upload successful' only when both the returned key and the
    returned hash match the local file.

    Args:
        filename: Local path; also used as the remote key.
        bkt: Destination bucket name.
        key: Access key passed to ``qiniu.Auth``.
        secret: Secret key passed to ``qiniu.Auth``.
    """
    import qiniu

    auth = qiniu.Auth(key, secret)
    up_token = auth.upload_token(bkt, filename, 3600)
    print('Send %s to qiniu ...' % filename)
    result, _info = qiniu.put_file(up_token, filename, filename)
    if result['key'] != filename:
        return
    if result['hash'] != qiniu.etag(filename):
        return
    print('upload successful')
开发者ID:leetschau,项目名称:oss-manager,代码行数:8,代码来源:ali-bak.py
示例11: test_retry
def test_retry(self):
    """Upload this source file after overriding the default zone."""
    # NOTE(review): 'http://a' is presumably an unreachable first host so
    # that the upload has to retry on the second one. Confirm against Zone.
    retry_zone = Zone('http://a', 'http://upload.qiniu.com')
    source = __file__
    remote_key = 'test_file_r_retry'
    qiniu.set_default(default_zone=retry_zone)
    up_token = self.q.upload_token(bucket_name, remote_key)
    result, info = put_file(
        up_token, remote_key, source, self.params, self.mime_type)
    print(info)
    assert result['key'] == remote_key
    assert result['hash'] == etag(source)
开发者ID:jemygraw,项目名称:python-sdk,代码行数:9,代码来源:test_qiniu.py
示例12: test_putfile
def test_putfile(self):
    """Upload this source file as "test_file" with CRC checking enabled."""
    source = __file__
    remote_key = "test_file"
    up_token = self.q.upload_token(bucket_name, remote_key)
    result, info = put_file(
        up_token,
        remote_key,
        source,
        mime_type=self.mime_type,
        check_crc=True,
    )
    print(info)
    assert result["key"] == remote_key
    assert result["hash"] == etag(source)
开发者ID:qiniu,项目名称:python-sdk,代码行数:9,代码来源:test_qiniu.py
示例13: test_retry
def test_retry(self):
    """Upload this source file with the default upload host set to 'a'.

    The default upload host is reset to ``qiniu.config.UPAUTO_HOST`` in a
    ``finally`` block, so a failing assertion cannot leave the override in
    place for later tests.
    """
    localfile = __file__
    key = 'test_file_r_retry'
    qiniu.set_default(default_up_host='a')
    try:
        token = self.q.upload_token(bucket_name, key)
        ret, info = put_file(token, key, localfile, self.params, self.mime_type)
        print(info)
        assert ret['key'] == key
        assert ret['hash'] == etag(localfile)
    finally:
        # Restore the global default whether or not the checks passed.
        qiniu.set_default(default_up_host=qiniu.config.UPAUTO_HOST)
开发者ID:Euclidvi31,项目名称:python-sdk,代码行数:10,代码来源:test_qiniu.py
示例14: upload_file
def upload_file(self, key, localfile):
    """Upload ``localfile`` as text/plain under ``key`` and verify it.

    Args:
        key: Key to store the file under in ``self.bucket_name``.
        localfile: Local path of the file to upload.

    Raises:
        AssertionError: If the returned key or hash does not match.
    """
    mime_type = "text/plain"
    token = self.q.upload_token(self.bucket_name, key)
    ret, info = put_file(token, key, localfile, mime_type=mime_type, check_crc=True)
    assert ret['key'] == key
    assert ret['hash'] == etag(localfile)
开发者ID:carlo-z,项目名称:carloz-lib-web,代码行数:10,代码来源:qiniu_cloud.py
示例15: upload_file
def upload_file(upload_file_name, temp):
    """Upload a PNG under a random MD5 key and rename the local file.

    The key is the MD5 hex digest of the current time followed by 12
    sampled letters. After a verified upload the local file is renamed
    with a '.old' suffix.

    Args:
        upload_file_name: Local path of the file to upload.
        temp: Not used by this function.

    Returns:
        ``domain + '/' + key``.
    """
    stamp = str(time.time())
    salt = ''.join(random.sample(string.letters, 12))
    key = md5(stamp + salt).hexdigest()
    token = q.upload_token(bucket, key)
    ret, info = put_file(
        token, key, upload_file_name, mime_type='image/png', check_crc=True)
    print('upload qiniu result: %s' % (info,))
    assert ret['key'] == key
    assert ret['hash'] == etag(upload_file_name)
    os.rename(upload_file_name, upload_file_name+'.old')
    return domain+'/'+key
开发者ID:xiaochao,项目名称:cut,代码行数:10,代码来源:main.py
示例16: upload
def upload(self, key, localfile=None, expires=3600, deleteAfterDays=7):
    """Upload ``localfile`` under ``key`` with a ``deleteAfterDays`` policy.

    Args:
        key: Key to store the file under in ``self.bucket_name``.
        localfile: Local path of the file to upload.
        expires: Expiry argument passed to ``upload_token``.
        deleteAfterDays: Value for the policy's 'deleteAfterDays' entry.

    Raises:
        AssertionError: If the returned key or hash does not match.
    """
    upload_policy = {'deleteAfterDays': deleteAfterDays}
    print(upload_policy)
    up_token = self.q.upload_token(
        self.bucket_name, key, expires, upload_policy)
    result, info = put_file(up_token, key, localfile)
    print(info)
    assert result['key'] == key
    assert result['hash'] == etag(localfile)
开发者ID:voidcc,项目名称:qiniu_test,代码行数:10,代码来源:qiniu_test.py
示例17: process_IN_MODIFY
def process_IN_MODIFY(self, event):
    """Re-upload a modified file whose content differs from the remote copy.

    Nothing is uploaded when the remote stat is None or its hash already
    equals the local etag. After an upload, a desktop notification is
    shown and the file's URL is copied to the clipboard.

    Args:
        event: Event object exposing ``pathname``.
    """
    local_path = event.pathname
    click.echo('MODIFY event:%s' % local_path)
    client = self._qiniu_client
    remote_stat = client.stat(client.cal_key(local_path))
    if remote_stat is None:
        return
    if remote_stat['hash'] == qiniu.etag(local_path):
        return
    client.upload_file(local_path)
    pynotify.Notification(
        "qiniubed", "上传{}成功".format(local_path)).show()
    pyperclip.copy(client.get_chain(local_path))
开发者ID:uxlsl,项目名称:qiniubed,代码行数:11,代码来源:qiniubed.py
示例18: upload
def upload(local_file, remote_name, ttl=3600):
    """Upload ``local_file`` to ``BUCKET_NAME`` under ``remote_name``.

    Args:
        local_file: Local path of the file to upload.
        remote_name: Key to store the file under.
        ttl: Expiry argument passed to ``upload_token``.

    Raises:
        AssertionError: If the returned key or hash does not match.
    """
    print(local_file, remote_name, ttl)
    # Build the auth object.
    auth = Auth(ACCESS_KEY, SECRET_KEY)
    # Generate the upload token; the expiry is passed as ``ttl``.
    up_token = auth.upload_token(BUCKET_NAME, remote_name, ttl)
    result, info = put_file(
        up_token,
        remote_name,
        local_file,
        progress_handler=progress_handler,
    )
    print(info)
    assert result['key'] == remote_name
    assert result['hash'] == etag(local_file)
开发者ID:MyonKeminta,项目名称:docs,代码行数:12,代码来源:upload.py
示例19: upload_file
def upload_file(self, filepath, file_key=None, mime_type=None):
    """Upload ``filepath`` and return the URL of the stored file.

    Args:
        filepath: Local path of the file to upload.
        file_key: Key to store the file under; defaults to the base name
            of ``filepath``.
        mime_type: MIME type forwarded to ``put_file``.

    Returns:
        ``self.get_file_url(file_key)``, both for a fresh upload and when
        the service reports status 614 (file exists).

    Raises:
        AssertionError: If a non-614 response has a mismatched key or hash.
    """
    if file_key is None:
        file_key = path.basename(filepath)
    token = self._q.upload_token(self.bucket_name)
    ret, info = put_file(token, file_key, filepath, mime_type=mime_type, check_crc=True)
    # File already exists. The status alone decides this: the SDK is
    # understood to return ret=None for non-200 responses, so also
    # requiring a truthy ret made this branch unreachable.
    if info.status_code == 614:
        return self.get_file_url(file_key)
    # Upload succeeded.
    assert ret['key'] == file_key
    assert ret['hash'] == etag(filepath)
    return self.get_file_url(file_key)
开发者ID:2php,项目名称:CoolCantonese_old,代码行数:12,代码来源:qiniu_storage.py
示例20: down
def down(token, key, localfile, mime_type, delete=False):
    """Upload ``localfile`` under ``key`` and optionally delete it locally.

    Args:
        token: Upload token passed to ``qiniu.put_file``.
        key: Key to store the file under.
        localfile: Local path of the file to upload.
        mime_type: MIME type forwarded to ``qiniu.put_file``.
        delete: When true, remove ``localfile`` after a verified upload.

    Raises:
        AssertionError: If the returned key or hash does not match.
    """
    put_options = {'mime_type': mime_type, 'check_crc': True}
    result, _info = qiniu.put_file(token, key, localfile, **put_options)
    assert result['key'] == key
    assert result['hash'] == qiniu.etag(localfile)
    print(localfile)
    if not delete:
        return
    os.remove(localfile)
开发者ID:BiaoLiu,项目名称:videoSpider,代码行数:12,代码来源:upload_qiniu.py
注：本文中的qiniu.etag函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论