• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python swift3.filter_factory函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 swift.common.middleware.swift3.filter_factory 函数的典型用法代码示例。如果您正苦于以下问题：Python filter_factory 函数的具体用法？Python filter_factory 怎么用？Python filter_factory 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 filter_factory 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_object_PUT_headers

    def test_object_PUT_headers(self):
        """Check which headers the backend sees for an S3-style object PUT.

        The request goes through the swift3 filter into a recording stub.
        The stub must receive the ``Content-MD5`` value as a hex ``ETag``,
        ``X-Amz-Meta-Something`` as ``X-Object-Meta-Something`` and
        ``X-Amz-Copy-Source`` as ``X-Copy-From``.
        """

        class FakeApp(object):
            """WSGI stub that records the request it is called with."""

            def __call__(self, env, start_response):
                self.req = Request(env)
                # start_response takes the status line and the header list
                # together; the original issued two one-argument calls.
                start_response("200 OK", [])

        app = FakeApp()
        local_app = swift3.filter_factory({})(app)
        req = Request.blank(
            "/bucket/object",
            environ={"REQUEST_METHOD": "PUT"},
            headers={
                "Authorization": "AWS test:tester:hmac",
                "X-Amz-Storage-Class": "REDUCED_REDUNDANCY",
                "X-Amz-Meta-Something": "oh hai",
                "X-Amz-Copy-Source": "/some/source",
                "Content-MD5": "ffoHqOWd280dyE1MT4KuoQ==",
            },
        )
        req.date = datetime.now()
        req.content_type = "text/plain"
        # Only the request forwarded to the stub is inspected, so the
        # response iterable is deliberately discarded.
        local_app(req.environ, lambda *args: None)

        forwarded = app.req.headers
        self.assertEqual(forwarded["ETag"], "7dfa07a8e59ddbcd1dc84d4c4f82aea1")
        self.assertEqual(forwarded["X-Object-Meta-Something"], "oh hai")
        self.assertEqual(forwarded["X-Copy-From"], "/some/source")
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:26,代码来源:test_swift3.py


示例2: test_object_DELETE

 def test_object_DELETE(self):
     """Check the status recorded for an object DELETE.

     The backend stub is built with 204 (presumably the status it answers
     with -- confirm against FakeAppObject). The status line captured
     through ``do_start_response`` must start with "204".
     """
     local_app = swift3.filter_factory({})(FakeAppObject(204))
     req = Request.blank(
         "/bucket/object",
         environ={"REQUEST_METHOD": "DELETE"},
         headers={"Authorization": "AWS test:tester:hmac"},
     )
     # The response body is not inspected; only the status line matters.
     local_app(req.environ, local_app.app.do_start_response)
     status_code = local_app.app.response_args[0].split()[0]
     self.assertEqual(status_code, "204")
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:7,代码来源:test_swift3.py


示例3: test_bucket_GET_max_keys

    def test_bucket_GET_max_keys(self):
        """Check how ``max-keys`` is echoed back and forwarded as ``limit``.

        For ``max-keys=5`` the XML must report ``MaxKeys`` 5 and the backend
        must be queried with ``limit=6``. For ``max-keys=5000`` the expected
        values are 1000 and 1001.
        """
        class FakeApp(object):
            """WSGI stub that remembers the query string it was called with."""

            def __call__(self, env, start_response):
                self.query_string = env['QUERY_STRING']
                start_response('200 OK', [])
                return '[]'

        fake_app = FakeApp()
        local_app = swift3.filter_factory({})(fake_app)
        bucket_name = 'junk'

        # (requested max-keys, MaxKeys in the XML, limit sent to the backend)
        cases = [('5', '5', '6'), ('5000', '1000', '1001')]
        for requested, reported, limit in cases:
            req = Request.blank(
                '/%s' % bucket_name,
                environ={'REQUEST_METHOD': 'GET',
                         'QUERY_STRING': 'max-keys=%s' % requested},
                headers={'Authorization': 'AWS test:tester:hmac'})
            resp = local_app(req.environ, lambda *args: None)
            dom = xml.dom.minidom.parseString("".join(resp))
            max_keys = dom.getElementsByTagName('MaxKeys')[0]
            self.assertEqual(max_keys.childNodes[0].nodeValue, reported)
            # NOTE(review): cgi.parse_qsl is a deprecated alias of
            # urlparse.parse_qsl; kept because the file's imports are not
            # visible from here.
            args = dict(cgi.parse_qsl(fake_app.query_string))
            # assertEqual (not assert_(a == b)) so a failure shows both values.
            self.assertEqual(args['limit'], limit)
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:31,代码来源:test_swift3.py


示例4: test_bucket_GET

    def test_bucket_GET(self):
        """Check the XML listing returned for a bucket GET.

        The recorded status must be 200 and the document root must be
        ``ListBucketResult`` naming the requested bucket. The collected
        ``Key`` values must match ``FakeAppBucket().objects`` in count and
        content, and every ``LastModified`` value seen must end in 'Z'.
        """
        local_app = swift3.filter_factory({})(FakeAppBucket())
        bucket_name = 'junk'
        req = Request.blank('/%s' % bucket_name,
                            environ={'REQUEST_METHOD': 'GET'},
                            headers={'Authorization': 'AWS test:tester:hmac'})
        resp = local_app(req.environ, local_app.app.do_start_response)
        self.assertEqual(local_app.app.response_args[0].split()[0], '200')

        dom = xml.dom.minidom.parseString("".join(resp))
        self.assertEqual(dom.firstChild.nodeName, 'ListBucketResult')
        name = dom.getElementsByTagName('Name')[0].childNodes[0].nodeValue
        self.assertEqual(name, bucket_name)

        contents = dom.getElementsByTagName('Contents')
        # The original indexed contents[0] into a local that was never read,
        # which only failed on an empty listing; state that check directly.
        self.assertTrue(contents)

        names = []
        for entry in contents:
            key_node = entry.childNodes[0]
            modified_node = entry.childNodes[1]
            if key_node.nodeName == 'Key':
                names.append(key_node.childNodes[0].nodeValue)
            if modified_node.nodeName == 'LastModified':
                self.assertTrue(
                    modified_node.childNodes[0].nodeValue.endswith('Z'))

        # Build the reference fixture once instead of once per use.
        expected = FakeAppBucket().objects
        self.assertEqual(len(names), len(expected))
        for item in expected:
            self.assertTrue(item[0] in names)
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:28,代码来源:test_swift3.py


示例5: test_bucket_GET_passthroughs

 def test_bucket_GET_passthroughs(self):
     """Check that delimiter, marker and prefix are echoed and forwarded.

     A bucket GET with ``delimiter=a&marker=b&prefix=c`` must report those
     values in the ``Delimiter``, ``Marker`` and ``Prefix`` XML elements,
     and the backend must receive them as query arguments of the same
     (lower-case) names.
     """
     class FakeApp(object):
         """WSGI stub that remembers the query string it was called with."""

         def __call__(self, env, start_response):
             self.query_string = env['QUERY_STRING']
             start_response('200 OK', [])
             return '[]'

     fake_app = FakeApp()
     local_app = swift3.filter_factory({})(fake_app)
     bucket_name = 'junk'
     req = Request.blank('/%s' % bucket_name,
             environ={'REQUEST_METHOD': 'GET', 'QUERY_STRING':
                      'delimiter=a&marker=b&prefix=c'},
             headers={'Authorization': 'AWS test:tester:hmac'})
     resp = local_app(req.environ, lambda *args: None)
     dom = xml.dom.minidom.parseString("".join(resp))

     # (XML element, backend query argument, expected value)
     expected = [('Prefix', 'prefix', 'c'),
                 ('Marker', 'marker', 'b'),
                 ('Delimiter', 'delimiter', 'a')]
     for tag, _param, value in expected:
         node = dom.getElementsByTagName(tag)[0]
         self.assertEqual(node.childNodes[0].nodeValue, value)

     args = dict(cgi.parse_qsl(fake_app.query_string))
     for _tag, param, value in expected:
         self.assertEqual(args[param], value)
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:25,代码来源:test_swift3.py


示例6: test_object_DELETE

 def test_object_DELETE(self):
     """Check that an object DELETE is reported with status 204.

     The status line is read back from ``response_args``, which the stub's
     ``do_start_response`` is expected to have filled in.
     """
     local_app = swift3.filter_factory({})(FakeAppObject(204))
     req = Request.blank('/bucket/object',
                         environ={'REQUEST_METHOD': 'DELETE'},
                         headers={'Authorization': 'AWS test:tester:hmac'})
     # The returned body is unused; the call is made for its side effect
     # on response_args.
     local_app(req.environ, local_app.app.do_start_response)
     status_line = local_app.app.response_args[0]
     self.assertEqual(status_line.split()[0], '204')
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:7,代码来源:test_swift3.py


示例7: test_bucket_GET_max_keys

    def test_bucket_GET_max_keys(self):
        """Check ``max-keys`` handling for a bucket listing.

        ``max-keys=5`` must be reported as ``MaxKeys`` 5 with ``limit=6``
        sent to the backend. ``max-keys=5000`` must be reported as 1000
        with ``limit=1001``.
        """

        class FakeApp(object):
            """WSGI stub that keeps the last query string it received."""

            def __call__(self, env, start_response):
                self.query_string = env["QUERY_STRING"]
                start_response("200 OK", [])
                return "[]"

        fake_app = FakeApp()
        local_app = swift3.filter_factory({})(fake_app)
        bucket_name = "junk"

        def list_bucket(query_string):
            """Return (MaxKeys text from the XML, query args seen by the stub)."""
            req = Request.blank(
                "/%s" % bucket_name,
                environ={"REQUEST_METHOD": "GET", "QUERY_STRING": query_string},
                headers={"Authorization": "AWS test:tester:hmac"},
            )
            resp = local_app(req.environ, lambda *args: None)
            dom = xml.dom.minidom.parseString("".join(resp))
            max_keys_node = dom.getElementsByTagName("MaxKeys")[0]
            backend_args = dict(cgi.parse_qsl(fake_app.query_string))
            return max_keys_node.childNodes[0].nodeValue, backend_args

        max_keys, args = list_bucket("max-keys=5")
        self.assertEqual(max_keys, "5")
        # assertEqual instead of assert_(a == b) so failures show both values.
        self.assertEqual(args["limit"], "6")

        max_keys, args = list_bucket("max-keys=5000")
        self.assertEqual(max_keys, "1000")
        self.assertEqual(args["limit"], "1001")
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:32,代码来源:test_swift3.py


示例8: test_bucket_GET

    def test_bucket_GET(self):
        """Check the ``ListBucketResult`` document produced by a bucket GET.

        Asserts status 200, the root element name and the bucket ``Name``.
        The ``Key`` entries must match ``FakeAppBucket().objects`` in count
        and content, and ``LastModified`` values must end in "Z".
        """
        local_app = swift3.filter_factory({})(FakeAppBucket())
        bucket_name = "junk"
        req = Request.blank(
            "/%s" % bucket_name,
            environ={"REQUEST_METHOD": "GET"},
            headers={"Authorization": "AWS test:tester:hmac"},
        )
        resp = local_app(req.environ, local_app.app.do_start_response)
        self.assertEqual(local_app.app.response_args[0].split()[0], "200")

        dom = xml.dom.minidom.parseString("".join(resp))
        self.assertEqual(dom.firstChild.nodeName, "ListBucketResult")
        name = dom.getElementsByTagName("Name")[0].childNodes[0].nodeValue
        self.assertEqual(name, bucket_name)

        objects = dom.getElementsByTagName("Contents")
        # Replaces the unused ``listing`` local, whose only effect was an
        # IndexError when the listing was empty.
        self.assertTrue(objects)

        names = []
        for obj in objects:
            first, second = obj.childNodes[0], obj.childNodes[1]
            if first.nodeName == "Key":
                names.append(first.childNodes[0].nodeValue)
            if second.nodeName == "LastModified":
                self.assertTrue(second.childNodes[0].nodeValue.endswith("Z"))

        # One reference fixture is enough for both the count and the names.
        reference = FakeAppBucket().objects
        self.assertEqual(len(names), len(reference))
        for item in reference:
            self.assertTrue(item[0] in names)
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:27,代码来源:test_swift3.py


示例9: test_object_acl_GET

 def test_object_acl_GET(self):
     """Fetch ``/bucket/object?acl`` and pass the body to ``_check_acl``.

     ``_check_acl`` receives 'test:tester' (presumably the expected owner --
     confirm against its definition) and the response body.
     """
     wrap = swift3.filter_factory({})
     local_app = wrap(FakeAppObject())
     request_options = {
         'environ': {'REQUEST_METHOD': 'GET'},
         'headers': {'Authorization': 'AWS test:tester:hmac'},
     }
     req = Request.blank('/bucket/object?acl', **request_options)
     environ = req.environ
     record_response = local_app.app.do_start_response
     body = local_app(environ, record_response)
     self._check_acl('test:tester', body)
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:7,代码来源:test_swift3.py


示例10: _test_method_error

 def _test_method_error(self, cl, method, path, status):
     """Send a request that should fail and return the S3 error code.

     The backend is built as ``cl(status)`` and wrapped by the swift3
     filter. The response body must parse as XML whose root element is
     ``Error``.

     :param cl: callable producing the backend app from ``status``
     :param method: HTTP method placed in ``REQUEST_METHOD``
     :param path: request path
     :param status: value handed to ``cl``
     :returns: text of the first ``Code`` element in the response
     """
     local_app = swift3.filter_factory({})(cl(status))
     req = Request.blank(
         path,
         environ={"REQUEST_METHOD": method},
         headers={"Authorization": "AWS test:tester:hmac"},
     )
     # ``start_response`` is resolved from the enclosing module.
     resp = local_app(req.environ, start_response)
     dom = xml.dom.minidom.parseString("".join(resp))
     self.assertEqual(dom.firstChild.nodeName, "Error")
     code_node = dom.getElementsByTagName("Code")[0]
     return code_node.childNodes[0].nodeValue
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:7,代码来源:test_swift3.py


示例11: test_object_acl_GET

 def test_object_acl_GET(self):
     """GET the ``acl`` sub-resource of an object and validate the result.

     The body returned by the filtered app is passed to ``_check_acl``
     together with the string "test:tester".
     """
     make_filter = swift3.filter_factory({})
     local_app = make_filter(FakeAppObject())
     auth = {"Authorization": "AWS test:tester:hmac"}
     wsgi_env = {"REQUEST_METHOD": "GET"}
     req = Request.blank("/bucket/object?acl", environ=wsgi_env, headers=auth)
     acl_body = local_app(req.environ, local_app.app.do_start_response)
     self._check_acl("test:tester", acl_body)
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:7,代码来源:test_swift3.py


示例12: test_bucket_acl_GET

 def test_bucket_acl_GET(self):
     """Request the ``?acl`` sub-resource of bucket "junk" and validate it.

     The response body goes to ``_check_acl`` along with "test:tester"
     (presumably the expected owner -- confirm against ``_check_acl``).
     """
     make_filter = swift3.filter_factory({})
     local_app = make_filter(FakeAppBucket())
     bucket_name = "junk"
     acl_path = "/%s?acl" % bucket_name
     auth_headers = {"Authorization": "AWS test:tester:hmac"}
     req = Request.blank(acl_path, environ={"REQUEST_METHOD": "GET"}, headers=auth_headers)
     acl_body = local_app(req.environ, local_app.app.do_start_response)
     self._check_acl("test:tester", acl_body)
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:10,代码来源:test_swift3.py


示例13: test_object_GET_Range

    def test_object_GET_Range(self):
        """Check a ranged object GET.

        With ``Range: bytes=0-3`` the recorded status must be 206 and the
        recorded response headers must contain a ``Content-Range`` that
        starts with 'bytes 0-3'.
        """
        local_app = swift3.filter_factory({})(FakeAppObject())
        req = Request.blank('/bucket/object',
                            environ={'REQUEST_METHOD': 'GET'},
                            headers={'Authorization': 'AWS test:tester:hmac',
                                     'Range': 'bytes=0-3'})
        # The body is not inspected; status and headers are read back from
        # response_args after the call.
        local_app(req.environ, local_app.app.do_start_response)
        status_line, header_items = local_app.app.response_args[:2]
        self.assertEqual(status_line.split()[0], '206')

        headers = dict(header_items)
        self.assertTrue('Content-Range' in headers)
        self.assertTrue(headers['Content-Range'].startswith('bytes 0-3'))
开发者ID:Cygnet,项目名称:swift,代码行数:12,代码来源:test_swift3.py


示例14: test_object_GET_Range

    def test_object_GET_Range(self):
        """Check that a ``Range`` request yields a partial-content response.

        The request asks for ``bytes=0-3``. The status recorded through
        ``do_start_response`` must be 206 and ``Content-Range`` must be
        present and begin with "bytes 0-3".
        """
        local_app = swift3.filter_factory({})(FakeAppObject())
        req = Request.blank(
            "/bucket/object",
            environ={"REQUEST_METHOD": "GET"},
            headers={"Authorization": "AWS test:tester:hmac", "Range": "bytes=0-3"},
        )
        # Called for its effect on response_args; the body is not checked.
        local_app(req.environ, local_app.app.do_start_response)
        recorded = local_app.app.response_args
        self.assertEqual(recorded[0].split()[0], "206")

        headers = dict(recorded[1])
        self.assertTrue("Content-Range" in headers)
        self.assertTrue(headers["Content-Range"].startswith("bytes 0-3"))
开发者ID:double-z,项目名称:swift,代码行数:13,代码来源:test_swift3.py


示例15: test_signed_urls

 def test_signed_urls(self):
     """Check that signed-URL query parameters become request headers.

     A GET carrying ``Signature=X``, ``Expires=Y`` and ``AWSAccessKeyId=Z``
     in the query string must reach the backend with ``Authorization`` set
     to 'AWS Z:X' and ``Date`` set to 'Y'.
     """
     class FakeApp(object):
         """WSGI stub that records the request it is called with."""

         def __call__(self, env, start_response):
             self.req = Request(env)
             # Status and headers belong in one start_response call; the
             # original passed them in two separate one-argument calls.
             start_response('200 OK', [])

     app = FakeApp()
     local_app = swift3.filter_factory({})(app)
     req = Request.blank('/bucket/object?Signature=X&Expires=Y&'
             'AWSAccessKeyId=Z', environ={'REQUEST_METHOD': 'GET'})
     req.date = datetime.now()
     req.content_type = 'text/plain'
     # Only the request seen by the stub is checked, so the response is
     # discarded.
     local_app(req.environ, lambda *args: None)
     forwarded = app.req.headers
     self.assertEqual(forwarded['Authorization'], 'AWS Z:X')
     self.assertEqual(forwarded['Date'], 'Y')
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:15,代码来源:test_swift3.py


示例16: test_object_PUT

    def test_object_PUT(self):
        """Check status and ETag reported for an object PUT.

        The backend stub is built with 201, while the status recorded
        through ``do_start_response`` is expected to be 200. The recorded
        ``ETag`` header must equal the stub's ``etag`` response header
        wrapped in double quotes.
        """
        local_app = swift3.filter_factory({})(FakeAppObject(201))
        req = Request.blank('/bucket/object',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'Authorization': 'AWS test:tester:hmac',
                                     'x-amz-storage-class': 'REDUCED_REDUNDANCY',
                                     'Content-MD5': 'Gyz1NfJ3Mcl0NDZFo5hTKA=='})
        req.date = datetime.now()
        req.content_type = 'text/plain'
        # The body is unused; the assertions read response_args afterwards.
        local_app(req.environ, local_app.app.do_start_response)
        self.assertEqual(local_app.app.response_args[0].split()[0], '200')

        headers = dict(local_app.app.response_args[1])
        quoted_etag = "\"%s\"" % local_app.app.response_headers['etag']
        self.assertEqual(headers['ETag'], quoted_etag)
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:15,代码来源:test_swift3.py


示例17: test_signed_urls

    def test_signed_urls(self):
        """Check the translation of a pre-signed URL into auth headers.

        The query string supplies ``Signature=X``, ``Expires=Y`` and
        ``AWSAccessKeyId=Z``. The recording backend must see
        ``Authorization: AWS Z:X`` and ``Date: Y``.
        """

        class FakeApp(object):
            """WSGI stub that stores the incoming request on itself."""

            def __call__(self, env, start_response):
                self.req = Request(env)
                # One call with (status, headers) replaces the original's
                # two one-argument calls.
                start_response("200 OK", [])

        app = FakeApp()
        local_app = swift3.filter_factory({})(app)
        req = Request.blank(
            "/bucket/object?Signature=X&Expires=Y&AWSAccessKeyId=Z",
            environ={"REQUEST_METHOD": "GET"},
        )
        req.date = datetime.now()
        req.content_type = "text/plain"
        # The response iterable is irrelevant to this test.
        local_app(req.environ, lambda *args: None)
        self.assertEqual(app.req.headers["Authorization"], "AWS Z:X")
        self.assertEqual(app.req.headers["Date"], "Y")
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:17,代码来源:test_swift3.py


示例18: test_object_PUT

    def test_object_PUT(self):
        """Check the response recorded for an object PUT.

        The stub backend is constructed with 201. The status captured by
        ``do_start_response`` must be 200, and the captured ``ETag`` must be
        the stub's ``etag`` response header enclosed in double quotes.
        """
        local_app = swift3.filter_factory({})(FakeAppObject(201))
        req = Request.blank(
            "/bucket/object",
            environ={"REQUEST_METHOD": "PUT"},
            headers={
                "Authorization": "AWS test:tester:hmac",
                "x-amz-storage-class": "REDUCED_REDUNDANCY",
                "Content-MD5": "Gyz1NfJ3Mcl0NDZFo5hTKA==",
            },
        )
        req.date = datetime.now()
        req.content_type = "text/plain"
        # Called for its effect on response_args; the body is not checked.
        local_app(req.environ, local_app.app.do_start_response)
        recorded = local_app.app.response_args
        self.assertEqual(recorded[0].split()[0], "200")

        headers = dict(recorded[1])
        expected_etag = '"%s"' % local_app.app.response_headers["etag"]
        self.assertEqual(headers["ETag"], expected_etag)
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:18,代码来源:test_swift3.py


示例19: _test_object_GETorHEAD

    def _test_object_GETorHEAD(self, method):
        """Shared checks for object GET and HEAD requests.

        Sends *method* for ``/bucket/object`` through the swift3 filter and
        verifies that the recorded status is 200. A fixed set of backend
        headers must be returned unchanged, and ``x-object-meta-*`` headers
        must come back as ``x-amz-meta-*``. For "GET" the returned body must
        also equal the stub's ``object_body``.

        :param method: HTTP method to issue; only "GET" triggers the body check
        """
        local_app = swift3.filter_factory({})(FakeAppObject())
        req = Request.blank(
            "/bucket/object",
            environ={"REQUEST_METHOD": method},
            headers={"Authorization": "AWS test:tester:hmac"},
        )
        resp = local_app(req.environ, local_app.app.do_start_response)
        self.assertEqual(local_app.app.response_args[0].split()[0], "200")

        passthrough = ("Content-Length", "Content-Type", "Content-Encoding", "etag", "last-modified")
        swift_meta_prefix = "x-object-meta-"
        headers = dict(local_app.app.response_args[1])
        # items() works on both Python 2 and 3, unlike iteritems().
        for key, val in local_app.app.response_headers.items():
            if key in passthrough:
                self.assertTrue(key in headers)
                self.assertEqual(headers[key], val)
            elif key.startswith(swift_meta_prefix):
                # Derive the offset from the prefix rather than a literal 14.
                amz_key = "x-amz-meta-" + key[len(swift_meta_prefix):]
                self.assertTrue(amz_key in headers)
                self.assertEqual(headers[amz_key], val)

        if method == "GET":
            self.assertEqual(resp, local_app.app.object_body)
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:20,代码来源:test_swift3.py


示例20: test_service_GET

    def test_service_GET(self):
        """Check the bucket listing returned for a GET on ``/``.

        The recorded status must be 200 and the root element must be
        ``ListAllMyBucketsResult``. The first ``Bucket`` must have exactly
        two non-text children, and the collected ``Name`` values must match
        ``FakeAppService().buckets`` in count and content.
        """
        local_app = swift3.filter_factory({})(FakeAppService())
        req = Request.blank(
            "/",
            environ={"REQUEST_METHOD": "GET"},
            headers={"Authorization": "AWS test:tester:hmac"},
        )
        resp = local_app(req.environ, local_app.app.do_start_response)
        self.assertEqual(local_app.app.response_args[0].split()[0], "200")

        dom = xml.dom.minidom.parseString("".join(resp))
        self.assertEqual(dom.firstChild.nodeName, "ListAllMyBucketsResult")

        buckets = dom.getElementsByTagName("Bucket")
        listing = [n for n in buckets[0].childNodes if n.nodeName != "#text"]
        self.assertEqual(len(listing), 2)

        names = [
            bucket.childNodes[0].childNodes[0].nodeValue
            for bucket in buckets
            if bucket.childNodes[0].nodeName == "Name"
        ]

        # Build the reference fixture once for both comparisons.
        expected = FakeAppService().buckets
        self.assertEqual(len(names), len(expected))
        for item in expected:
            self.assertTrue(item[0] in names)
开发者ID:lixmgl,项目名称:Intern_OpenStack_Swift,代码行数:21,代码来源:test_swift3.py



注:本文中的swift.common.middleware.swift3.filter_factory函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python tempauth.filter_factory函数代码示例发布时间:2022-05-27
下一篇:
Python staticweb.filter_factory函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap