• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python s3_test_client.Connection类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中swift3.test.functional.s3_test_client.Connection的典型用法代码示例。如果您正苦于以下问题:Python Connection类的具体用法?Python Connection怎么用?Python Connection使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Connection类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_put_object_copy_error

    def test_put_object_copy_error(self):
        """Check the error codes of failing PUT Object - Copy requests.

        Exercises a bad signature, a missing source key, a missing source
        bucket (assertion currently disabled, see TODO) and a missing
        destination bucket.
        """
        obj = 'object'
        self.conn.make_request('PUT', self.bucket, obj)
        dst_bucket = 'dst-bucket'
        self.conn.make_request('PUT', dst_bucket)
        dst_obj = 'dst_object'

        # Request headers get their own name so they are not shadowed by the
        # response headers returned from make_request().
        req_headers = {'x-amz-copy-source': '/%s/%s' % (self.bucket, obj)}
        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('PUT', dst_bucket, dst_obj,
                                         req_headers)
        self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')

        # /src/nothing -> /dst/dst
        req_headers = {
            'X-Amz-Copy-Source': '/%s/%s' % (self.bucket, 'nothing')}
        status, headers, body = \
            self.conn.make_request('PUT', dst_bucket, dst_obj, req_headers)
        self.assertEqual(get_error_code(body), 'NoSuchKey')

        # /nothing/src -> /dst/dst
        req_headers = {'X-Amz-Copy-Source': '/%s/%s' % ('nothing', obj)}
        status, headers, body = \
            self.conn.make_request('PUT', dst_bucket, dst_obj, req_headers)
        # TODO: the source bucket is not checked yet, so the expected error
        # cannot be asserted.
        # self.assertEqual(get_error_code(body), 'NoSuchBucket')

        # /src/src -> /nothing/dst
        req_headers = {'X-Amz-Copy-Source': '/%s/%s' % (self.bucket, obj)}
        status, headers, body = \
            self.conn.make_request('PUT', 'nothing', dst_obj, req_headers)
        self.assertEqual(get_error_code(body), 'NoSuchBucket')
开发者ID:tumf,项目名称:swift3,代码行数:31,代码来源:test_object.py


示例2: Swift3FunctionalTestCase

class Swift3FunctionalTestCase(unittest.TestCase):
    """Base test case that gives every test a freshly reset connection."""

    def __init__(self, method_name):
        """Remember the test method name so setUp() can report it.

        :param method_name: name of the test method this instance runs
        """
        super(Swift3FunctionalTestCase, self).__init__(method_name)
        self.method_name = method_name

    def setUp(self):
        """Create ``self.conn`` and call its ``reset()`` before each test.

        Any exception raised while doing so is reported through
        ``self.fail`` with the test method name and the full traceback.
        """
        try:
            self.conn = Connection()
            self.conn.reset()
        except Exception:
            message = '%s got an error during initialize process.\n\n%s' % \
                      (self.method_name, traceback.format_exc())
            # TODO: Find a way to make this go to FAIL instead of Error
            self.fail(message)

    def assertCommonResponseHeaders(self, headers, etag=None):
        """
        asserting common response headers with args
        :param headers: a dict of response headers
        :param etag: a string of md5(content).hexdigest() if not given,
                     this won't assert anything about etag. (e.g. DELETE obj)
        """
        # A missing header is reported as an assertion failure instead of
        # surfacing as a KeyError.
        for name in ('x-amz-id-2', 'x-amz-request-id', 'date'):
            self.assertIn(name, headers)
            self.assertIsNotNone(headers[name])
        # TODO; requires consideration
        # self.assertIsNotNone(headers['server'])
        if etag is not None:
            self.assertIn('etag', headers)  # sanity
            # The etag header value is compared without surrounding quotes.
            self.assertEqual(etag, headers['etag'].strip('"'))
开发者ID:KoreaCloudObjectStorage,项目名称:swift3,代码行数:30,代码来源:__init__.py


示例3: test_abort_multi_upload_error

    def test_abort_multi_upload_error(self):
        """Check the error codes of failing Abort Multipart Upload requests.

        Covers a bad signature, an unknown bucket and an unknown upload id.
        """
        bucket = 'bucket'
        self.conn.make_request('PUT', bucket)
        key = 'obj'
        # Initiate a multipart upload and read its id from the response.
        query = 'uploads'
        status, headers, body = \
            self.conn.make_request('POST', bucket, key, query=query)
        elem = fromstring(body, 'InitiateMultipartUploadResult')
        upload_id = elem.find('UploadId').text
        self._upload_part(bucket, key, upload_id)

        query = 'uploadId=%s' % upload_id
        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('DELETE', bucket, key, query=query)
        self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')

        status, headers, body = \
            self.conn.make_request('DELETE', 'nothing', key, query=query)
        self.assertEqual(get_error_code(body), 'NoSuchBucket')

        query = 'uploadId=%s' % 'nothing'
        status, headers, body = \
            self.conn.make_request('DELETE', bucket, key, query=query)
        self.assertEqual(get_error_code(body), 'NoSuchUpload')
开发者ID:erwasambo,项目名称:swift3,代码行数:25,代码来源:test_multi_upload.py


示例4: test_upload_part_error

    def test_upload_part_error(self):
        """Check the error codes of failing Upload Part requests.

        Covers a bad signature, an unknown bucket, an unknown upload id and
        a part number of 0.
        """
        bucket = 'bucket'
        self.conn.make_request('PUT', bucket)
        # Initiate a multipart upload and read its id from the response.
        query = 'uploads'
        key = 'obj'
        status, headers, body = \
            self.conn.make_request('POST', bucket, key, query=query)
        elem = fromstring(body, 'InitiateMultipartUploadResult')
        upload_id = elem.find('UploadId').text

        query = 'partNumber=%s&uploadId=%s' % (1, upload_id)
        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('PUT', bucket, key, query=query)
        self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')

        status, headers, body = \
            self.conn.make_request('PUT', 'nothing', key, query=query)
        self.assertEqual(get_error_code(body), 'NoSuchBucket')

        query = 'partNumber=%s&uploadId=%s' % (1, 'nothing')
        status, headers, body = \
            self.conn.make_request('PUT', bucket, key, query=query)
        self.assertEqual(get_error_code(body), 'NoSuchUpload')

        # Part number 0 must be rejected with a range message.
        query = 'partNumber=%s&uploadId=%s' % (0, upload_id)
        status, headers, body = \
            self.conn.make_request('PUT', bucket, key, query=query)
        self.assertEqual(get_error_code(body), 'InvalidArgument')
        err_msg = 'Part number must be an integer between 1 and'
        self.assertIn(err_msg, get_error_msg(body))
开发者ID:erwasambo,项目名称:swift3,代码行数:31,代码来源:test_multi_upload.py


示例5: test_put_object_error

    def test_put_object_error(self):
        """Check the error codes of failing PUT Object requests.

        Covers a bad signature and a bucket name other than self.bucket.
        """
        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('PUT', self.bucket, 'object')
        self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')

        status, headers, body = \
            self.conn.make_request('PUT', 'bucket2', 'object')
        self.assertEqual(get_error_code(body), 'NoSuchBucket')
开发者ID:tumf,项目名称:swift3,代码行数:9,代码来源:test_object.py


示例6: test_delete_bucket_error

    def test_delete_bucket_error(self):
        """Check the error codes of failing DELETE Bucket requests.

        Covers an invalid bucket name, a bad signature and a bucket that
        this test never created.
        """
        status, headers, body = \
            self.conn.make_request('DELETE', 'bucket+invalid')
        self.assertEqual(get_error_code(body), 'InvalidBucketName')

        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('DELETE', 'bucket')
        self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')

        status, headers, body = self.conn.make_request('DELETE', 'bucket')
        self.assertEqual(get_error_code(body), 'NoSuchBucket')
开发者ID:sivakrishnan47,项目名称:swift3,代码行数:12,代码来源:test_bucket.py


示例7: test_put_bucket_error

    def test_put_bucket_error(self):
        """Check the error codes of failing PUT Bucket requests.

        Covers an invalid bucket name, a bad signature and creating the
        same bucket twice.
        """
        status, headers, body = \
            self.conn.make_request('PUT', 'bucket+invalid')
        self.assertEqual(get_error_code(body), 'InvalidBucketName')

        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = auth_error_conn.make_request('PUT', 'bucket')
        self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')

        # The second PUT of the same bucket is the one expected to fail.
        self.conn.make_request('PUT', 'bucket')
        status, headers, body = self.conn.make_request('PUT', 'bucket')
        self.assertEqual(get_error_code(body), 'BucketAlreadyExists')
开发者ID:sivakrishnan47,项目名称:swift3,代码行数:12,代码来源:test_bucket.py


示例8: test_get_bucket_error

    def test_get_bucket_error(self):
        """Verify the error code of each failing GET Bucket request.

        Checks an invalid bucket name, a bad signature and an unknown
        bucket, in that order.
        """
        self.conn.make_request('PUT', 'bucket')

        def error_code_of(connection, bucket_name):
            # Issue the GET and extract the error code from the body.
            _, _, resp_body = connection.make_request('GET', bucket_name)
            return get_error_code(resp_body)

        self.assertEqual(error_code_of(self.conn, 'bucket+invalid'),
                         'InvalidBucketName')

        bad_secret_conn = Connection(aws_secret_key='invalid')
        self.assertEqual(error_code_of(bad_secret_conn, 'bucket'),
                         'SignatureDoesNotMatch')

        self.assertEqual(error_code_of(self.conn, 'nothing'),
                         'NoSuchBucket')
开发者ID:swiftstack,项目名称:swift3-stackforge,代码行数:13,代码来源:test_bucket.py


示例9: test_get_bucket_acl_error

    def test_get_bucket_acl_error(self):
        """Check the error codes of failing GET Bucket acl requests.

        Covers a bad signature, an unknown bucket and a request made
        through self.conn2, which is expected to be denied.
        """
        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('GET', self.bucket, query='acl')
        self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')

        status, headers, body = \
            self.conn.make_request('GET', 'nothing', query='acl')
        self.assertEqual(get_error_code(body), 'NoSuchBucket')

        status, headers, body = \
            self.conn2.make_request('GET', self.bucket, query='acl')
        self.assertEqual(get_error_code(body), 'AccessDenied')
开发者ID:KoreaCloudObjectStorage,项目名称:swift3,代码行数:13,代码来源:test_acl.py


示例10: test_list_multi_uploads_error

    def test_list_multi_uploads_error(self):
        """Check the error codes of failing List Multipart Uploads requests.

        Covers a bad signature and an unknown bucket.
        """
        bucket = 'bucket'
        self.conn.make_request('PUT', bucket)
        query = 'uploads'

        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('GET', bucket, query=query)
        self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')

        status, headers, body = \
            self.conn.make_request('GET', 'nothing', query=query)
        self.assertEqual(get_error_code(body), 'NoSuchBucket')
开发者ID:erwasambo,项目名称:swift3,代码行数:13,代码来源:test_multi_upload.py


示例11: test_head_bucket_error

    def test_head_bucket_error(self):
        """Check the status codes of failing HEAD Bucket requests.

        Only the status is asserted: 400 for an invalid bucket name, 403
        for a bad signature and 404 for an unknown bucket.
        """
        self.conn.make_request('PUT', 'bucket')

        status, headers, body = \
            self.conn.make_request('HEAD', 'bucket+invalid')
        self.assertEqual(status, 400)

        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('HEAD', 'bucket')
        self.assertEqual(status, 403)

        status, headers, body = self.conn.make_request('HEAD', 'nothing')
        self.assertEqual(status, 404)
开发者ID:KoreaCloudObjectStorage,项目名称:swift3,代码行数:14,代码来源:test_bucket.py


示例12: test_head_object_error

    def test_head_object_error(self):
        """Check the status codes of failing HEAD Object requests.

        Only the status is asserted: 403 for a bad signature, 404 for an
        unknown key and 404 for an unknown bucket.
        """
        obj = 'object'
        self.conn.make_request('PUT', self.bucket, obj)

        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('HEAD', self.bucket, obj)
        self.assertEqual(status, 403)

        status, headers, body = \
            self.conn.make_request('HEAD', self.bucket, 'invalid')
        self.assertEqual(status, 404)

        status, headers, body = \
            self.conn.make_request('HEAD', 'invalid', obj)
        self.assertEqual(status, 404)
开发者ID:KoreaCloudObjectStorage,项目名称:swift3,代码行数:16,代码来源:test_object.py


示例13: test_delete_object_error

    def test_delete_object_error(self):
        """Check the error codes of failing DELETE Object requests.

        Covers a bad signature, an unknown key and an unknown bucket.
        """
        obj = 'object'
        self.conn.make_request('PUT', self.bucket, obj)

        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('DELETE', self.bucket, obj)
        self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')

        status, headers, body = \
            self.conn.make_request('DELETE', self.bucket, 'invalid')
        self.assertEqual(get_error_code(body), 'NoSuchKey')

        status, headers, body = \
            self.conn.make_request('DELETE', 'invalid', obj)
        self.assertEqual(get_error_code(body), 'NoSuchBucket')
开发者ID:tumf,项目名称:swift3,代码行数:16,代码来源:test_object.py


示例14: test_put_bucket_acl_error

    def test_put_bucket_acl_error(self):
        """Check the error codes of failing PUT Bucket acl requests.

        Every request sends the 'public-read' canned ACL header. Covers a
        bad signature, an unknown bucket and a request made through
        self.conn2, which is expected to be denied.
        """
        req_headers = {'x-amz-acl': 'public-read'}
        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('PUT', self.bucket,
                                         headers=req_headers, query='acl')
        self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')

        status, headers, body = \
            self.conn.make_request('PUT', 'nothing',
                                   headers=req_headers, query='acl')
        self.assertEqual(get_error_code(body), 'NoSuchBucket')

        status, headers, body = \
            self.conn2.make_request('PUT', self.bucket,
                                    headers=req_headers, query='acl')
        self.assertEqual(get_error_code(body), 'AccessDenied')
开发者ID:KoreaCloudObjectStorage,项目名称:swift3,代码行数:17,代码来源:test_acl.py


示例15: test_upload_part_copy_error

    def test_upload_part_copy_error(self):
        """Check the error codes of failing Upload Part - Copy requests.

        Covers a bad signature, an unknown destination bucket, an unknown
        upload id and an unknown source key.
        """
        # Source object that the copy requests refer to.
        src_bucket = 'src'
        src_obj = 'src'
        self.conn.make_request('PUT', src_bucket)
        self.conn.make_request('PUT', src_bucket, src_obj)
        src_path = '%s/%s' % (src_bucket, src_obj)

        # Destination bucket with an initiated multipart upload.
        bucket = 'bucket'
        self.conn.make_request('PUT', bucket)
        key = 'obj'
        query = 'uploads'
        status, headers, body = \
            self.conn.make_request('POST', bucket, key, query=query)
        elem = fromstring(body, 'InitiateMultipartUploadResult')
        upload_id = elem.find('UploadId').text

        query = 'partNumber=%s&uploadId=%s' % (1, upload_id)
        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('PUT', bucket, key,
                                         headers={
                                             'X-Amz-Copy-Source': src_path
                                         },
                                         query=query)
        self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')

        status, headers, body = \
            self.conn.make_request('PUT', 'nothing', key,
                                   headers={'X-Amz-Copy-Source': src_path},
                                   query=query)
        self.assertEqual(get_error_code(body), 'NoSuchBucket')

        query = 'partNumber=%s&uploadId=%s' % (1, 'nothing')
        status, headers, body = \
            self.conn.make_request('PUT', bucket, key,
                                   headers={'X-Amz-Copy-Source': src_path},
                                   query=query)
        self.assertEqual(get_error_code(body), 'NoSuchUpload')

        # Valid upload id again, but the source key does not exist.
        src_path = '%s/%s' % (src_bucket, 'nothing')
        query = 'partNumber=%s&uploadId=%s' % (1, upload_id)
        status, headers, body = \
            self.conn.make_request('PUT', bucket, key,
                                   headers={'X-Amz-Copy-Source': src_path},
                                   query=query)
        self.assertEqual(get_error_code(body), 'NoSuchKey')
开发者ID:erwasambo,项目名称:swift3,代码行数:46,代码来源:test_multi_upload.py


示例16: setUp

 def setUp(self):
     """Create ``self.conn`` and call its ``reset()`` before each test.

     Any exception raised while doing so is reported through ``self.fail``
     together with the test method name and the formatted traceback.
     """
     try:
         self.conn = Connection()
         self.conn.reset()
     except Exception:
         # TODO: Find a way to make this go to FAIL instead of Error
         test_name = self.method_name
         trace_text = traceback.format_exc()
         self.fail('%s got an error during initialize process.\n\n%s'
                   % (test_name, trace_text))
开发者ID:KoreaCloudObjectStorage,项目名称:swift3,代码行数:9,代码来源:__init__.py


示例17: test_get_object_error

    def test_get_object_error(self):
        """Verify error code and content type of failing GET Object requests.

        Checks a bad signature, an unknown key and an unknown bucket, in
        that order; each error response must be 'application/xml'.
        """
        obj = 'object'
        self.conn.make_request('PUT', self.bucket, obj)

        def expect_error(connection, bucket_name, key, expected_code):
            # Same assertion order as a hand-written check: code first,
            # then the content type of the error response.
            _, resp_headers, resp_body = \
                connection.make_request('GET', bucket_name, key)
            self.assertEqual(get_error_code(resp_body), expected_code)
            self.assertEqual(resp_headers['content-type'],
                             'application/xml')

        bad_secret_conn = Connection(aws_secret_key='invalid')
        expect_error(bad_secret_conn, self.bucket, obj,
                     'SignatureDoesNotMatch')
        expect_error(self.conn, self.bucket, 'invalid', 'NoSuchKey')
        expect_error(self.conn, 'invalid', obj, 'NoSuchBucket')
开发者ID:swiftstack,项目名称:swift3-stackforge,代码行数:18,代码来源:test_object.py


示例18: test_head_object_error

    def test_head_object_error(self):
        """Check status, body and content type of failing HEAD Object calls.

        Covers a bad signature (403), an unknown key (404) and an unknown
        bucket (404). Each response must have an empty body and an
        'application/xml' content type.
        """
        obj = 'object'
        self.conn.make_request('PUT', self.bucket, obj)

        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('HEAD', self.bucket, obj)
        self.assertEqual(status, 403)
        self.assertEqual(body, '')  # sanity
        self.assertEqual(headers['content-type'], 'application/xml')

        status, headers, body = \
            self.conn.make_request('HEAD', self.bucket, 'invalid')
        self.assertEqual(status, 404)
        self.assertEqual(body, '')  # sanity
        self.assertEqual(headers['content-type'], 'application/xml')

        status, headers, body = \
            self.conn.make_request('HEAD', 'invalid', obj)
        self.assertEqual(status, 404)
        self.assertEqual(body, '')  # sanity
        self.assertEqual(headers['content-type'], 'application/xml')
开发者ID:pkdevboxy,项目名称:swift3,代码行数:22,代码来源:test_object.py


示例19: test_delete_multi_objects_error

    def test_delete_multi_objects_error(self):
        """Check the error codes of failing Delete Multiple Objects requests.

        Covers a bad signature, an unknown bucket, XML without an Object
        tag, XML with an empty Key, more than 1000 keys in a small body and
        a body larger than MAX_MULTI_DELETE_BODY_SIZE.
        """
        bucket = 'bucket'
        put_objects = ['obj']
        self._prepare_test_delete_multi_objects(bucket, put_objects)
        xml = self._gen_multi_delete_xml(put_objects)
        content_md5 = calculate_md5(xml)
        query = 'delete'

        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('POST', bucket, body=xml,
                                         headers={
                                             'Content-MD5': content_md5
                                         },
                                         query=query)
        self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')

        status, headers, body = \
            self.conn.make_request('POST', 'nothing', body=xml,
                                   headers={'Content-MD5': content_md5},
                                   query=query)
        self.assertEqual(get_error_code(body), 'NoSuchBucket')

        # without Object tag
        xml = self._gen_invalid_multi_delete_xml()
        content_md5 = calculate_md5(xml)
        status, headers, body = \
            self.conn.make_request('POST', bucket, body=xml,
                                   headers={'Content-MD5': content_md5},
                                   query=query)
        self.assertEqual(get_error_code(body), 'MalformedXML')

        # without value of Key tag
        xml = self._gen_invalid_multi_delete_xml(hasObjectTag=True)
        content_md5 = calculate_md5(xml)
        status, headers, body = \
            self.conn.make_request('POST', bucket, body=xml,
                                   headers={'Content-MD5': content_md5},
                                   query=query)
        self.assertEqual(get_error_code(body), 'UserKeyMustBeSpecified')

        # specified number of objects are over CONF.max_multi_delete_objects
        # (Default 1000), but xml size is smaller than 61365 bytes.
        # The index is interpolated so the 1001 keys are distinct names
        # rather than 1001 copies of the literal 'obj%s'.
        req_objects = ['obj%s' % var for var in xrange(1001)]
        xml = self._gen_multi_delete_xml(req_objects)
        self.assertLessEqual(len(xml.encode('utf-8')),
                             MAX_MULTI_DELETE_BODY_SIZE)
        content_md5 = calculate_md5(xml)
        status, headers, body = \
            self.conn.make_request('POST', bucket, body=xml,
                                   headers={'Content-MD5': content_md5},
                                   query=query)
        self.assertEqual(get_error_code(body), 'MalformedXML')

        # specified xml size is over 61365 bytes, but number of objects are
        # smaller than CONF.max_multi_delete_objects.
        obj = 'a' * 1024
        req_objects = [obj + str(var) for var in xrange(999)]
        xml = self._gen_multi_delete_xml(req_objects)
        self.assertGreater(len(xml.encode('utf-8')),
                           MAX_MULTI_DELETE_BODY_SIZE)
        content_md5 = calculate_md5(xml)
        status, headers, body = \
            self.conn.make_request('POST', bucket, body=xml,
                                   headers={'Content-MD5': content_md5},
                                   query=query)
        self.assertEqual(get_error_code(body), 'MalformedXML')
开发者ID:KoreaCloudObjectStorage,项目名称:swift3,代码行数:65,代码来源:test_multi_delete.py


示例20: test_complete_multi_upload_error

    def test_complete_multi_upload_error(self):
        """Check the error codes of failing Complete Multipart Upload calls.

        Covers a bad signature, an unknown bucket, an unknown upload id,
        XML without a Part tag, XML with an invalid etag and a completion
        request for an upload that has no uploaded parts.
        """
        bucket = 'bucket'
        keys = ['obj', 'obj2']
        self.conn.make_request('PUT', bucket)
        # Initiate a multipart upload for the first key.
        query = 'uploads'
        status, headers, body = \
            self.conn.make_request('POST', bucket, keys[0], query=query)
        elem = fromstring(body, 'InitiateMultipartUploadResult')
        upload_id = elem.find('UploadId').text

        # Upload parts 1 and 2 and collect the etag of each response.
        etags = []
        for i in xrange(1, 3):
            query = 'partNumber=%s&uploadId=%s' % (i, upload_id)
            status, headers, body = \
                self.conn.make_request('PUT', bucket, keys[0], query=query)
            etags.append(headers['etag'])
        xml = self._gen_comp_xml(etags)

        query = 'uploadId=%s' % upload_id
        auth_error_conn = Connection(aws_secret_key='invalid')
        status, headers, body = \
            auth_error_conn.make_request('POST', bucket, keys[0], body=xml,
                                         query=query)
        self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')

        status, headers, body = \
            self.conn.make_request('POST', 'nothing', keys[0], query=query)
        self.assertEqual(get_error_code(body), 'NoSuchBucket')

        query = 'uploadId=%s' % 'nothing'
        status, headers, body = \
            self.conn.make_request('POST', bucket, keys[0], body=xml,
                                   query=query)
        self.assertEqual(get_error_code(body), 'NoSuchUpload')

        # without Part tag in xml
        query = 'uploadId=%s' % upload_id
        xml = self._gen_comp_xml([])
        status, headers, body = \
            self.conn.make_request('POST', bucket, keys[0], body=xml,
                                   query=query)
        self.assertEqual(get_error_code(body), 'MalformedXML')

        # with invalid etag in xml
        invalid_etag = 'invalid'
        xml = self._gen_comp_xml([invalid_etag])
        status, headers, body = \
            self.conn.make_request('POST', bucket, keys[0], body=xml,
                                   query=query)
        self.assertEqual(get_error_code(body), 'InvalidPart')

        # without part in Swift
        # A second upload is initiated for the other key and completed
        # with an etag taken from the first upload.
        query = 'uploads'
        status, headers, body = \
            self.conn.make_request('POST', bucket, keys[1], query=query)
        elem = fromstring(body, 'InitiateMultipartUploadResult')
        upload_id = elem.find('UploadId').text
        query = 'uploadId=%s' % upload_id
        xml = self._gen_comp_xml([etags[0]])
        status, headers, body = \
            self.conn.make_request('POST', bucket, keys[1], body=xml,
                                   query=query)
        self.assertEqual(get_error_code(body), 'InvalidPart')
开发者ID:erwasambo,项目名称:swift3,代码行数:63,代码来源:test_multi_upload.py



注:本文中的swift3.test.functional.s3_test_client.Connection类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.get_error_code函数代码示例发布时间:2022-05-27
下一篇:
Python subresource.encode_acl函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap