• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python base.Controller类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中swift.proxy.controllers.base.Controller的典型用法代码示例。如果您正苦于以下问题:Python Controller类的具体用法?Python Controller怎么用?Python Controller使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Controller类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_best_response_overrides

    def test_best_response_overrides(self):
        # Exercises Controller.best_response with the ``overrides`` mapping
        # against one fixed set of three backend responses (302, 100, 404).
        controller = Controller(self.app)
        request = Request.blank('/v1/a/c/o', method='DELETE')
        label = "Base DELETE"
        backend_responses = [
            (302, 'Found', '', b'The resource has moved temporarily.'),
            (100, 'Continue', '', b''),
            (404, 'Not Found', '', b'Custom body'),
        ]
        statuses, reasons, headers, bodies = zip(*backend_responses)

        def pick_best(overrides):
            # Same backend data every time; only the overrides vary.
            return controller.best_response(
                request, statuses, reasons, bodies, label,
                headers=headers, overrides=overrides)

        # A quorum must not be reachable from overridden responses alone:
        # mapping both 302 and 100 to 204 is expected to end in a 503.
        resp = pick_best({302: 204, 100: 204})
        self.assertEqual(resp.status, '503 Service Unavailable')

        # With 100 mapped to 404 a 404 quorum exists, and the response that
        # comes back must be the real 404 (status and body), not the
        # overridden one.
        resp = pick_best({100: 404})
        self.assertEqual(resp.status, '404 Not Found')
        self.assertEqual(resp.body, b'Custom body')
开发者ID:mahak,项目名称:swift,代码行数:25,代码来源:test_base.py


示例2: __init__

 def __init__(self, app, account_name, **kwargs):
     """Initialise the controller for a single account.

     Stores the URL-unquoted ``account_name`` and, when the app does not
     allow account management, removes PUT and DELETE from
     ``self.allowed_methods``. Extra keyword arguments are accepted but
     not used here.
     """
     Controller.__init__(self, app)
     # URL-decode the account name (percent-escapes back to characters).
     self.account_name = unquote(account_name)
     if self.app.allow_account_management:
         return
     for verb in ('PUT', 'DELETE'):
         self.allowed_methods.remove(verb)
开发者ID:sunzz679,项目名称:swift-2.4.0--source-read,代码行数:7,代码来源:account.py


示例3: test_get_shard_ranges_for_object_put

    def test_get_shard_ranges_for_object_put(self):
        # Calls _get_shard_ranges for an object PUT and checks the two
        # backend requests it makes (account HEAD, then container GET) as
        # well as the shard ranges parsed from the container response.
        ts_iter = make_timestamp_iter()
        shard_ranges = []
        for i in range(3):
            # Draw the two timestamps in the order they are consumed.
            timestamp = next(ts_iter)
            meta_timestamp = next(ts_iter)
            shard_ranges.append(dict(ShardRange(
                '.sharded_a/sr%d' % i, timestamp, '%d_lower' % i,
                '%d_upper' % i, object_count=i, bytes_used=1024 * i,
                meta_timestamp=meta_timestamp)))
        # Only the middle shard range is served by the mocked container.
        expected = shard_ranges[1:2]

        controller = Controller(self.app)
        request = Request.blank('/v1/a/c/o', method='PUT')
        backend_bodies = iter([b'', json.dumps(expected).encode('ascii')])
        with mocked_http_conn(
                200, 200,
                body_iter=backend_bodies,
                headers={'X-Backend-Record-Type': 'shard'}) as fake_conn:
            actual = controller._get_shard_ranges(
                request, 'a', 'c', '1_test')

        captured = fake_conn.requests

        # account info
        account_req = captured[0]
        self.assertEqual('HEAD', account_req['method'])
        self.assertEqual('a', account_req['path'][7:])

        # container GET
        container_req = captured[1]
        self.assertEqual('GET', container_req['method'])
        self.assertEqual('a/c', container_req['path'][7:])
        query_params = sorted(container_req['qs'].split('&'))
        self.assertEqual(['format=json', 'includes=1_test'], query_params)
        record_type = container_req['headers'].get('X-Backend-Record-Type')
        self.assertEqual('shard', record_type)

        self.assertEqual(expected, [dict(pr) for pr in actual])
        self.assertFalse(self.app.logger.get_lines_for_level('error'))
开发者ID:mahak,项目名称:swift,代码行数:32,代码来源:test_base.py


示例4: __init__

    def __init__(self, app, account_name, **kwargs):
        """Initialise the controller for a single account.

        Stores the URL-unquoted ``account_name`` and, when the app does not
        allow account management, removes PUT and DELETE from
        ``self.allowed_methods``. Extra keyword arguments are accepted but
        not used here.
        """
        Controller.__init__(self, app)
        # Debug trace. Written in call form and indented with spaces so the
        # method parses on Python 3 as well; for a single string argument
        # the output is the same on Python 2.
        print('in __init__ of accountcontroller class')
        self.account_name = unquote(account_name)
        if not self.app.allow_account_management:
            self.allowed_methods.remove('PUT')
            self.allowed_methods.remove('DELETE')
开发者ID:jannatunnoor,项目名称:test_swift,代码行数:7,代码来源:account.py


示例5: __init__

 def __init__(self, app, conf):
     """Initialise the controller and load the three rings.

     Keeps ``conf``, creates a logger on the ``ringinfo`` log route, reads
     ``swift_dir`` from the configuration (default ``/etc/swift``) and
     builds the object, container and account ``Ring`` instances from it.
     """
     Controller.__init__(self, app)
     self.conf = conf
     self.logger = get_logger(conf, log_route='ringinfo')
     self.swift_dir = self.conf.get('swift_dir', '/etc/swift')
     # Sets self.object_ring, self.container_ring, self.account_ring.
     for ring_name in ('object', 'container', 'account'):
         setattr(self, ring_name + '_ring',
                 Ring(self.swift_dir, ring_name=ring_name))
开发者ID:HeyManLeader,项目名称:OpenStack-Swift,代码行数:8,代码来源:gziptojson.py


示例6: __init__

 def __init__(self, app, version, expose_info, disallowed_sections,
              admin_key):
     """Initialise the controller with its info-exposure settings.

     Stores ``expose_info``, ``disallowed_sections`` and ``admin_key`` on
     the instance and sets up ``allowed_hmac_methods``, a mapping from a
     method name to a list of method names. ``version`` is accepted but
     not used in this constructor.
     """
     Controller.__init__(self, app)
     self.admin_key = admin_key
     self.disallowed_sections = disallowed_sections
     self.expose_info = expose_info
     self.allowed_hmac_methods = dict(HEAD=['HEAD', 'GET'], GET=['GET'])
开发者ID:sunzz679,项目名称:swift-2.4.0--source-read,代码行数:9,代码来源:info.py


示例7: test_container_info_without_req

    def test_container_info_without_req(self):
        # container_info() called with only the account and container names
        # (no request argument) must report a status of 0.
        controller = Controller(self.app)
        controller.account_name = 'a'
        controller.container_name = 'c'

        info = controller.container_info(
            controller.account_name, controller.container_name)
        self.assertEqual(info['status'], 0)
开发者ID:mahak,项目名称:swift,代码行数:9,代码来源:test_base.py


示例8: test_transfer_headers_with_sysmeta

 def test_transfer_headers_with_sysmeta(self):
     # transfer_headers must copy the well-formed sysmeta headers into the
     # destination and leave out the one whose name ends at the prefix.
     controller = Controller(self.app)
     accepted = {"x-base-sysmeta-foo": "ok", "X-Base-sysmeta-Bar": "also ok"}
     rejected = {"x-base-sysmeta-": "too short"}
     source = {}
     source.update(accepted)
     source.update(rejected)
     destination = HeaderKeyDict()
     controller.transfer_headers(source, destination)
     self.assertEqual(HeaderKeyDict(accepted), destination)
开发者ID:helen5haha,项目名称:swift,代码行数:9,代码来源:test_base.py


示例9: test_generate_request_headers

 def test_generate_request_headers(self):
     # generate_request_headers(req, transfer=True) is expected to turn the
     # "x-remove-base-meta-owner" header into an empty "x-base-meta-owner",
     # keep "x-base-meta-size", add "connection: close" and drop the
     # unrelated "new-owner" header.
     base = Controller(self.app)
     src_headers = {
         "x-remove-base-meta-owner": "x",
         "x-base-meta-size": "151M",
         "new-owner": "Kun",
     }
     req = Request.blank("/v1/a/c/o", headers=src_headers)
     dst_headers = base.generate_request_headers(req, transfer=True)
     expected_headers = {
         "x-base-meta-owner": "",
         "x-base-meta-size": "151M",
         "connection": "close",
     }
     # items() exists on both Python 2 and 3; iteritems() is Python 2 only.
     for k, v in expected_headers.items():
         self.assertIn(k, dst_headers)
         self.assertEqual(v, dst_headers[k])
     self.assertNotIn("new-owner", dst_headers)
开发者ID:helen5haha,项目名称:swift,代码行数:10,代码来源:test_base.py


示例10: test_transfer_headers_with_sysmeta

 def test_transfer_headers_with_sysmeta(self):
     # transfer_headers must copy the well-formed sysmeta headers into the
     # destination and leave out the one whose name ends at the prefix.
     controller = Controller(self.app)
     accepted = {
         'x-base-sysmeta-foo': 'ok',
         'X-Base-sysmeta-Bar': 'also ok',
     }
     rejected = {'x-base-sysmeta-': 'too short'}
     source = {}
     source.update(accepted)
     source.update(rejected)
     destination = HeaderKeyDict()
     controller.transfer_headers(source, destination)
     self.assertEqual(HeaderKeyDict(accepted), destination)
开发者ID:mahak,项目名称:swift,代码行数:10,代码来源:test_base.py


示例11: _check_get_shard_ranges_bad_data

 def _check_get_shard_ranges_bad_data(self, body):
     """Run ``_get_shard_ranges`` against a mocked backend serving *body*.

     Two 200 responses are mocked: the first has an empty body, the second
     carries *body*; ``X-Backend-Record-Type: shard`` is passed as the
     mocked response headers. Asserts that the call returns ``None``.

     :param body: bytes served as the second mocked response body
     :returns: the error-level lines captured by the app logger
     """
     controller = Controller(self.app)
     request = Request.blank('/v1/a/c/o', method='PUT')
     with mocked_http_conn(
             200, 200,
             body_iter=iter([b'', body]),
             headers={'X-Backend-Record-Type': 'shard'}):
         result = controller._get_shard_ranges(request, 'a', 'c', '1_test')
     self.assertIsNone(result)
     return self.app.logger.get_lines_for_level('error')
开发者ID:mahak,项目名称:swift,代码行数:11,代码来源:test_base.py


示例12: test_get_shard_ranges_request_failed

 def test_get_shard_ranges_request_failed(self):
     # With the mocked backend answering 200 and then three 404s,
     # _get_shard_ranges must return None, log no errors and log exactly
     # one warning that names the container path.
     controller = Controller(self.app)
     request = Request.blank('/v1/a/c/o', method='PUT')
     with mocked_http_conn(200, 404, 404, 404):
         result = controller._get_shard_ranges(request, 'a', 'c', '1_test')
     self.assertIsNone(result)

     logger = self.app.logger
     self.assertFalse(logger.get_lines_for_level('error'))
     logged_warnings = logger.get_lines_for_level('warning')
     first_warning = logged_warnings[0]
     self.assertIn('Failed to get container listing', first_warning)
     self.assertIn('/a/c', first_warning)
     self.assertFalse(logged_warnings[1:])
开发者ID:mahak,项目名称:swift,代码行数:11,代码来源:test_base.py


示例13: test_generate_request_headers_with_no_orig_req

 def test_generate_request_headers_with_no_orig_req(self):
     # generate_request_headers(None, additional=...) is called without an
     # original request; the result must contain the expected headers
     # below and an empty Referer.
     base = Controller(self.app)
     src_headers = {'x-remove-base-meta-owner': 'x',
                    'x-base-meta-size': '151M',
                    'new-owner': 'Kun'}
     dst_headers = base.generate_request_headers(None,
                                                 additional=src_headers)
     expected_headers = {'x-base-meta-size': '151M',
                         'connection': 'close'}
     # Check each expected header once. The previous loop ignored its own
     # variables and repeated one assertDictContainsSubset call, a method
     # that is deprecated and removed in Python 3.12.
     for k, v in expected_headers.items():
         self.assertIn(k, dst_headers)
         self.assertEqual(v, dst_headers[k])
     self.assertEqual('', dst_headers['Referer'])
开发者ID:bkolli,项目名称:swift,代码行数:12,代码来源:test_base.py


示例14: test_generate_request_headers_with_sysmeta

 def test_generate_request_headers_with_sysmeta(self):
     # generate_request_headers(req, transfer=True) must carry over the
     # well-formed sysmeta headers (looked up by lower-cased name) and
     # leave out the one whose name ends at the prefix.
     base = Controller(self.app)
     good_hdrs = {"x-base-sysmeta-foo": "ok", "X-Base-sysmeta-Bar": "also ok"}
     bad_hdrs = {"x-base-sysmeta-": "too short"}
     hdrs = dict(good_hdrs)
     hdrs.update(bad_hdrs)
     req = Request.blank("/v1/a/c/o", headers=hdrs)
     dst_headers = base.generate_request_headers(req, transfer=True)
     # items() exists on both Python 2 and 3; iteritems() is Python 2 only.
     for k, v in good_hdrs.items():
         self.assertIn(k.lower(), dst_headers)
         self.assertEqual(v, dst_headers[k.lower()])
     # Only the names matter for the rejected headers.
     for k in bad_hdrs:
         self.assertNotIn(k.lower(), dst_headers)
开发者ID:helen5haha,项目名称:swift,代码行数:13,代码来源:test_base.py


示例15: test_generate_request_headers

 def test_generate_request_headers(self):
     # generate_request_headers(req, transfer=True) is expected to turn the
     # 'x-remove-base-meta-owner' header into an empty 'x-base-meta-owner',
     # keep 'x-base-meta-size' and drop the unrelated 'new-owner' header.
     base = Controller(self.app)
     src_headers = {'x-remove-base-meta-owner': 'x',
                    'x-base-meta-size': '151M',
                    'new-owner': 'Kun'}
     req = Request.blank('/v1/a/c/o', headers=src_headers)
     dst_headers = base.generate_request_headers(req, transfer=True)
     expected_headers = {'x-base-meta-owner': '',
                         'x-base-meta-size': '151M'}
     # items() exists on both Python 2 and 3; iteritems() is Python 2 only.
     for k, v in expected_headers.items():
         self.assertIn(k, dst_headers)
         self.assertEqual(v, dst_headers[k])
     self.assertNotIn('new-owner', dst_headers)
开发者ID:10389030,项目名称:swift,代码行数:13,代码来源:test_base.py


示例16: test_generate_request_headers_with_sysmeta

 def test_generate_request_headers_with_sysmeta(self):
     # generate_request_headers(req, transfer=True) must carry over the
     # well-formed sysmeta headers (looked up by lower-cased name) and
     # leave out the one whose name ends at the prefix.
     controller = Controller(self.app)
     accepted = {
         'x-base-sysmeta-foo': 'ok',
         'X-Base-sysmeta-Bar': 'also ok',
     }
     rejected = {'x-base-sysmeta-': 'too short'}
     request_headers = {}
     request_headers.update(accepted)
     request_headers.update(rejected)
     request = Request.blank('/v1/a/c/o', headers=request_headers)
     generated = controller.generate_request_headers(request, transfer=True)
     for name, value in accepted.items():
         lowered = name.lower()
         self.assertIn(lowered, generated)
         self.assertEqual(value, generated[lowered])
     for name in rejected:
         self.assertNotIn(name.lower(), generated)
开发者ID:mahak,项目名称:swift,代码行数:14,代码来源:test_base.py


示例17: test_options_unauthorized

    def test_options_unauthorized(self):
        # An OPTIONS request whose Origin is not in the app's
        # cors_allow_origin list must be answered with a 401.
        controller = Controller(self.app)
        controller.account_name = 'a'
        controller.container_name = 'c'
        self.app.cors_allow_origin = ['http://NOT_IT']

        cors_headers = {
            'Origin': 'http://m.com',
            'Access-Control-Request-Method': 'GET',
        }
        request = Request.blank(
            '/v1/a/c/o',
            environ={'swift.cache': FakeCache()},
            headers=cors_headers)

        # Replace the backend connection factory for the OPTIONS call.
        patch_target = 'swift.proxy.controllers.base.http_connect'
        with mock.patch(patch_target, fake_http_connect(200)):
            response = controller.OPTIONS(request)
        self.assertEqual(response.status_int, 401)
开发者ID:mahak,项目名称:swift,代码行数:14,代码来源:test_base.py


示例18: test_options_unauthorized

    def test_options_unauthorized(self):
        # An OPTIONS request whose Origin is not in the app's
        # cors_allow_origin list must be answered with a 401.
        controller = Controller(self.app)
        controller.account_name = "a"
        controller.container_name = "c"
        self.app.cors_allow_origin = ["http://NOT_IT"]

        cors_headers = {
            "Origin": "http://m.com",
            "Access-Control-Request-Method": "GET",
        }
        request = Request.blank(
            "/v1/a/c/o",
            environ={"swift.cache": FakeCache()},
            headers=cors_headers)

        # Replace the backend connection factory for the OPTIONS call.
        patch_target = "swift.proxy.controllers.base.http_connect"
        with patch(patch_target, fake_http_connect(200)):
            response = controller.OPTIONS(request)
        self.assertEqual(response.status_int, 401)
开发者ID:helen5haha,项目名称:swift,代码行数:14,代码来源:test_base.py


示例19: test_get_shard_ranges_missing_record_type

 def test_get_shard_ranges_missing_record_type(self):
     # The mocked container response carries a shard range body but no
     # X-Backend-Record-Type header; _get_shard_ranges must return None
     # and log exactly one error describing the unexpected record type.
     controller = Controller(self.app)
     request = Request.blank('/v1/a/c/o', method='PUT')
     shard_range = ShardRange('a/c', Timestamp.now())
     payload = json.dumps([dict(shard_range)]).encode('ascii')
     with mocked_http_conn(200, 200, body_iter=iter([b'', payload])):
         result = controller._get_shard_ranges(request, 'a', 'c', '1_test')
     self.assertIsNone(result)

     logged_errors = self.app.logger.get_lines_for_level('error')
     first_error = logged_errors[0]
     for fragment in ('Failed to get shard ranges',
                      'unexpected record type',
                      '/a/c'):
         self.assertIn(fragment, first_error)
     self.assertFalse(logged_errors[1:])
开发者ID:mahak,项目名称:swift,代码行数:14,代码来源:test_base.py


示例20: test_base_have_quorum

 def test_base_have_quorum(self):
     # Table-driven check of have_quorum: each row is the status list, the
     # second positional argument, and the expected boolean result.
     controller = Controller(self.app)
     cases = (
         ([201, 404], 3, False),
         ([201, 201], 4, True),
         ([201], 4, False),
         ([201, 201, 404, 404], 4, True),
         ([201, 302, 418, 503], 4, False),
         ([201, 503, 503, 201], 4, True),
         ([201, 201], 3, True),
         ([404, 404], 3, True),
         ([201, 201], 2, True),
         ([201, 404], 2, True),
         ([404, 404], 2, True),
         ([201, 404, 201, 201], 4, True),
     )
     for statuses, count, expected in cases:
         self.assertEqual(controller.have_quorum(statuses, count), expected)
开发者ID:harrisonfeng,项目名称:swift,代码行数:15,代码来源:test_base.py



注:本文中的swift.proxy.controllers.base.Controller类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python base.GetOrHeadHandler类代码示例发布时间:2022-05-27
下一篇:
Python base.headers_to_object_info函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap