• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python helpers.FakeSwift类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中test.unit.common.middleware.helpers.FakeSwift的典型用法代码示例。如果您正苦于以下问题：Python FakeSwift类的具体用法？Python FakeSwift怎么用？Python FakeSwift使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了FakeSwift类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: do_test

        def do_test(method, plain_etags, expected_plain_etags=None):
            """Send a conditional request through the encrypter and verify
            that the match header reaching the backend carries masked etags.

            ``self`` and ``match_header_name`` are free variables taken from
            the enclosing scope (not visible in this snippet).

            :param method: HTTP method used for the request and registered
                with the fake backend.
            :param plain_etags: list of etag strings; joined with ", " to
                form the match header value.
            :param expected_plain_etags: optional list of plain etags
                expected in the forwarded header; when falsy,
                ``plain_etags`` is used instead.
            """
            env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
            match_header_value = ", ".join(plain_etags)
            req = Request.blank(
                "/v1/a/c/o", environ=env, method=method, headers={match_header_name: match_header_value}
            )
            app = FakeSwift()
            app.register(method, "/v1/a/c/o", HTTPOk, {})
            resp = req.get_response(encrypter.Encrypter(app, {}))
            self.assertEqual("200 OK", resp.status)

            # exactly one backend request, using the same method
            self.assertEqual(1, len(app.calls), app.calls)
            self.assertEqual(method, app.calls[0][0])
            actual_headers = app.headers[0]

            # verify the alternate etag location has been specified
            if match_header_value and match_header_value != "*":
                self.assertIn("X-Backend-Etag-Is-At", actual_headers)
                self.assertEqual("X-Object-Sysmeta-Crypto-Etag-Mac", actual_headers["X-Backend-Etag-Is-At"])

            # verify etags have been supplemented with masked values
            self.assertIn(match_header_name, actual_headers)
            actual_etags = set(actual_headers[match_header_name].split(", "))
            key = fetch_crypto_keys()["object"]
            # Each concrete etag (not "*" or "") is expected alongside a quoted
            # base64 HMAC-SHA256 of its unquoted value, keyed by the object key.
            # NOTE(review): hmac.new() is handed a str message and the b64
            # result is %-formatted directly; that looks Python 2 only --
            # confirm before running under Python 3.
            masked_etags = [
                '"%s"' % base64.b64encode(hmac.new(key, etag.strip('"'), hashlib.sha256).digest())
                for etag in plain_etags
                if etag not in ("*", "")
            ]
            expected_etags = set((expected_plain_etags or plain_etags) + masked_etags)
            self.assertEqual(expected_etags, actual_etags)
            # check that the request environ was returned to original state
            self.assertEqual(set(plain_etags), set(req.headers[match_header_name].split(", ")))
开发者ID:notmyname,项目名称:swift,代码行数:33,代码来源:test_encrypter.py


示例2: setUp

 def setUp(self):
     """Stack the symlink and versioned_writes filters on a FakeSwift."""
     backend = FakeSwift()
     self.app = backend
     # the symlink filter sits directly on the fake backend and shares
     # its logger
     self.sym = symlink.filter_factory({'symloop_max': '2'})(backend)
     self.sym.logger = backend.logger
     # versioned_writes wraps the symlink filter
     make_vw = versioned_writes.filter_factory(
         {'allow_versioned_writes': 'true'})
     self.vw = make_vw(self.sym)
开发者ID:chenzhongtao,项目名称:swift,代码行数:7,代码来源:test_symlink.py


示例3: do_test

        def do_test(method, plain_etags, expected_plain_etags=None):
            """Check how the encrypter rewrites a conditional-match header.

            A request carrying ``plain_etags`` in the ``match_header_name``
            header (a free variable of the enclosing scope, like ``self``)
            is sent through the encrypter; the header forwarded to the fake
            backend must hold the plain etags plus their masked forms for
            both the default key and the ``myid`` secret.

            :param method: HTTP method to send and to register.
            :param plain_etags: list of etag strings for the match header.
            :param expected_plain_etags: optional list of plain etags
                expected in the forwarded header; defaults to
                ``plain_etags`` when falsy.
            """
            def mask_with(key):
                # quoted, base64-encoded HMAC-SHA256 of every concrete etag;
                # '*' and '' are never masked
                return [
                    '"%s"' % bytes_to_wsgi(base64.b64encode(hmac.new(
                        key, wsgi_to_bytes(plain.strip('"')),
                        hashlib.sha256).digest()))
                    for plain in plain_etags if plain not in ('*', '')]

            match_header_value = ', '.join(plain_etags)
            req = Request.blank(
                '/v1/a/c/o',
                environ={CRYPTO_KEY_CALLBACK: fetch_crypto_keys},
                method=method,
                headers={match_header_name: match_header_value})
            app = FakeSwift()
            app.register(method, '/v1/a/c/o', HTTPOk, {})
            resp = req.get_response(encrypter.Encrypter(app, {}))
            self.assertEqual('200 OK', resp.status)

            self.assertEqual(1, len(app.calls), app.calls)
            self.assertEqual(method, app.calls[0][0])
            forwarded = app.headers[0]

            # verify the alternate etag location has been specified
            if match_header_value not in ('', '*'):
                self.assertIn('X-Backend-Etag-Is-At', forwarded)
                self.assertEqual('X-Object-Sysmeta-Crypto-Etag-Mac',
                                 forwarded['X-Backend-Etag-Is-At'])

            # verify etags have been supplemented with masked values
            self.assertIn(match_header_name, forwarded)
            actual_etags = set(forwarded[match_header_name].split(', '))
            # masked values for secret_id None
            masked_default = mask_with(fetch_crypto_keys()['object'])
            # masked values for secret_id myid
            masked_myid = mask_with(
                fetch_crypto_keys(key_id={'secret_id': 'myid'})['object'])
            expected_etags = set(
                (expected_plain_etags or plain_etags) +
                masked_default + masked_myid)
            self.assertEqual(expected_etags, actual_etags)
            # check that the request environ was returned to original state
            self.assertEqual(set(plain_etags),
                             set(req.headers[match_header_name].split(', ')))
开发者ID:mahak,项目名称:swift,代码行数:44,代码来源:test_encrypter.py


示例4: setUp

 def setUp(self):
     """Create a container_hierarchy filter on top of a FakeSwift."""
     self.filter_conf = dict(
         strip_v1='true',
         swift3_compat='true',
         account_first='true',
     )
     self.app = FakeSwift()
     # global conf goes in positionally, filter options as keywords
     make_filter = container_hierarchy.filter_factory(
         {'sds_default_account': 'OPENIO'}, **self.filter_conf)
     self.ch = make_filter(self.app)
开发者ID:fvennetier,项目名称:oio-swift,代码行数:11,代码来源:test_container_hierarchy.py


示例5: setUp

 def setUp(self):
     """Wrap a FakeSwift in ListingFilter and prepare canned JSON listings."""
     self.fake_swift = FakeSwift()
     self.app = listing_formats.ListingFilter(self.fake_swift)

     epoch = '1970-01-01T00:00:00.000000'
     # one real entry plus one subdir placeholder per listing
     account_entries = [
         {'name': 'bar', 'bytes': 0, 'count': 0, 'last_modified': epoch},
         {'subdir': 'foo_'},
     ]
     container_entries = [
         {'name': 'bar', 'hash': 'etag', 'bytes': 0,
          'content_type': 'text/plain', 'last_modified': epoch},
         {'subdir': 'foo/'},
     ]
     self.fake_account_listing = json.dumps(account_entries)
     self.fake_container_listing = json.dumps(container_entries)
开发者ID:chenzhongtao,项目名称:swift,代码行数:14,代码来源:test_listing_formats.py


示例6: setUp

    def setUp(self):
        """Create a regexcontainer filter on top of a FakeSwift.

        ``stop_at_first_match`` is 'true' unless ``ContainerBuilder`` has an
        ``alternatives`` attribute, in which case it is 'false'.
        """
        stop_at_first = (
            'false' if hasattr(ContainerBuilder, 'alternatives') else 'true')
        self.filter_conf = {
            'strip_v1': 'true',
            'swift3_compat': 'true',
            'account_first': 'true',
            'stop_at_first_match': stop_at_first,
            'pattern1': r'(\d{3})/(\d{3})/(\d)\d\d/\d\d(\d)/',
            'pattern2': r'(\d{3})/(\d)\d\d/\d\d(\d)/',
            'pattern3': r'^(cloud)/([0-9a-f][0-9a-f])',
            'pattern4': r'^(cloud)/([0-9a-f])',
            'pattern9': r'^/?([^/]+)',
        }

        self.app = FakeSwift()
        # global conf goes in positionally, filter options as keywords
        make_filter = regexcontainer.filter_factory(
            {'sds_default_account': 'OPENIO'}, **self.filter_conf)
        self.ch = make_filter(self.app)
开发者ID:fvennetier,项目名称:oio-swift,代码行数:21,代码来源:test_regexcontainer.py


示例7: setUp

 def setUp(self):
     """Put the versioned_writes middleware in front of a FakeSwift."""
     self.app = FakeSwift()
     make_vw = versioned_writes.filter_factory(
         {'allow_versioned_writes': 'true'})
     self.vw = make_vw(self.app)
开发者ID:Ahiknsr,项目名称:swift,代码行数:4,代码来源:test_versioned_writes.py


示例8: VersionedWritesTestCase

class VersionedWritesTestCase(unittest.TestCase):
    def setUp(self):
        """Build a fresh FakeSwift wrapped by versioned_writes per test."""
        fake_backend = FakeSwift()
        vw_factory = versioned_writes.filter_factory(
            {'allow_versioned_writes': 'true'})
        self.app = fake_backend
        self.vw = vw_factory(fake_backend)

    def call_app(self, req, app=None, expect_exception=False):
        """Run ``req`` through a WSGI app and collect the whole response.

        Resets ``self.authorized`` to an empty list and, unless the request
        environ already has a ``swift.authorize`` callback, installs one
        that appends every request it is given to that list. A default
        ``User-Agent`` header is set when the request has none.

        The body iterable returned by the app is always closed (when it has
        a ``close`` method), whether iteration finished or raised, as WSGI
        requires of the caller.

        :param req: request object exposing ``environ`` and ``headers``.
        :param app: WSGI callable to invoke; defaults to ``self.app``.
        :param expect_exception: when true, an exception raised while
            iterating the body is captured and returned instead of raised.
        :returns: ``(status, headers, body)``, or
            ``(status, headers, body, caught_exc)`` when
            ``expect_exception`` is true (``caught_exc`` is None if nothing
            was raised; ``body`` holds whatever was read before the error).
        :raises Exception: whatever body iteration raises, when
            ``expect_exception`` is false.
        """
        if app is None:
            app = self.app

        self.authorized = []

        def authorize(req):
            self.authorized.append(req)

        if 'swift.authorize' not in req.environ:
            req.environ['swift.authorize'] = authorize

        req.headers.setdefault("User-Agent", "Marula Kruger")

        # one-element lists so the nested start_response can write to them
        status = [None]
        headers = [None]

        def start_response(s, h, ei=None):
            status[0] = s
            headers[0] = h

        body_iter = app(req.environ, start_response)
        body = ''
        caught_exc = None
        try:
            for chunk in body_iter:
                body += chunk
        except Exception as exc:
            if expect_exception:
                caught_exc = exc
            else:
                raise
        finally:
            # release the response even on error so it is not leaked
            close = getattr(body_iter, 'close', None)
            if callable(close):
                close()

        if expect_exception:
            return status[0], headers[0], body, caught_exc
        else:
            return status[0], headers[0], body

    def call_vw(self, req, **kwargs):
        """Run ``req`` through the versioned_writes middleware under test.

        Extra keyword arguments are handed on to ``call_app`` unchanged.
        """
        middleware = self.vw
        return self.call_app(req, app=middleware, **kwargs)

    def assertRequestEqual(self, req, other):
        """Assert two requests have the same method and the same path."""
        for field in ('method', 'path'):
            self.assertEqual(getattr(req, field), getattr(other, field))

    def test_put_container(self):
        """A container PUT with X-Versions-Location returns 200 and the
        backend request carries the versions-location sysmeta header."""
        self.app.register('PUT', '/v1/a/c', swob.HTTPOk, {}, 'passed')
        req = Request.blank('/v1/a/c',
                            headers={'X-Versions-Location': 'ver_cont'},
                            environ={'REQUEST_METHOD': 'PUT'})
        status, headers, body = self.call_vw(req)
        self.assertEqual(status, '200 OK')

        # check for sysmeta header
        calls = self.app.calls_with_headers
        method, path, req_headers = calls[0]
        self.assertEqual('PUT', method)
        self.assertEqual('/v1/a/c', path)
        # assertIn shows the actual headers on failure, unlike assertTrue
        self.assertIn('x-container-sysmeta-versions-location', req_headers)
        # the original request must have been authorized exactly once
        self.assertEqual(len(self.authorized), 1)
        self.assertRequestEqual(req, self.authorized[0])

    def test_container_allow_versioned_writes_false(self):
        """With allow_versioned_writes off, container PUT/POST carrying
        X-Versions-Location get 412 while GET/HEAD still return 200."""
        self.vw.conf = {'allow_versioned_writes': 'false'}

        def status_for(verb):
            # same container request every time, only the method varies
            request = Request.blank(
                '/v1/a/c',
                headers={'X-Versions-Location': 'ver_cont'},
                environ={'REQUEST_METHOD': verb})
            status, _headers, _body = self.call_vw(request)
            return status

        # PUT/POST container must fail as 412 when allow_versioned_writes
        # set to false
        for verb in ('PUT', 'POST'):
            self.assertEqual(status_for(verb), "412 Precondition Failed")

        # GET/HEAD performs as normal
        for verb in ('GET', 'HEAD'):
            self.app.register(verb, '/v1/a/c', swob.HTTPOk, {}, 'passed')

        for verb in ('GET', 'HEAD'):
            self.assertEqual(status_for(verb), '200 OK')

    def test_remove_versions_location(self):
        self.app.register('POST', '/v1/a/c', swob.HTTPOk, {}, 'passed')
        req = Request.blank('/v1/a/c',
                            headers={'X-Remove-Versions-Location': 'x'},
                            environ={'REQUEST_METHOD': 'POST'})
        status, headers, body = self.call_vw(req)
        self.assertEqual(status, '200 OK')

        # check for sysmeta header
#.........这里部分代码省略.........
开发者ID:Ahiknsr,项目名称:swift,代码行数:101,代码来源:test_versioned_writes.py


示例9: TestDecrypterObjectRequests

class TestDecrypterObjectRequests(unittest.TestCase):
    def setUp(self):
        """Create a Decrypter over a FakeSwift, logging to a FakeLogger."""
        fake_backend = FakeSwift()
        middleware = decrypter.Decrypter(fake_backend, {})
        middleware.logger = FakeLogger()
        self.app = fake_backend
        self.decrypter = middleware

    def _make_response_headers(self, content_length, plaintext_etag, keys,
                               body_key):
        """Build the backend response headers for an encrypted object.

        :param content_length: value for the ``content-length`` header.
        :param plaintext_etag: etag of the unencrypted body; stored
            encrypted under ``X-Object-Sysmeta-Crypto-Etag``.
        :param keys: mapping providing the ``'container'`` and ``'object'``
            keys.
        :param body_key: key the body was encrypted with; it is wrapped
            with the object key and carried in the body crypto meta.
        :returns: a ``HeaderKeyDict`` of response headers.
        """
        # helper method to make a typical set of response headers for a GET or
        # HEAD request
        cont_key = keys['container']
        object_key = keys['object']
        # the body key travels wrapped (encrypted) with the object key
        body_key_meta = {'key': encrypt(body_key, object_key, FAKE_IV),
                         'iv': FAKE_IV}
        body_crypto_meta = fake_get_crypto_meta(body_key=body_key_meta)
        # NOTE(review): the b64encode results below are %-formatted and
        # concatenated with str literals; presumably Python 2 semantics --
        # confirm before running under Python 3.
        return HeaderKeyDict({
            'Etag': 'hashOfCiphertext',
            'content-type': 'text/plain',
            'content-length': content_length,
            'X-Object-Sysmeta-Crypto-Etag': '%s; swift_meta=%s' % (
                base64.b64encode(encrypt(plaintext_etag, object_key, FAKE_IV)),
                get_crypto_meta_header()),
            'X-Object-Sysmeta-Crypto-Body-Meta':
                get_crypto_meta_header(body_crypto_meta),
            'x-object-transient-sysmeta-crypto-meta-test':
                base64.b64encode(encrypt('encrypt me', object_key, FAKE_IV)) +
                ';swift_meta=' + get_crypto_meta_header(),
            'x-object-sysmeta-container-update-override-etag':
                encrypt_and_append_meta('encrypt me, too', cont_key),
            'x-object-sysmeta-test': 'do not encrypt me',
        })

    def _test_request_success(self, method, body):
        """Register an encrypted object on the fake backend, fetch it through
        the decrypter with ``method`` and assert on the response headers.

        :param method: HTTP method used both to register and to request.
        :param body: plaintext body; it is encrypted with a random body key
            before being registered.
        :returns: the response object, so callers can check the body.
        """
        env = {'REQUEST_METHOD': method,
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        req = Request.blank('/v1/a/c/o', environ=env)
        plaintext_etag = md5hex(body)
        body_key = os.urandom(32)
        enc_body = encrypt(body, body_key, FAKE_IV)
        hdrs = self._make_response_headers(
            len(enc_body), plaintext_etag, fetch_crypto_keys(), body_key)

        # there shouldn't be any x-object-meta- headers, but if there are
        # then the decrypted header will win where there is a name clash...
        hdrs.update({
            'x-object-meta-test': 'unexpected, overwritten by decrypted value',
            'x-object-meta-distinct': 'unexpected but distinct from encrypted'
        })
        self.app.register(
            method, '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
        resp = req.get_response(self.decrypter)
        self.assertEqual('200 OK', resp.status)
        # the client sees the plaintext etag, not the ciphertext hash
        self.assertEqual(plaintext_etag, resp.headers['Etag'])
        self.assertEqual('text/plain', resp.headers['Content-Type'])
        # decrypted user metadata replaces the clashing plaintext header
        self.assertEqual('encrypt me', resp.headers['x-object-meta-test'])
        self.assertEqual('unexpected but distinct from encrypted',
                         resp.headers['x-object-meta-distinct'])
        self.assertEqual('do not encrypt me',
                         resp.headers['x-object-sysmeta-test'])
        self.assertEqual(
            'encrypt me, too',
            resp.headers['X-Object-Sysmeta-Container-Update-Override-Etag'])
        # crypto bookkeeping headers must not be present in the response
        self.assertNotIn('X-Object-Sysmeta-Crypto-Body-Meta', resp.headers)
        self.assertNotIn('X-Object-Sysmeta-Crypto-Etag', resp.headers)
        return resp

    def test_GET_success(self):
        body = 'FAKE APP'
        resp = self._test_request_success('GET', body)
        self.assertEqual(body, resp.body)

    def test_HEAD_success(self):
        body = 'FAKE APP'
        resp = self._test_request_success('HEAD', body)
        self.assertEqual('', resp.body)

    def test_headers_case(self):
        body = 'fAkE ApP'
        req = Request.blank('/v1/a/c/o', body='FaKe')
        req.environ[CRYPTO_KEY_CALLBACK] = fetch_crypto_keys
        plaintext_etag = md5hex(body)
        body_key = os.urandom(32)
        enc_body = encrypt(body, body_key, FAKE_IV)
        hdrs = self._make_response_headers(
            len(enc_body), plaintext_etag, fetch_crypto_keys(), body_key)

        hdrs.update({
            'x-Object-mEta-ignoRes-caSe': 'thIs pArt WilL bE cOol',
        })
        self.app.register(
            'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)

        status, headers, app_iter = req.call_application(self.decrypter)
        self.assertEqual(status, '200 OK')
        expected = {
            'Etag': '7f7837924188f7b511a9e3881a9f77a8',
            'X-Object-Sysmeta-Container-Update-Override-Etag':
            'encrypt me, too',
            'X-Object-Meta-Test': 'encrypt me',
            'Content-Length': '8',
#.........这里部分代码省略.........
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:101,代码来源:test_decrypter.py


示例10: TestEncrypter

class TestEncrypter(unittest.TestCase):
    def setUp(self):
        """Create an Encrypter over a FakeSwift, logging to a FakeLogger."""
        fake_backend = FakeSwift()
        middleware = encrypter.Encrypter(fake_backend, {})
        middleware.logger = FakeLogger()
        self.app = fake_backend
        self.encrypter = middleware

    def _verify_user_metadata(self, req_hdrs, name, value, key):
        """Assert that user metadata ``name`` was forwarded only in
        encrypted form.

        :param req_hdrs: headers of the request seen by the backend.
        :param name: user metadata name (the part after ``X-Object-Meta-``).
        :param value: plaintext value that should have been encrypted.
        :param key: key the value is expected to be encrypted with.
        """
        tag = 'swift_meta='
        shared_hdr = 'X-Object-Transient-Sysmeta-Crypto-Meta'

        # verify encrypted version of user metadata
        self.assertNotIn('X-Object-Meta-' + name, req_hdrs)
        item_hdr = 'X-Object-Transient-Sysmeta-Crypto-Meta-' + name
        self.assertIn(item_hdr, req_hdrs)

        # header value is "<b64 ciphertext>; swift_meta=<quoted json>"
        ciphertext_b64, meta_param = req_hdrs[item_hdr].split(';')
        meta_param = meta_param.strip()
        self.assertTrue(meta_param.startswith(tag))
        quoted_meta = meta_param[len(tag):]
        item_meta = json.loads(urlparse.unquote_plus(quoted_meta))
        self.assertEqual(Crypto.cipher, item_meta['cipher'])
        iv = base64.b64decode(item_meta['iv'])
        self.assertEqual(FAKE_IV, iv)
        expected_b64 = base64.b64encode(encrypt(value, key, iv))
        self.assertEqual(expected_b64, ciphertext_b64)

        # if there is any encrypted user metadata then this header should exist
        self.assertIn(shared_hdr, req_hdrs)
        shared_meta = json.loads(urlparse.unquote_plus(req_hdrs[shared_hdr]))
        self.assertDictEqual(
            {'cipher': Crypto.cipher,
             'key_id': {'v': 'fake', 'path': '/a/c/fake'}},
            shared_meta)

    def test_PUT_req(self):
        body_key = os.urandom(32)
        object_key = fetch_crypto_keys()['object']
        plaintext = 'FAKE APP'
        plaintext_etag = md5hex(plaintext)
        ciphertext = encrypt(plaintext, body_key, FAKE_IV)
        ciphertext_etag = md5hex(ciphertext)

        env = {'REQUEST_METHOD': 'PUT',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        hdrs = {'etag': plaintext_etag,
                'content-type': 'text/plain',
                'content-length': str(len(plaintext)),
                'x-object-meta-etag': 'not to be confused with the Etag!',
                'x-object-meta-test': 'encrypt me',
                'x-object-sysmeta-test': 'do not encrypt me'}
        req = Request.blank(
            '/v1/a/c/o', environ=env, body=plaintext, headers=hdrs)
        self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
        with mock.patch(
            'swift.common.middleware.crypto.crypto_utils.'
            'Crypto.create_random_key',
                return_value=body_key):
            resp = req.get_response(self.encrypter)
        self.assertEqual('201 Created', resp.status)
        self.assertEqual(plaintext_etag, resp.headers['Etag'])

        # verify metadata items
        self.assertEqual(1, len(self.app.calls), self.app.calls)
        self.assertEqual('PUT', self.app.calls[0][0])
        req_hdrs = self.app.headers[0]

        # verify body crypto meta
        actual = req_hdrs['X-Object-Sysmeta-Crypto-Body-Meta']
        actual = json.loads(urlparse.unquote_plus(actual))
        self.assertEqual(Crypto().cipher, actual['cipher'])
        self.assertEqual(FAKE_IV, base64.b64decode(actual['iv']))

        # verify wrapped body key
        expected_wrapped_key = encrypt(body_key, object_key, FAKE_IV)
        self.assertEqual(expected_wrapped_key,
                         base64.b64decode(actual['body_key']['key']))
        self.assertEqual(FAKE_IV,
                         base64.b64decode(actual['body_key']['iv']))
        self.assertEqual(fetch_crypto_keys()['id'], actual['key_id'])

        # verify etag
        self.assertEqual(ciphertext_etag, req_hdrs['Etag'])

        encrypted_etag, _junk, etag_meta = \
            req_hdrs['X-Object-Sysmeta-Crypto-Etag'].partition('; swift_meta=')
        # verify crypto_meta was appended to this etag
        self.assertTrue(etag_meta)
        actual_meta = json.loads(urlparse.unquote_plus(etag_meta))
        self.assertEqual(Crypto().cipher, actual_meta['cipher'])

        # verify encrypted version of plaintext etag
        actual = base64.b64decode(encrypted_etag)
        etag_iv = base64.b64decode(actual_meta['iv'])
        enc_etag = encrypt(plaintext_etag, object_key, etag_iv)
        self.assertEqual(enc_etag, actual)

        # verify etag MAC for conditional requests
        actual_hmac = base64.b64decode(
            req_hdrs['X-Object-Sysmeta-Crypto-Etag-Mac'])
        self.assertEqual(actual_hmac, hmac.new(
            object_key, plaintext_etag, hashlib.sha256).digest())

        # verify encrypted etag for container update
        self.assertIn(
#.........这里部分代码省略.........
开发者ID:jgmerritt,项目名称:swift,代码行数:101,代码来源:test_encrypter.py


示例11: TestListingFormats

class TestListingFormats(unittest.TestCase):
    def setUp(self):
        """Wrap a FakeSwift in ListingFilter and prepare canned listings."""
        self.fake_swift = FakeSwift()
        self.app = listing_formats.ListingFilter(self.fake_swift)

        epoch = '1970-01-01T00:00:00.000000'
        # each listing holds one real entry and one subdir placeholder
        account_entries = [
            {'name': 'bar', 'bytes': 0, 'count': 0, 'last_modified': epoch},
            {'subdir': 'foo_'},
        ]
        container_entries = [
            {'name': 'bar', 'hash': 'etag', 'bytes': 0,
             'content_type': 'text/plain', 'last_modified': epoch},
            {'subdir': 'foo/'},
        ]
        self.fake_account_listing = json.dumps(account_entries)
        self.fake_container_listing = json.dumps(container_entries)

    def test_valid_account(self):
        """Account listings are rendered as text, JSON or XML depending on
        the requested format, while the backend request is expected to be
        ``?format=json`` in every case."""
        self.fake_swift.register('GET', '/v1/a', HTTPOk, {
            'Content-Length': str(len(self.fake_account_listing)),
            'Content-Type': 'application/json'}, self.fake_account_listing)

        # no format given: plain text is expected
        req = Request.blank('/v1/a')
        resp = req.get_response(self.app)
        self.assertEqual(resp.body, 'bar\nfoo_\n')
        self.assertEqual(resp.headers['Content-Type'],
                         'text/plain; charset=utf-8')
        self.assertEqual(self.fake_swift.calls[-1], (
            'GET', '/v1/a?format=json'))

        # format=txt: expected to match the no-format response
        req = Request.blank('/v1/a?format=txt')
        resp = req.get_response(self.app)
        self.assertEqual(resp.body, 'bar\nfoo_\n')
        self.assertEqual(resp.headers['Content-Type'],
                         'text/plain; charset=utf-8')
        self.assertEqual(self.fake_swift.calls[-1], (
            'GET', '/v1/a?format=json'))

        # format=json: the backend listing is expected back unchanged
        req = Request.blank('/v1/a?format=json')
        resp = req.get_response(self.app)
        self.assertEqual(resp.body, self.fake_account_listing)
        self.assertEqual(resp.headers['Content-Type'],
                         'application/json; charset=utf-8')
        self.assertEqual(self.fake_swift.calls[-1], (
            'GET', '/v1/a?format=json'))

        # format=xml: compared line by line
        req = Request.blank('/v1/a?format=xml')
        resp = req.get_response(self.app)
        self.assertEqual(resp.body.split('\n'), [
            '<?xml version="1.0" encoding="UTF-8"?>',
            '<account name="a">',
            '<container><name>bar</name><count>0</count><bytes>0</bytes>'
            '<last_modified>1970-01-01T00:00:00.000000</last_modified>'
            '</container>',
            '<subdir name="foo_" />',
            '</account>',
        ])
        self.assertEqual(resp.headers['Content-Type'],
                         'application/xml; charset=utf-8')
        self.assertEqual(self.fake_swift.calls[-1], (
            'GET', '/v1/a?format=json'))

    def test_valid_container(self):
        self.fake_swift.register('GET', '/v1/a/c', HTTPOk, {
            'Content-Length': str(len(self.fake_container_listing)),
            'Content-Type': 'application/json'}, self.fake_container_listing)

        req = Request.blank('/v1/a/c')
        resp = req.get_response(self.app)
        self.assertEqual(resp.body, 'bar\nfoo/\n')
        self.assertEqual(resp.headers['Content-Type'],
                         'text/plain; charset=utf-8')
        self.assertEqual(self.fake_swift.calls[-1], (
            'GET', '/v1/a/c?format=json'))

        req = Request.blank('/v1/a/c?format=txt')
        resp = req.get_response(self.app)
        self.assertEqual(resp.body, 'bar\nfoo/\n')
        self.assertEqual(resp.headers['Content-Type'],
                         'text/plain; charset=utf-8')
        self.assertEqual(self.fake_swift.calls[-1], (
            'GET', '/v1/a/c?format=json'))

        req = Request.blank('/v1/a/c?format=json')
        resp = req.get_response(self.app)
        self.assertEqual(resp.body, self.fake_container_listing)
        self.assertEqual(resp.headers['Content-Type'],
                         'application/json; charset=utf-8')
        self.assertEqual(self.fake_swift.calls[-1], (
            'GET', '/v1/a/c?format=json'))

        req = Request.blank('/v1/a/c?format=xml')
        resp = req.get_response(self.app)
        self.assertEqual(
            resp.body,
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            '<container name="c">'
            '<object><name>bar</name><hash>etag</hash><bytes>0</bytes>'
            '<content_type>text/plain</content_type>'
            '<last_modified>1970-01-01T00:00:00.000000</last_modified>'
            '</object>'
#.........这里部分代码省略.........
开发者ID:chenzhongtao,项目名称:swift,代码行数:101,代码来源:test_listing_formats.py


示例12: TestEncrypter

class TestEncrypter(unittest.TestCase):
    def setUp(self):
        """Build the encrypter under test on top of a fresh FakeSwift."""
        self.app = FakeSwift()
        encrypter_under_test = encrypter.Encrypter(self.app, {})
        encrypter_under_test.logger = FakeLogger()
        self.encrypter = encrypter_under_test

    def _verify_user_metadata(self, req_hdrs, name, value, key):
        """Assert that user metadata ``name`` reached the backend only in
        encrypted form, together with the shared crypto-meta header.

        :param req_hdrs: headers of the request seen by the backend.
        :param name: user metadata name (the part after ``X-Object-Meta-``).
        :param value: plaintext value that should have been encrypted.
        :param key: key the value is expected to be encrypted with.
        """
        # verify encrypted version of user metadata
        self.assertNotIn("X-Object-Meta-" + name, req_hdrs)
        expected_hdr = "X-Object-Transient-Sysmeta-Crypto-Meta-" + name
        self.assertIn(expected_hdr, req_hdrs)
        # header value is "<b64 ciphertext>; swift_meta=<quoted json>"
        enc_val, param = req_hdrs[expected_hdr].split(";")
        param = param.strip()
        self.assertTrue(param.startswith("swift_meta="))
        actual_meta = json.loads(urllib.unquote_plus(param[len("swift_meta=") :]))
        self.assertEqual(Crypto.cipher, actual_meta["cipher"])
        meta_iv = base64.b64decode(actual_meta["iv"])
        self.assertEqual(FAKE_IV, meta_iv)
        # re-encrypt the plaintext with the recovered IV and compare
        self.assertEqual(base64.b64encode(encrypt(value, key, meta_iv)), enc_val)
        # if there is any encrypted user metadata then this header should exist
        self.assertIn("X-Object-Transient-Sysmeta-Crypto-Meta", req_hdrs)
        common_meta = json.loads(urllib.unquote_plus(req_hdrs["X-Object-Transient-Sysmeta-Crypto-Meta"]))
        self.assertDictEqual({"cipher": Crypto.cipher, "key_id": {"v": "fake", "path": "/a/c/fake"}}, common_meta)

    def test_PUT_req(self):
        body_key = os.urandom(32)
        object_key = fetch_crypto_keys()["object"]
        plaintext = "FAKE APP"
        plaintext_etag = md5hex(plaintext)
        ciphertext = encrypt(plaintext, body_key, FAKE_IV)
        ciphertext_etag = md5hex(ciphertext)

        env = {"REQUEST_METHOD": "PUT", CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        hdrs = {
            "etag": plaintext_etag,
            "content-type": "text/plain",
            "content-length": str(len(plaintext)),
            "x-object-meta-etag": "not to be confused with the Etag!",
            "x-object-meta-test": "encrypt me",
            "x-object-sysmeta-test": "do not encrypt me",
        }
        req = Request.blank("/v1/a/c/o", environ=env, body=plaintext, headers=hdrs)
        self.app.register("PUT", "/v1/a/c/o", HTTPCreated, {})
        with mock.patch(
            "swift.common.middleware.crypto.crypto_utils." "Crypto.create_random_key", return_value=body_key
        ):
            resp = req.get_response(self.encrypter)
        self.assertEqual("201 Created", resp.status)
        self.assertEqual(plaintext_etag, resp.headers["Etag"])

        # verify metadata items
        self.assertEqual(1, len(self.app.calls), self.app.calls)
        self.assertEqual("PUT", self.app.calls[0][0])
        req_hdrs = self.app.headers[0]

        # verify body crypto meta
        actual = req_hdrs["X-Object-Sysmeta-Crypto-Body-Meta"]
        actual = json.loads(urllib.unquote_plus(actual))
        self.assertEqual(Crypto().cipher, actual["cipher"])
        self.assertEqual(FAKE_IV, base64.b64decode(actual["iv"]))

        # verify wrapped body key
        expected_wrapped_key = encrypt(body_key, object_key, FAKE_IV)
        self.assertEqual(expected_wrapped_key, base64.b64decode(actual["body_key"]["key"]))
        self.assertEqual(FAKE_IV, base64.b64decode(actual["body_key"]["iv"]))
        self.assertEqual(fetch_crypto_keys()["id"], actual["key_id"])

        # verify etag
        self.assertEqual(ciphertext_etag, req_hdrs["Etag"])

        encrypted_etag, _junk, etag_meta = req_hdrs["X-Object-Sysmeta-Crypto-Etag"].partition("; swift_meta=")
        # verify crypto_meta was appended to this etag
        self.assertTrue(etag_meta)
        actual_meta = json.loads(urllib.unquote_plus(etag_meta))
        self.assertEqual(Crypto().cipher, actual_meta["cipher"])

        # verify encrypted version of plaintext etag
        actual = base64.b64decode(encrypted_etag)
        etag_iv = base64.b64decode(actual_meta["iv"])
        enc_etag = encrypt(plaintext_etag, object_key, etag_iv)
        self.assertEqual(enc_etag, actual)

        # verify etag MAC for conditional requests
        actual_hmac = base64.b64decode(req_hdrs["X-Object-Sysmeta-Crypto-Etag-Mac"])
        self.assertEqual(actual_hmac, hmac.new(object_key, plaintext_etag, hashlib.sha256).digest())

        # verify encrypted etag for container update
        self.assertIn("X-Object-Sysmeta-Container-Update-Override-Etag", req_hdrs)
        parts = req_hdrs["X-Object-Sysmeta-Container-Update-Override-Etag"].rsplit(";", 1)
        self.assertEqual(2, len(parts))

        # extract crypto_meta from end of etag for container update
        param = parts[1].strip()
        crypto_meta_tag = "swift_meta="
        self.assertTrue(param.startswith(crypto_meta_tag), param)
        actual_meta = json.loads(urllib.unquote_plus(param[len(crypto_meta_tag) :]))
        self.assertEqual(Crypto().cipher, actual_meta["cipher"])
        self.assertEqual(fetch_crypto_keys()["id"], actual_meta["key_id"])

        cont_key = fetch_crypto_keys()["container"]
#.........这里部分代码省略.........
开发者ID:notmyname,项目名称:swift,代码行数:101,代码来源:test_encrypter.py


示例13: TestOioServerSideCopyMiddleware

class TestOioServerSideCopyMiddleware(TestServerSideCopyMiddleware):

    def setUp(self):
        """Wrap a FakeSwift backend in the copy middleware under test.

        The middleware is built with ``object_post_as_copy`` set to
        ``'yes'`` and reuses the fake backend's logger.
        """
        conf = {'object_post_as_copy': 'yes'}
        backend = FakeSwift()
        self.app = backend
        make_filter = copy.filter_factory(conf)
        self.ssc = make_filter(backend)
        self.ssc.logger = backend.logger

    def tearDown(self):
        """Intentionally do nothing.

        get_object_info() does not close the response iterator, so the
        unclosed_requests test has to be disabled here.
        """

    def test_basic_put_with_x_copy_from(self):
        """PUT with X-Copy-From answers 201 and reports the copy source.

        Registers a HEAD for the source and a PUT for the destination,
        sends the PUT through the middleware and checks: the status, the
        X-Copied-From response header, that exactly one request (the PUT)
        was authorized, the first recorded swift source, and that
        'swift.orig_req_method' was not added to the request environ.
        """
        self.app.register('HEAD', '/v1/a/c/o', swob.HTTPOk, {})
        self.app.register('PUT', '/v1/a/c/o2', swob.HTTPCreated, {})
        req = Request.blank('/v1/a/c/o2', environ={'REQUEST_METHOD': 'PUT'},
                            headers={'Content-Length': '0',
                                     'X-Copy-From': 'c/o'})
        status, headers, body = self.call_ssc(req)
        self.assertEqual(status, '201 Created')
        # assertIn shows the actual header list on failure, whereas
        # assertTrue(... in ...) only reports "False is not true".
        self.assertIn(('X-Copied-From', 'c/o'), headers)
        self.assertEqual(len(self.authorized), 1)
        self.assertEqual('PUT', self.authorized[0].method)
        self.assertEqual('/v1/a/c/o2', self.authorized[0].path)
        self.assertEqual(self.app.swift_sources[0], 'SSC')
        # For basic test cases, assert orig_req_method behavior
        self.assertNotIn('swift.orig_req_method', req.environ)

    def test_static_large_object_manifest(self):
        """Copy an SLO with ``multipart-manifest=get`` on the request.

        Expects three backend calls: a HEAD on the source, a GET on the
        source carrying ``format=raw`` and ``multipart-manifest=get``, and
        a PUT to the destination with ``multipart-manifest=put``. The PUT
        must not carry the source's X-Static-Large-Object or Etag headers,
        and only the GET and the PUT are expected to be authorized.
        """
        self.app.register('HEAD', '/v1/a/c/o', swob.HTTPOk,
                          {'X-Static-Large-Object': 'True',
                           'Etag': 'should not be sent'})
        self.app.register('GET', '/v1/a/c/o', swob.HTTPOk,
                          {'X-Static-Large-Object': 'True',
                           'Etag': 'should not be sent'}, 'passed')
        self.app.register('PUT', '/v1/a/c/o2?multipart-manifest=put',
                          swob.HTTPCreated, {})
        req = Request.blank('/v1/a/c/o2?multipart-manifest=get',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'Content-Length': '0',
                                     'X-Copy-From': 'c/o'})
        status, headers, body = self.call_ssc(req)
        self.assertEqual(status, '201 Created')
        # assertIn shows the actual header list on failure, whereas
        # assertTrue(... in ...) only reports "False is not true".
        self.assertIn(('X-Copied-From', 'c/o'), headers)
        self.assertEqual(3, len(self.app.calls))
        self.assertEqual('HEAD', self.app.calls[0][0])
        self.assertEqual('GET', self.app.calls[1][0])
        # Query parameter order is not fixed, so compare the parsed
        # parameters rather than the raw query string.
        get_path, qs = self.app.calls[1][1].split('?')
        params = urllib.parse.parse_qs(qs)
        self.assertDictEqual(
            {'format': ['raw'], 'multipart-manifest': ['get']}, params)
        self.assertEqual(get_path, '/v1/a/c/o')
        self.assertEqual(self.app.calls[2],
                         ('PUT', '/v1/a/c/o2?multipart-manifest=put'))
        # Index 2 is the PUT request (calls are HEAD, GET, PUT).
        req_headers = self.app.headers[2]
        self.assertNotIn('X-Static-Large-Object', req_headers)
        self.assertNotIn('Etag', req_headers)
        self.assertEqual(len(self.authorized), 2)
        self.assertEqual('GET', self.authorized[0].method)
        self.assertEqual('/v1/a/c/o', self.authorized[0].path)
        self.assertEqual('PUT', self.authorized[1].method)
        self.assertEqual('/v1/a/c/o2', self.authorized[1].path)

    def test_static_large_object(self):
        """Copy an SLO without any multipart-manifest query parameter.

        Expects the backend calls HEAD, GET (both on the source) and PUT
        (on the destination), in that order. The PUT must not carry the
        source's X-Static-Large-Object or Etag headers, and only the GET
        and the PUT are expected to be authorized.
        """
        # Compared to the original copy middleware, we do an extra HEAD request
        self.app.register('HEAD', '/v1/a/c/o', swob.HTTPOk,
                          {'X-Static-Large-Object': 'True',
                           'Etag': 'should not be sent'}, 'passed')
        self.app.register('GET', '/v1/a/c/o', swob.HTTPOk,
                          {'X-Static-Large-Object': 'True',
                           'Etag': 'should not be sent'}, 'passed')
        self.app.register('PUT', '/v1/a/c/o2',
                          swob.HTTPCreated, {})
        req = Request.blank('/v1/a/c/o2',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'Content-Length': '0',
                                     'X-Copy-From': 'c/o'})
        status, headers, body = self.call_ssc(req)
        self.assertEqual(status, '201 Created')
        # assertIn shows the actual header list on failure, whereas
        # assertTrue(... in ...) only reports "False is not true".
        self.assertIn(('X-Copied-From', 'c/o'), headers)
        self.assertEqual(self.app.calls, [
            ('HEAD', '/v1/a/c/o'),
            ('GET', '/v1/a/c/o'),
            ('PUT', '/v1/a/c/o2')])
        # The extra HEAD shifts the PUT to index 2; index 1 is the GET
        # request, for which the two checks below would pass trivially.
        req_headers = self.app.headers[2]
        self.assertNotIn('X-Static-Large-Object', req_headers)
        self.assertNotIn('Etag', req_headers)
        self.assertEqual(len(self.authorized), 2)
        self.assertEqual('GET', self.authorized[0].method)
        self.assertEqual('/v1/a/c/o', self.authorized[0].path)
        self.assertEqual('PUT', self.authorized[1].method)
        self.assertEqual('/v1/a/c/o2', self.authorized[1].path)

    def test_basic_put_with_x_copy_from_across_container(self):
        self.app.register('HEAD', '/v1/a/c1/o1', swob.HTTPOk, {})
        self.app.register('PUT', '/v1/a/c2/o2', swob.HTTPCreated, {})
        req = Request.blank('/v1/a/c2/o2', environ={'REQUEST_METHOD': 'PUT'},
                            headers={'Content-Length': '0',
#.........这里部分代码省略.........
开发者ID:fvennetier,项目名称:oio-swift,代码行数:101,代码来源:test_copy.py


示例14: TestUntarMetadata

class TestUntarMetadata(unittest.TestCase):
    """Check that pax xattr headers in an uploaded tarball become headers
    on the object PUTs issued by the bulk middleware."""

    def setUp(self):
        """Build the bulk middleware over FakeSwift and a scratch dir."""
        self.app = FakeSwift()
        self.bulk = bulk.filter_factory({})(self.app)
        self.testdir = mkdtemp(suffix='tmp_test_bulk')

    def tearDown(self):
        """Remove the scratch directory, ignoring removal errors."""
        rmtree(self.testdir, ignore_errors=True)

    def test_extract_metadata(self):
        """Upload a two-member pax tarball and inspect the PUT headers.

        obj1 carries SCHILY.xattr.* pax headers and obj2 carries
        LIBARCHIVE.xattr.* ones; the resulting PUTs are expected to have
        the matching Content-Type / X-Object-Meta-* headers.
        """
        self.app.register('HEAD', '/v1/a/c?extract-archive=tar',
                          HTTPNoContent, {}, None)
        self.app.register('PUT', '/v1/a/c/obj1?extract-archive=tar',
                          HTTPCreated, {}, None)
        self.app.register('PUT', '/v1/a/c/obj2?extract-archive=tar',
                          HTTPCreated, {}, None)

        # It's a real pain to instantiate TarInfo objects directly; they
        # really want to come from a file on disk or a tarball. So, we write
        # out some files and add pax headers to them as they get placed into
        # the tarball.
        with open(os.path.join(self.testdir, "obj1"), "w") as fh1:
            fh1.write("obj1 contents\n")
        with open(os.path.join(self.testdir, "obj2"), "w") as fh2:
            fh2.write("obj2 contents\n")

        tar_ball = StringIO()
        tar_file = tarfile.TarFile.open(fileobj=tar_ball, mode="w",
                                        format=tarfile.PAX_FORMAT)

        # With GNU tar 1.27.1 or later (possibly 1.27 as well), a file with
        # extended attribute user.thingy = dingy gets put into the tarfile
        # with pax_headers containing key/value pair
        # (SCHILY.xattr.user.thingy, dingy), both unicode strings (py2: type
        # unicode, not type str).
        #
        # With BSD tar (libarchive), you get key/value pair
        # (LIBARCHIVE.xattr.user.thingy, dingy), which strikes me as
        # gratuitous incompatibility.
        #
        # Still, we'll support uploads with both. Just heap more code on the
        # problem until you can forget it's under there.
        with open(os.path.join(self.testdir, "obj1")) as fh1:
            tar_info1 = tar_file.gettarinfo(fileobj=fh1,
                                            arcname="obj1")
            tar_info1.pax_headers[u'SCHILY.xattr.user.mime_type'] = \
                u'application/food-diary'
            tar_info1.pax_headers[u'SCHILY.xattr.user.meta.lunch'] = \
                u'sopa de albóndigas'
            tar_info1.pax_headers[
                u'SCHILY.xattr.user.meta.afternoon-snack'] = \
                u'gigantic bucket of coffee'
            tar_file.addfile(tar_info1, fh1)

        with open(os.path.join(self.testdir, "obj2")) as fh2:
            tar_info2 = tar_file.gettarinfo(fileobj=fh2,
                                            arcname="obj2")
            tar_info2.pax_headers[
                u'LIBARCHIVE.xattr.user.meta.muppet'] = u'bert'
            tar_info2.pax_headers[
                u'LIBARCHIVE.xattr.user.meta.cat'] = u'fluffy'
            tar_info2.pax_headers[
                u'LIBARCHIVE.xattr.user.notmeta'] = u'skipped'
            tar_file.addfile(tar_info2, fh2)

        # Rewind so the middleware reads the archive from the start.
        tar_ball.seek(0)

        req = Request.blank('/v1/a/c?extract-archive=tar')
        req.environ['REQUEST_METHOD'] = 'PUT'
        req.environ['wsgi.input'] = tar_ball
        req.headers['transfer-encoding'] = 'chunked'
        req.headers['accept'] = 'application/json;q=1.0'

        resp = req.get_response(self.bulk)
        self.assertEqual(resp.status_int, 200)

        # sanity check to make sure the upload worked
        upload_status = utils.json.loads(resp.body)
        self.assertEqual(upload_status['Number Files Created'], 2)

        # Recorded call 0 is skipped; calls 1 and 2 are inspected as the
        # PUTs for obj1 and obj2 respectively.
        put1_headers = HeaderKeyDict(self.app.calls_with_headers[1][2])
        self.assertEqual(
            put1_headers.get('Content-Type'),
            'application/food-diary')
        self.assertEqual(
            put1_headers.get('X-Object-Meta-Lunch'),
            'sopa de alb\xc3\xb3ndigas')
        self.assertEqual(
            put1_headers.get('X-Object-Meta-Afternoon-Snack'),
            'gigantic bucket of coffee')

        put2_headers = HeaderKeyDict(self.app.calls_with_headers[2][2])
        self.assertEqual(put2_headers.get('X-Object-Meta-Muppet'), 'bert')
        self.assertEqual(put2_headers.get('X-Object-Meta-Cat'), 'fluffy')
        self.assertIsNone(put2_headers.get('Content-Type'))
        self.assertIsNone(put2_headers.get('X-Object-Meta-Blah'))
开发者ID:LSBDOpenstackDev,项目名称:swift,代码行数:96,代码来源:test_bulk.py


示例15: setUp

 def setUp(self):
     """Build the bulk middleware over FakeSwift and a scratch dir."""
     backend = FakeSwift()
     self.app = backend
     make_filter = bulk.filter_factory({})
     self.bulk = make_filter(backend)
     self.testdir = mkdtemp(suffix='tmp_test_bulk')
开发者ID:LSBDOpenstackDev,项目名称:swift,代码行数:4,代码来源:test_bulk.py


示例16: ContainerQuotaCopyingTestCases

class ContainerQuotaCopyingTestCases(unittest.TestCase):

    def setUp(self):
        """Stack copy middleware over container quotas over FakeSwift."""
        backend = FakeSwift()
        self.app = backend
        quota_factory = container_quotas.filter_factory({})
        self.cq_filter = quota_factory(backend)
        copy_factory = copy.filter_factory({})
        self.copy_filter = copy_factory(self.cq_filter)

    def test_exceed_bytes_quota_copy_verb(self):
        """COPY is refused with 413 when the bytes quota is exceeded.

        The source GET advertises Content-Length 10 while the cached
        entry carries 'quota-bytes' of '2' with 0 bytes used.
        """
        cache = FakeCache({'bytes': 0, 'meta': {'quota-bytes': '2'}})
        self.app.register('GET', '/v1/a/c2/o2', HTTPOk,
                          {'Content-Length': '10'}, 'passed')

        req = Request.blank('/v1/a/c2/o2',
                            environ={'REQUEST_METHOD': 'COPY',
                                     'swift.cache': cache},
                            headers={'Destination': '/c/o'})
        res = req.get_response(self.copy_filter)
        self.assertEqual(res.status_int, 413)
        # A bytes literal equals the plain str literal on Python 2 and
        # also compares correctly if the body is bytes on Python 3.
        # NOTE(review): res.body is presumed to be bytes on py3 - confirm.
        self.assertEqual(res.body, b'Upload exceeds quota.')

    def test_not_exceed_bytes_quota_copy_verb(self):
        """COPY succeeds with 200 when the bytes quota is not exceeded.

        The source GET advertises Content-Length 10 while the cached
        entry carries 'quota-bytes' of '100' with 0 bytes used.
        """
        source_path = '/v1/a/c2/o2'
        self.app.register('GET', source_path, HTTPOk,
                          {'Content-Length': '10'}, 'passed')
        self.app.register('PUT', '/v1/a/c/o', HTTPOk, {}, 'passed')
        fake_cache = FakeCache(
            {'bytes': 0, 'meta': {'quota-bytes': '100'}})
        wsgi_env = {'REQUEST_METHOD': 'COPY', 'swift.cache': fake_cache}
        copy_req = Request.blank(source_path, environ=wsgi_env,
                                 headers={'Destination': '/c/o'})
        response = copy_req.get_response(self.copy_filter)
        self.assertEqual(response.status_int, 200)

    def test_exceed_counts_quota_copy_verb(self):
        """COPY is refused with 413 when the object-count quota is hit.

        The cached entry carries 'quota-count' of '1' with an
        'object_count' of 1 already recorded.
        """
        self.app.register('GET', '/v1/a/c2/o2', HTTPOk, {}, 'passed')
        cache = FakeCache({'object_count': 1, 'meta': {'quota-count': '1'}})
        req = Request.blank('/v1/a/c2/o2',
                            environ={'REQUEST_METHOD': 'COPY',
                                     'swift.cache': cache},
                            headers={'Destination': '/c/o'})
        res = req.get_response(self.copy_filter)
        self.assertEqual(res.status_int, 413)
        # A bytes literal equals the plain str literal on Python 2 and
        # also compares correctly if the body is bytes on Python 3.
        # NOTE(review): res.body is presumed to be bytes on py3 - confirm.
        self.assertEqual(res.body, b'Upload exceeds quota.')

    def test_exceed_counts_quota_copy_cross_account_verb(self):
        self.app.register('GET', '/v1/a/c2/o2', HTTPOk, {}, 'passed')
        a_c_cache = {'storage_policy': '0', 'meta': {'quota-count': '2'},
                     'status': 200, 'object_count': 1}
        a2_c_cache = {'storage_po 

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python descriptor.get_resource函数代码示例发布时间:2022-05-27
下一篇:
Python crypto_helpers.fetch_crypto_keys函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap