本文整理汇总了Python中test.unit.common.middleware.helpers.FakeSwift类的典型用法代码示例。如果您正苦于以下问题:Python FakeSwift类的具体用法?Python FakeSwift怎么用?Python FakeSwift使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了FakeSwift类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: do_test
def do_test(method, plain_etags, expected_plain_etags=None):
    """Pass a conditional request through the encrypter and check its etags.

    A request whose ``match_header_name`` header lists ``plain_etags`` is sent
    through ``encrypter.Encrypter`` wrapping a ``FakeSwift`` app, and the
    headers received by that app are asserted on.

    ``self`` and ``match_header_name`` are not parameters; they are resolved
    from the enclosing scope, which is not part of this block.

    :param method: HTTP method used for the request and registered response.
    :param plain_etags: list of etag strings, joined with ", " into the header.
    :param expected_plain_etags: optional list used in place of
        ``plain_etags`` when building the expected set of forwarded etags.
    """
    header_value = ", ".join(plain_etags)
    req = Request.blank(
        "/v1/a/c/o",
        environ={CRYPTO_KEY_CALLBACK: fetch_crypto_keys},
        method=method,
        headers={match_header_name: header_value},
    )
    app = FakeSwift()
    app.register(method, "/v1/a/c/o", HTTPOk, {})
    resp = req.get_response(encrypter.Encrypter(app, {}))
    self.assertEqual("200 OK", resp.status)
    self.assertEqual(1, len(app.calls), app.calls)
    self.assertEqual(method, app.calls[0][0])
    backend_headers = app.headers[0]

    # the alternate etag location is only checked for a non-empty,
    # non-wildcard header value
    if header_value not in ("", "*"):
        self.assertIn("X-Backend-Etag-Is-At", backend_headers)
        self.assertEqual(
            "X-Object-Sysmeta-Crypto-Etag-Mac",
            backend_headers["X-Backend-Etag-Is-At"])

    # the forwarded header must hold the plain etags plus masked variants
    self.assertIn(match_header_name, backend_headers)
    actual_etags = set(backend_headers[match_header_name].split(", "))
    object_key = fetch_crypto_keys()["object"]
    masked_etags = []
    for etag in plain_etags:
        if etag in ("*", ""):
            continue
        digest = hmac.new(
            object_key, etag.strip('"'), hashlib.sha256).digest()
        masked_etags.append('"%s"' % base64.b64encode(digest))
    expected_etags = set((expected_plain_etags or plain_etags) + masked_etags)
    self.assertEqual(expected_etags, actual_etags)

    # the caller's own request headers must be left as they were sent
    original_etags = set(req.headers[match_header_name].split(", "))
    self.assertEqual(set(plain_etags), original_etags)
开发者ID:notmyname,项目名称:swift,代码行数:33,代码来源:test_encrypter.py
示例2: setUp
def setUp(self):
    """Build a FakeSwift app wrapped by symlink, then versioned_writes."""
    self.app = FakeSwift()
    symlink_factory = symlink.filter_factory({'symloop_max': '2'})
    self.sym = symlink_factory(self.app)
    # share the fake app's logger with the symlink middleware
    self.sym.logger = self.app.logger
    vw_factory = versioned_writes.filter_factory(
        {'allow_versioned_writes': 'true'})
    self.vw = vw_factory(self.sym)
开发者ID:chenzhongtao,项目名称:swift,代码行数:7,代码来源:test_symlink.py
示例3: do_test
def do_test(method, plain_etags, expected_plain_etags=None):
    """Pass a conditional request through the encrypter and check its etags.

    The forwarded ``match_header_name`` header is expected to contain the
    plain etags plus masked variants computed with two object keys: the one
    returned by ``fetch_crypto_keys()`` and the one returned for
    ``key_id={'secret_id': 'myid'}``.

    ``self`` and ``match_header_name`` are not parameters; they are resolved
    from the enclosing scope, which is not part of this block.

    :param method: HTTP method used for the request and registered response.
    :param plain_etags: list of etag strings, joined with ', ' into the header.
    :param expected_plain_etags: optional list used in place of
        ``plain_etags`` when building the expected set of forwarded etags.
    """
    def masked_with(object_key):
        # quoted, base64-encoded HMAC-SHA256 of every etag except '*' and ''
        return [
            '"%s"' % bytes_to_wsgi(base64.b64encode(hmac.new(
                object_key, wsgi_to_bytes(etag.strip('"')),
                hashlib.sha256).digest()))
            for etag in plain_etags if etag not in ('*', '')]

    header_value = ', '.join(plain_etags)
    req = Request.blank(
        '/v1/a/c/o',
        environ={CRYPTO_KEY_CALLBACK: fetch_crypto_keys},
        method=method,
        headers={match_header_name: header_value})
    app = FakeSwift()
    app.register(method, '/v1/a/c/o', HTTPOk, {})
    resp = req.get_response(encrypter.Encrypter(app, {}))
    self.assertEqual('200 OK', resp.status)
    self.assertEqual(1, len(app.calls), app.calls)
    self.assertEqual(method, app.calls[0][0])
    backend_headers = app.headers[0]

    # the alternate etag location is only checked for a non-empty,
    # non-wildcard header value
    if header_value not in ('', '*'):
        self.assertIn('X-Backend-Etag-Is-At', backend_headers)
        self.assertEqual('X-Object-Sysmeta-Crypto-Etag-Mac',
                         backend_headers['X-Backend-Etag-Is-At'])

    # the forwarded header must hold the plain etags plus masked variants
    self.assertIn(match_header_name, backend_headers)
    actual_etags = set(backend_headers[match_header_name].split(', '))
    # masked values for secret_id None
    masked_etags = masked_with(fetch_crypto_keys()['object'])
    # masked values for secret_id myid
    masked_etags_myid = masked_with(
        fetch_crypto_keys(key_id={'secret_id': 'myid'})['object'])
    expected_etags = set((expected_plain_etags or plain_etags) +
                         masked_etags + masked_etags_myid)
    self.assertEqual(expected_etags, actual_etags)

    # the caller's own request headers must be left as they were sent
    original_etags = set(req.headers[match_header_name].split(', '))
    self.assertEqual(set(plain_etags), original_etags)
开发者ID:mahak,项目名称:swift,代码行数:44,代码来源:test_encrypter.py
示例4: setUp
def setUp(self):
    """Build a FakeSwift app wrapped by the container_hierarchy filter."""
    self.app = FakeSwift()
    self.filter_conf = {
        'strip_v1': 'true',
        'swift3_compat': 'true',
        'account_first': 'true'
    }
    global_conf = {'sds_default_account': 'OPENIO'}
    # filter_conf entries are passed to the factory as keyword arguments
    make_filter = container_hierarchy.filter_factory(
        global_conf, **self.filter_conf)
    self.ch = make_filter(self.app)
开发者ID:fvennetier,项目名称:oio-swift,代码行数:11,代码来源:test_container_hierarchy.py
示例5: setUp
def setUp(self):
    """Wrap a FakeSwift app in ListingFilter and prepare JSON listings."""
    self.fake_swift = FakeSwift()
    self.app = listing_formats.ListingFilter(self.fake_swift)

    # one container entry and one subdir entry
    account_entries = [
        {'name': 'bar', 'bytes': 0, 'count': 0,
         'last_modified': '1970-01-01T00:00:00.000000'},
        {'subdir': 'foo_'},
    ]
    self.fake_account_listing = json.dumps(account_entries)

    # one object entry and one subdir entry
    container_entries = [
        {'name': 'bar', 'hash': 'etag', 'bytes': 0,
         'content_type': 'text/plain',
         'last_modified': '1970-01-01T00:00:00.000000'},
        {'subdir': 'foo/'},
    ]
    self.fake_container_listing = json.dumps(container_entries)
开发者ID:chenzhongtao,项目名称:swift,代码行数:14,代码来源:test_listing_formats.py
示例6: setUp
def setUp(self):
    """Build a FakeSwift app wrapped by the regexcontainer filter.

    ``stop_at_first_match`` is set to 'false' when ``ContainerBuilder`` has
    an ``alternatives`` attribute, and to 'true' otherwise.
    """
    if hasattr(ContainerBuilder, 'alternatives'):
        stop_at_first_match = 'false'
    else:
        stop_at_first_match = 'true'
    self.filter_conf = {
        'strip_v1': 'true',
        'swift3_compat': 'true',
        'account_first': 'true',
        'stop_at_first_match': stop_at_first_match,
        'pattern1': r'(\d{3})/(\d{3})/(\d)\d\d/\d\d(\d)/',
        'pattern2': r'(\d{3})/(\d)\d\d/\d\d(\d)/',
        'pattern3': r'^(cloud)/([0-9a-f][0-9a-f])',
        'pattern4': r'^(cloud)/([0-9a-f])',
        'pattern9': r'^/?([^/]+)',
    }
    self.app = FakeSwift()
    global_conf = {'sds_default_account': 'OPENIO'}
    make_filter = regexcontainer.filter_factory(
        global_conf, **self.filter_conf)
    self.ch = make_filter(self.app)
开发者ID:fvennetier,项目名称:oio-swift,代码行数:21,代码来源:test_regexcontainer.py
示例7: setUp
def setUp(self):
    """Build a FakeSwift app wrapped by the versioned_writes filter."""
    self.app = FakeSwift()
    make_filter = versioned_writes.filter_factory(
        {'allow_versioned_writes': 'true'})
    self.vw = make_filter(self.app)
开发者ID:Ahiknsr,项目名称:swift,代码行数:4,代码来源:test_versioned_writes.py
示例8: VersionedWritesTestCase
class VersionedWritesTestCase(unittest.TestCase):
def setUp(self):
    """Build a FakeSwift app wrapped by the versioned_writes filter."""
    self.app = FakeSwift()
    vw_factory = versioned_writes.filter_factory(
        {'allow_versioned_writes': 'true'})
    self.vw = vw_factory(self.app)
def call_app(self, req, app=None, expect_exception=False):
    """Run ``req`` through a WSGI ``app`` and collect the response.

    Resets ``self.authorized`` and, unless the request environ already has a
    'swift.authorize' entry, installs a callback that records every request
    it is called with in that list.

    :param req: request object; its ``environ`` and ``headers`` are used.
    :param app: WSGI callable to invoke; defaults to ``self.app``.
    :param expect_exception: when true, an exception raised while iterating
        the body is captured and returned instead of propagating.
    :returns: ``(status, headers, body)``, plus the caught exception (or
        ``None``) as a fourth item when ``expect_exception`` is true.
    :raises Exception: whatever body iteration raises, when
        ``expect_exception`` is false.
    """
    if app is None:
        app = self.app

    self.authorized = []

    def authorize(req):
        self.authorized.append(req)

    if 'swift.authorize' not in req.environ:
        req.environ['swift.authorize'] = authorize

    req.headers.setdefault("User-Agent", "Marula Kruger")

    # one-element lists let start_response write into the enclosing scope
    status = [None]
    headers = [None]

    def start_response(s, h, ei=None):
        status[0] = s
        headers[0] = h

    body_iter = app(req.environ, start_response)
    # collect chunks and join once; anything read before a failure is kept
    chunks = []
    caught_exc = None
    try:
        for chunk in body_iter:
            chunks.append(chunk)
    except Exception as exc:
        if expect_exception:
            caught_exc = exc
        else:
            raise
    finally:
        # PEP 3333: the caller must close the response iterable if it can be
        close = getattr(body_iter, 'close', None)
        if close is not None:
            close()
    body = ''.join(chunks)

    if expect_exception:
        return status[0], headers[0], body, caught_exc
    return status[0], headers[0], body
def call_vw(self, req, **kwargs):
    """Call ``call_app`` with ``self.vw`` as the app, forwarding ``kwargs``."""
    middleware = self.vw
    return self.call_app(req, app=middleware, **kwargs)
def assertRequestEqual(self, req, other):
    """Assert that ``req`` and ``other`` have equal ``method`` and ``path``."""
    for attr in ('method', 'path'):
        self.assertEqual(getattr(req, attr), getattr(other, attr))
def test_put_container(self):
    """PUT a container with X-Versions-Location through versioned_writes.

    Checks the response status, that the first backend call is the PUT with
    an 'x-container-sysmeta-versions-location' header, and that the request
    was authorized exactly once.
    """
    self.app.register('PUT', '/v1/a/c', swob.HTTPOk, {}, 'passed')
    req = Request.blank('/v1/a/c',
                        headers={'X-Versions-Location': 'ver_cont'},
                        environ={'REQUEST_METHOD': 'PUT'})
    status, headers, body = self.call_vw(req)
    self.assertEqual(status, '200 OK')

    # check for sysmeta header
    calls = self.app.calls_with_headers
    method, path, req_headers = calls[0]
    self.assertEqual('PUT', method)
    self.assertEqual('/v1/a/c', path)
    # assertIn reports the container contents on failure, unlike
    # assertTrue(x in y)
    self.assertIn('x-container-sysmeta-versions-location', req_headers)
    self.assertEqual(len(self.authorized), 1)
    self.assertRequestEqual(req, self.authorized[0])
def test_container_allow_versioned_writes_false(self):
    """Check container requests when allow_versioned_writes is 'false'.

    PUT and POST with X-Versions-Location must be rejected with 412, while
    GET and HEAD must still return 200.
    """
    self.vw.conf = {'allow_versioned_writes': 'false'}

    def status_for(verb):
        # send a container request carrying X-Versions-Location
        req = Request.blank('/v1/a/c',
                            headers={'X-Versions-Location': 'ver_cont'},
                            environ={'REQUEST_METHOD': verb})
        status, _headers, _body = self.call_vw(req)
        return status

    # PUT/POST container must fail as 412 when allow_versioned_writes
    # set to false
    for verb in ('PUT', 'POST'):
        self.assertEqual(status_for(verb), "412 Precondition Failed")

    # GET/HEAD performs as normal
    self.app.register('GET', '/v1/a/c', swob.HTTPOk, {}, 'passed')
    self.app.register('HEAD', '/v1/a/c', swob.HTTPOk, {}, 'passed')
    for verb in ('GET', 'HEAD'):
        self.assertEqual(status_for(verb), '200 OK')
def test_remove_versions_location(self):
self.app.register('POST', '/v1/a/c', swob.HTTPOk, {}, 'passed')
req = Request.blank('/v1/a/c',
headers={'X-Remove-Versions-Location': 'x'},
environ={'REQUEST_METHOD': 'POST'})
status, headers, body = self.call_vw(req)
self.assertEqual(status, '200 OK')
# check for sysmeta header
#.........这里部分代码省略.........
开发者ID:Ahiknsr,项目名称:swift,代码行数:101,代码来源:test_versioned_writes.py
示例9: TestDecrypterObjectRequests
class TestDecrypterObjectRequests(unittest.TestCase):
def setUp(self):
    """Build a Decrypter around a FakeSwift app and give it a FakeLogger."""
    fake_app = FakeSwift()
    self.app = fake_app
    self.decrypter = decrypter.Decrypter(fake_app, {})
    self.decrypter.logger = FakeLogger()
def _make_response_headers(self, content_length, plaintext_etag, keys,
                           body_key):
    """Make a typical set of response headers for a GET or HEAD request.

    :param content_length: value for the 'content-length' header.
    :param plaintext_etag: etag that is encrypted with the object key into
        'X-Object-Sysmeta-Crypto-Etag'.
    :param keys: mapping providing the 'container' and 'object' keys.
    :param body_key: key that is wrapped with the object key and placed in
        the body crypto meta.
    :returns: a ``HeaderKeyDict`` of response headers.
    """
    cont_key = keys['container']
    object_key = keys['object']
    wrapped_body_key = {'key': encrypt(body_key, object_key, FAKE_IV),
                        'iv': FAKE_IV}
    body_crypto_meta = fake_get_crypto_meta(body_key=wrapped_body_key)

    # values are computed in the same order as the header entries below
    crypto_etag = '%s; swift_meta=%s' % (
        base64.b64encode(encrypt(plaintext_etag, object_key, FAKE_IV)),
        get_crypto_meta_header())
    body_meta = get_crypto_meta_header(body_crypto_meta)
    encrypted_user_meta = (
        base64.b64encode(encrypt('encrypt me', object_key, FAKE_IV)) +
        ';swift_meta=' + get_crypto_meta_header())
    override_etag = encrypt_and_append_meta('encrypt me, too', cont_key)

    return HeaderKeyDict({
        'Etag': 'hashOfCiphertext',
        'content-type': 'text/plain',
        'content-length': content_length,
        'X-Object-Sysmeta-Crypto-Etag': crypto_etag,
        'X-Object-Sysmeta-Crypto-Body-Meta': body_meta,
        'x-object-transient-sysmeta-crypto-meta-test': encrypted_user_meta,
        'x-object-sysmeta-container-update-override-etag': override_etag,
        'x-object-sysmeta-test': 'do not encrypt me',
    })
def _test_request_success(self, method, body):
    """Fetch an encrypted object through the decrypter and check headers.

    :param method: HTTP method for the request and the registered response.
    :param body: plaintext that is encrypted and registered as the backend
        response body.
    :returns: the response obtained through ``self.decrypter``.
    """
    req = Request.blank(
        '/v1/a/c/o',
        environ={'REQUEST_METHOD': method,
                 CRYPTO_KEY_CALLBACK: fetch_crypto_keys})
    plaintext_etag = md5hex(body)
    body_key = os.urandom(32)
    enc_body = encrypt(body, body_key, FAKE_IV)
    hdrs = self._make_response_headers(
        len(enc_body), plaintext_etag, fetch_crypto_keys(), body_key)

    # there shouldn't be any x-object-meta- headers, but if there are
    # then the decrypted header will win where there is a name clash...
    hdrs.update({
        'x-object-meta-test': 'unexpected, overwritten by decrypted value',
        'x-object-meta-distinct': 'unexpected but distinct from encrypted'
    })
    self.app.register(
        method, '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
    resp = req.get_response(self.decrypter)
    self.assertEqual('200 OK', resp.status)

    # (expected value, response header name), checked in this order
    expected_headers = (
        (plaintext_etag, 'Etag'),
        ('text/plain', 'Content-Type'),
        ('encrypt me', 'x-object-meta-test'),
        ('unexpected but distinct from encrypted', 'x-object-meta-distinct'),
        ('do not encrypt me', 'x-object-sysmeta-test'),
        ('encrypt me, too',
         'X-Object-Sysmeta-Container-Update-Override-Etag'),
    )
    for expected, name in expected_headers:
        self.assertEqual(expected, resp.headers[name])

    # the crypto sysmeta must not be present in the response
    for name in ('X-Object-Sysmeta-Crypto-Body-Meta',
                 'X-Object-Sysmeta-Crypto-Etag'):
        self.assertNotIn(name, resp.headers)
    return resp
def test_GET_success(self):
body = 'FAKE APP'
resp = self._test_request_success('GET', body)
self.assertEqual(body, resp.body)
def test_HEAD_success(self):
body = 'FAKE APP'
resp = self._test_request_success('HEAD', body)
self.assertEqual('', resp.body)
def test_headers_case(self):
body = 'fAkE ApP'
req = Request.blank('/v1/a/c/o', body='FaKe')
req.environ[CRYPTO_KEY_CALLBACK] = fetch_crypto_keys
plaintext_etag = md5hex(body)
body_key = os.urandom(32)
enc_body = encrypt(body, body_key, FAKE_IV)
hdrs = self._make_response_headers(
len(enc_body), plaintext_etag, fetch_crypto_keys(), body_key)
hdrs.update({
'x-Object-mEta-ignoRes-caSe': 'thIs pArt WilL bE cOol',
})
self.app.register(
'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
status, headers, app_iter = req.call_application(self.decrypter)
self.assertEqual(status, '200 OK')
expected = {
'Etag': '7f7837924188f7b511a9e3881a9f77a8',
'X-Object-Sysmeta-Container-Update-Override-Etag':
'encrypt me, too',
'X-Object-Meta-Test': 'encrypt me',
'Content-Length': '8',
#.........这里部分代码省略.........
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:101,代码来源:test_decrypter.py
示例10: TestEncrypter
class TestEncrypter(unittest.TestCase):
def setUp(self):
    """Build an Encrypter around a FakeSwift app and give it a FakeLogger."""
    fake_app = FakeSwift()
    self.app = fake_app
    self.encrypter = encrypter.Encrypter(fake_app, {})
    self.encrypter.logger = FakeLogger()
def _verify_user_metadata(self, req_hdrs, name, value, key):
    """Verify that user metadata ``name`` was sent only in encrypted form.

    :param req_hdrs: request headers to inspect.
    :param name: metadata name, i.e. the suffix after 'X-Object-Meta-'.
    :param value: plaintext value expected to have been encrypted.
    :param key: key used to compute the expected ciphertext.
    """
    meta_tag = 'swift_meta='

    # the plaintext header must be gone and the encrypted one present
    self.assertNotIn('X-Object-Meta-' + name, req_hdrs)
    crypto_hdr = 'X-Object-Transient-Sysmeta-Crypto-Meta-' + name
    self.assertIn(crypto_hdr, req_hdrs)

    # header value has the form "<encrypted value>;swift_meta=<quoted json>"
    enc_val, param = req_hdrs[crypto_hdr].split(';')
    param = param.strip()
    self.assertTrue(param.startswith(meta_tag))
    quoted_meta = param[len(meta_tag):]
    actual_meta = json.loads(urlparse.unquote_plus(quoted_meta))
    self.assertEqual(Crypto.cipher, actual_meta['cipher'])
    meta_iv = base64.b64decode(actual_meta['iv'])
    self.assertEqual(FAKE_IV, meta_iv)
    expected_enc_val = base64.b64encode(encrypt(value, key, meta_iv))
    self.assertEqual(expected_enc_val, enc_val)

    # if there is any encrypted user metadata then this header should exist
    self.assertIn('X-Object-Transient-Sysmeta-Crypto-Meta', req_hdrs)
    quoted_common = req_hdrs['X-Object-Transient-Sysmeta-Crypto-Meta']
    common_meta = json.loads(urlparse.unquote_plus(quoted_common))
    self.assertDictEqual({'cipher': Crypto.cipher,
                          'key_id': {'v': 'fake', 'path': '/a/c/fake'}},
                         common_meta)
def test_PUT_req(self):
body_key = os.urandom(32)
object_key = fetch_crypto_keys()['object']
plaintext = 'FAKE APP'
plaintext_etag = md5hex(plaintext)
ciphertext = encrypt(plaintext, body_key, FAKE_IV)
ciphertext_etag = md5hex(ciphertext)
env = {'REQUEST_METHOD': 'PUT',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
hdrs = {'etag': plaintext_etag,
'content-type': 'text/plain',
'content-length': str(len(plaintext)),
'x-object-meta-etag': 'not to be confused with the Etag!',
'x-object-meta-test': 'encrypt me',
'x-object-sysmeta-test': 'do not encrypt me'}
req = Request.blank(
'/v1/a/c/o', environ=env, body=plaintext, headers=hdrs)
self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
with mock.patch(
'swift.common.middleware.crypto.crypto_utils.'
'Crypto.create_random_key',
return_value=body_key):
resp = req.get_response(self.encrypter)
self.assertEqual('201 Created', resp.status)
self.assertEqual(plaintext_etag, resp.headers['Etag'])
# verify metadata items
self.assertEqual(1, len(self.app.calls), self.app.calls)
self.assertEqual('PUT', self.app.calls[0][0])
req_hdrs = self.app.headers[0]
# verify body crypto meta
actual = req_hdrs['X-Object-Sysmeta-Crypto-Body-Meta']
actual = json.loads(urlparse.unquote_plus(actual))
self.assertEqual(Crypto().cipher, actual['cipher'])
self.assertEqual(FAKE_IV, base64.b64decode(actual['iv']))
# verify wrapped body key
expected_wrapped_key = encrypt(body_key, object_key, FAKE_IV)
self.assertEqual(expected_wrapped_key,
base64.b64decode(actual['body_key']['key']))
self.assertEqual(FAKE_IV,
base64.b64decode(actual['body_key']['iv']))
self.assertEqual(fetch_crypto_keys()['id'], actual['key_id'])
# verify etag
self.assertEqual(ciphertext_etag, req_hdrs['Etag'])
encrypted_etag, _junk, etag_meta = \
req_hdrs['X-Object-Sysmeta-Crypto-Etag'].partition('; swift_meta=')
# verify crypto_meta was appended to this etag
self.assertTrue(etag_meta)
actual_meta = json.loads(urlparse.unquote_plus(etag_meta))
self.assertEqual(Crypto().cipher, actual_meta['cipher'])
# verify encrypted version of plaintext etag
actual = base64.b64decode(encrypted_etag)
etag_iv = base64.b64decode(actual_meta['iv'])
enc_etag = encrypt(plaintext_etag, object_key, etag_iv)
self.assertEqual(enc_etag, actual)
# verify etag MAC for conditional requests
actual_hmac = base64.b64decode(
req_hdrs['X-Object-Sysmeta-Crypto-Etag-Mac'])
self.assertEqual(actual_hmac, hmac.new(
object_key, plaintext_etag, hashlib.sha256).digest())
# verify encrypted etag for container update
self.assertIn(
#.........这里部分代码省略.........
开发者ID:jgmerritt,项目名称:swift,代码行数:101,代码来源:test_encrypter.py
示例11: TestListingFormats
class TestListingFormats(unittest.TestCase):
def setUp(self):
    """Wrap a FakeSwift app in ListingFilter and prepare JSON listings."""
    self.fake_swift = FakeSwift()
    self.app = listing_formats.ListingFilter(self.fake_swift)

    # account listing: one container entry and one subdir entry
    account_rows = [
        {'name': 'bar', 'bytes': 0, 'count': 0,
         'last_modified': '1970-01-01T00:00:00.000000'},
        {'subdir': 'foo_'},
    ]
    self.fake_account_listing = json.dumps(account_rows)

    # container listing: one object entry and one subdir entry
    container_rows = [
        {'name': 'bar', 'hash': 'etag', 'bytes': 0,
         'content_type': 'text/plain',
         'last_modified': '1970-01-01T00:00:00.000000'},
        {'subdir': 'foo/'},
    ]
    self.fake_container_listing = json.dumps(container_rows)
def test_valid_account(self):
    """Request an account listing as default, txt, json and xml.

    For every format the response body and Content-Type are checked, and the
    last backend call is asserted to be a GET of '/v1/a?format=json'.
    """
    listing = self.fake_account_listing
    self.fake_swift.register('GET', '/v1/a', HTTPOk, {
        'Content-Length': str(len(listing)),
        'Content-Type': 'application/json'}, listing)

    def fetch(path):
        return Request.blank(path).get_response(self.app)

    def check_backend_call():
        self.assertEqual(self.fake_swift.calls[-1], (
            'GET', '/v1/a?format=json'))

    # (request path, expected body, expected Content-Type)
    plain_cases = (
        ('/v1/a', 'bar\nfoo_\n', 'text/plain; charset=utf-8'),
        ('/v1/a?format=txt', 'bar\nfoo_\n', 'text/plain; charset=utf-8'),
        ('/v1/a?format=json', listing, 'application/json; charset=utf-8'),
    )
    for path, expected_body, expected_type in plain_cases:
        resp = fetch(path)
        self.assertEqual(resp.body, expected_body)
        self.assertEqual(resp.headers['Content-Type'], expected_type)
        check_backend_call()

    # the xml body is compared line by line
    resp = fetch('/v1/a?format=xml')
    expected_lines = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<account name="a">',
        '<container><name>bar</name><count>0</count><bytes>0</bytes>'
        '<last_modified>1970-01-01T00:00:00.000000</last_modified>'
        '</container>',
        '<subdir name="foo_" />',
        '</account>',
    ]
    self.assertEqual(resp.body.split('\n'), expected_lines)
    self.assertEqual(resp.headers['Content-Type'],
                     'application/xml; charset=utf-8')
    check_backend_call()
def test_valid_container(self):
self.fake_swift.register('GET', '/v1/a/c', HTTPOk, {
'Content-Length': str(len(self.fake_container_listing)),
'Content-Type': 'application/json'}, self.fake_container_listing)
req = Request.blank('/v1/a/c')
resp = req.get_response(self.app)
self.assertEqual(resp.body, 'bar\nfoo/\n')
self.assertEqual(resp.headers['Content-Type'],
'text/plain; charset=utf-8')
self.assertEqual(self.fake_swift.calls[-1], (
'GET', '/v1/a/c?format=json'))
req = Request.blank('/v1/a/c?format=txt')
resp = req.get_response(self.app)
self.assertEqual(resp.body, 'bar\nfoo/\n')
self.assertEqual(resp.headers['Content-Type'],
'text/plain; charset=utf-8')
self.assertEqual(self.fake_swift.calls[-1], (
'GET', '/v1/a/c?format=json'))
req = Request.blank('/v1/a/c?format=json')
resp = req.get_response(self.app)
self.assertEqual(resp.body, self.fake_container_listing)
self.assertEqual(resp.headers['Content-Type'],
'application/json; charset=utf-8')
self.assertEqual(self.fake_swift.calls[-1], (
'GET', '/v1/a/c?format=json'))
req = Request.blank('/v1/a/c?format=xml')
resp = req.get_response(self.app)
self.assertEqual(
resp.body,
'<?xml version="1.0" encoding="UTF-8"?>\n'
'<container name="c">'
'<object><name>bar</name><hash>etag</hash><bytes>0</bytes>'
'<content_type>text/plain</content_type>'
'<last_modified>1970-01-01T00:00:00.000000</last_modified>'
'</object>'
#.........这里部分代码省略.........
开发者ID:chenzhongtao,项目名称:swift,代码行数:101,代码来源:test_listing_formats.py
示例12: TestEncrypter
class TestEncrypter(unittest.TestCase):
def setUp(self):
    """Build an Encrypter around a FakeSwift app and give it a FakeLogger."""
    backend = FakeSwift()
    self.app = backend
    self.encrypter = encrypter.Encrypter(backend, {})
    self.encrypter.logger = FakeLogger()
def _verify_user_metadata(self, req_hdrs, name, value, key):
    """Verify that user metadata ``name`` was sent only in encrypted form.

    :param req_hdrs: request headers to inspect.
    :param name: metadata name, i.e. the suffix after "X-Object-Meta-".
    :param value: plaintext value expected to have been encrypted.
    :param key: key used to compute the expected ciphertext.
    """
    meta_tag = "swift_meta="

    # the plaintext header must be gone and the encrypted one present
    self.assertNotIn("X-Object-Meta-" + name, req_hdrs)
    crypto_hdr = "X-Object-Transient-Sysmeta-Crypto-Meta-" + name
    self.assertIn(crypto_hdr, req_hdrs)

    # header value has the form "<encrypted value>;swift_meta=<quoted json>"
    enc_val, param = req_hdrs[crypto_hdr].split(";")
    param = param.strip()
    self.assertTrue(param.startswith(meta_tag))
    quoted_meta = param[len(meta_tag):]
    actual_meta = json.loads(urllib.unquote_plus(quoted_meta))
    self.assertEqual(Crypto.cipher, actual_meta["cipher"])
    meta_iv = base64.b64decode(actual_meta["iv"])
    self.assertEqual(FAKE_IV, meta_iv)
    expected_enc_val = base64.b64encode(encrypt(value, key, meta_iv))
    self.assertEqual(expected_enc_val, enc_val)

    # if there is any encrypted user metadata then this header should exist
    self.assertIn("X-Object-Transient-Sysmeta-Crypto-Meta", req_hdrs)
    quoted_common = req_hdrs["X-Object-Transient-Sysmeta-Crypto-Meta"]
    common_meta = json.loads(urllib.unquote_plus(quoted_common))
    self.assertDictEqual(
        {"cipher": Crypto.cipher,
         "key_id": {"v": "fake", "path": "/a/c/fake"}},
        common_meta)
def test_PUT_req(self):
body_key = os.urandom(32)
object_key = fetch_crypto_keys()["object"]
plaintext = "FAKE APP"
plaintext_etag = md5hex(plaintext)
ciphertext = encrypt(plaintext, body_key, FAKE_IV)
ciphertext_etag = md5hex(ciphertext)
env = {"REQUEST_METHOD": "PUT", CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
hdrs = {
"etag": plaintext_etag,
"content-type": "text/plain",
"content-length": str(len(plaintext)),
"x-object-meta-etag": "not to be confused with the Etag!",
"x-object-meta-test": "encrypt me",
"x-object-sysmeta-test": "do not encrypt me",
}
req = Request.blank("/v1/a/c/o", environ=env, body=plaintext, headers=hdrs)
self.app.register("PUT", "/v1/a/c/o", HTTPCreated, {})
with mock.patch(
"swift.common.middleware.crypto.crypto_utils." "Crypto.create_random_key", return_value=body_key
):
resp = req.get_response(self.encrypter)
self.assertEqual("201 Created", resp.status)
self.assertEqual(plaintext_etag, resp.headers["Etag"])
# verify metadata items
self.assertEqual(1, len(self.app.calls), self.app.calls)
self.assertEqual("PUT", self.app.calls[0][0])
req_hdrs = self.app.headers[0]
# verify body crypto meta
actual = req_hdrs["X-Object-Sysmeta-Crypto-Body-Meta"]
actual = json.loads(urllib.unquote_plus(actual))
self.assertEqual(Crypto().cipher, actual["cipher"])
self.assertEqual(FAKE_IV, base64.b64decode(actual["iv"]))
# verify wrapped body key
expected_wrapped_key = encrypt(body_key, object_key, FAKE_IV)
self.assertEqual(expected_wrapped_key, base64.b64decode(actual["body_key"]["key"]))
self.assertEqual(FAKE_IV, base64.b64decode(actual["body_key"]["iv"]))
self.assertEqual(fetch_crypto_keys()["id"], actual["key_id"])
# verify etag
self.assertEqual(ciphertext_etag, req_hdrs["Etag"])
encrypted_etag, _junk, etag_meta = req_hdrs["X-Object-Sysmeta-Crypto-Etag"].partition("; swift_meta=")
# verify crypto_meta was appended to this etag
self.assertTrue(etag_meta)
actual_meta = json.loads(urllib.unquote_plus(etag_meta))
self.assertEqual(Crypto().cipher, actual_meta["cipher"])
# verify encrypted version of plaintext etag
actual = base64.b64decode(encrypted_etag)
etag_iv = base64.b64decode(actual_meta["iv"])
enc_etag = encrypt(plaintext_etag, object_key, etag_iv)
self.assertEqual(enc_etag, actual)
# verify etag MAC for conditional requests
actual_hmac = base64.b64decode(req_hdrs["X-Object-Sysmeta-Crypto-Etag-Mac"])
self.assertEqual(actual_hmac, hmac.new(object_key, plaintext_etag, hashlib.sha256).digest())
# verify encrypted etag for container update
self.assertIn("X-Object-Sysmeta-Container-Update-Override-Etag", req_hdrs)
parts = req_hdrs["X-Object-Sysmeta-Container-Update-Override-Etag"].rsplit(";", 1)
self.assertEqual(2, len(parts))
# extract crypto_meta from end of etag for container update
param = parts[1].strip()
crypto_meta_tag = "swift_meta="
self.assertTrue(param.startswith(crypto_meta_tag), param)
actual_meta = json.loads(urllib.unquote_plus(param[len(crypto_meta_tag) :]))
self.assertEqual(Crypto().cipher, actual_meta["cipher"])
self.assertEqual(fetch_crypto_keys()["id"], actual_meta["key_id"])
cont_key = fetch_crypto_keys()["container"]
#.........这里部分代码省略.........
开发者ID:notmyname,项目名称:swift,代码行数:101,代码来源:test_encrypter.py
示例13: TestOioServerSideCopyMiddleware
class TestOioServerSideCopyMiddleware(TestServerSideCopyMiddleware):
def setUp(self):
    """Build a FakeSwift app wrapped by the copy filter, sharing its logger."""
    self.app = FakeSwift()
    ssc_factory = copy.filter_factory({
        'object_post_as_copy': 'yes',
    })
    self.ssc = ssc_factory(self.app)
    self.ssc.logger = self.app.logger
def tearDown(self):
    """Do nothing, which disables the unclosed_requests test.

    get_object_info() does not close the response iterator, so that test
    has to be disabled here.
    """
def test_basic_put_with_x_copy_from(self):
    """PUT with X-Copy-From: expect 201, X-Copied-From and one authorize.

    Also checks that the first backend call has swift source 'SSC' and that
    'swift.orig_req_method' is not set in the request environ.
    """
    self.app.register('HEAD', '/v1/a/c/o', swob.HTTPOk, {})
    self.app.register('PUT', '/v1/a/c/o2', swob.HTTPCreated, {})
    req = Request.blank('/v1/a/c/o2', environ={'REQUEST_METHOD': 'PUT'},
                        headers={'Content-Length': '0',
                                 'X-Copy-From': 'c/o'})
    status, headers, body = self.call_ssc(req)
    self.assertEqual(status, '201 Created')
    # assertIn reports the header list on failure, unlike assertTrue(x in y)
    self.assertIn(('X-Copied-From', 'c/o'), headers)
    self.assertEqual(len(self.authorized), 1)
    self.assertEqual('PUT', self.authorized[0].method)
    self.assertEqual('/v1/a/c/o2', self.authorized[0].path)
    self.assertEqual(self.app.swift_sources[0], 'SSC')
    # For basic test cases, assert orig_req_method behavior
    self.assertNotIn('swift.orig_req_method', req.environ)
def test_static_large_object_manifest(self):
    """Copy an SLO with multipart-manifest=get on the PUT request.

    Expects three backend calls (HEAD, GET, PUT): the GET carries
    format=raw and multipart-manifest=get, the PUT goes to
    '/v1/a/c/o2?multipart-manifest=put' without the source's
    X-Static-Large-Object and Etag headers. GET and PUT are authorized.
    """
    self.app.register('HEAD', '/v1/a/c/o', swob.HTTPOk,
                      {'X-Static-Large-Object': 'True',
                       'Etag': 'should not be sent'})
    self.app.register('GET', '/v1/a/c/o', swob.HTTPOk,
                      {'X-Static-Large-Object': 'True',
                       'Etag': 'should not be sent'}, 'passed')
    self.app.register('PUT', '/v1/a/c/o2?multipart-manifest=put',
                      swob.HTTPCreated, {})
    req = Request.blank('/v1/a/c/o2?multipart-manifest=get',
                        environ={'REQUEST_METHOD': 'PUT'},
                        headers={'Content-Length': '0',
                                 'X-Copy-From': 'c/o'})
    status, headers, body = self.call_ssc(req)
    self.assertEqual(status, '201 Created')
    # assertIn reports the header list on failure, unlike assertTrue(x in y)
    self.assertIn(('X-Copied-From', 'c/o'), headers)
    self.assertEqual(3, len(self.app.calls))
    self.assertEqual('HEAD', self.app.calls[0][0])
    self.assertEqual('GET', self.app.calls[1][0])

    # query parameters are compared as a dict, so their order is irrelevant
    get_path, qs = self.app.calls[1][1].split('?')
    params = urllib.parse.parse_qs(qs)
    self.assertDictEqual(
        {'format': ['raw'], 'multipart-manifest': ['get']}, params)
    self.assertEqual(get_path, '/v1/a/c/o')
    self.assertEqual(self.app.calls[2],
                     ('PUT', '/v1/a/c/o2?multipart-manifest=put'))

    # headers of the third backend call, i.e. the PUT
    req_headers = self.app.headers[2]
    self.assertNotIn('X-Static-Large-Object', req_headers)
    self.assertNotIn('Etag', req_headers)
    self.assertEqual(len(self.authorized), 2)
    self.assertEqual('GET', self.authorized[0].method)
    self.assertEqual('/v1/a/c/o', self.authorized[0].path)
    self.assertEqual('PUT', self.authorized[1].method)
    self.assertEqual('/v1/a/c/o2', self.authorized[1].path)
def test_static_large_object(self):
    """Copy an SLO without a multipart-manifest query parameter.

    Expects the backend calls HEAD, GET, PUT in that order, no
    X-Static-Large-Object or Etag header in the headers of the second
    backend call, and authorization of the GET and the PUT.
    """
    # Compared to the original copy middleware, we do an extra HEAD request
    self.app.register('HEAD', '/v1/a/c/o', swob.HTTPOk,
                      {'X-Static-Large-Object': 'True',
                       'Etag': 'should not be sent'}, 'passed')
    self.app.register('GET', '/v1/a/c/o', swob.HTTPOk,
                      {'X-Static-Large-Object': 'True',
                       'Etag': 'should not be sent'}, 'passed')
    self.app.register('PUT', '/v1/a/c/o2',
                      swob.HTTPCreated, {})
    req = Request.blank('/v1/a/c/o2',
                        environ={'REQUEST_METHOD': 'PUT'},
                        headers={'Content-Length': '0',
                                 'X-Copy-From': 'c/o'})
    status, headers, body = self.call_ssc(req)
    self.assertEqual(status, '201 Created')
    # assertIn reports the header list on failure, unlike assertTrue(x in y)
    self.assertIn(('X-Copied-From', 'c/o'), headers)
    self.assertEqual(self.app.calls, [
        ('HEAD', '/v1/a/c/o'),
        ('GET', '/v1/a/c/o'),
        ('PUT', '/v1/a/c/o2')])
    req_headers = self.app.headers[1]
    self.assertNotIn('X-Static-Large-Object', req_headers)
    self.assertNotIn('Etag', req_headers)
    self.assertEqual(len(self.authorized), 2)
    self.assertEqual('GET', self.authorized[0].method)
    self.assertEqual('/v1/a/c/o', self.authorized[0].path)
    self.assertEqual('PUT', self.authorized[1].method)
    self.assertEqual('/v1/a/c/o2', self.authorized[1].path)
def test_basic_put_with_x_copy_from_across_container(self):
self.app.register('HEAD', '/v1/a/c1/o1', swob.HTTPOk, {})
self.app.register('PUT', '/v1/a/c2/o2', swob.HTTPCreated, {})
req = Request.blank('/v1/a/c2/o2', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
#.........这里部分代码省略.........
开发者ID:fvennetier,项目名称:oio-swift,代码行数:101,代码来源:test_copy.py
示例14: TestUntarMetadata
class TestUntarMetadata(unittest.TestCase):
    """Check that pax xattr headers in an uploaded tar become object headers."""

    def setUp(self):
        """Build a FakeSwift app wrapped by bulk and a scratch directory."""
        self.app = FakeSwift()
        self.bulk = bulk.filter_factory({})(self.app)
        self.testdir = mkdtemp(suffix='tmp_test_bulk')

    def tearDown(self):
        """Remove the scratch directory, ignoring errors."""
        rmtree(self.testdir, ignore_errors=True)

    def test_extract_metadata(self):
        """Upload a tar with SCHILY and LIBARCHIVE xattrs and check PUTs."""
        self.app.register('HEAD', '/v1/a/c?extract-archive=tar',
                          HTTPNoContent, {}, None)
        self.app.register('PUT', '/v1/a/c/obj1?extract-archive=tar',
                          HTTPCreated, {}, None)
        self.app.register('PUT', '/v1/a/c/obj2?extract-archive=tar',
                          HTTPCreated, {}, None)

        # It's a real pain to instantiate TarInfo objects directly; they
        # really want to come from a file on disk or a tarball. So, we write
        # out some files and add pax headers to them as they get placed into
        # the tarball.
        with open(os.path.join(self.testdir, "obj1"), "w") as fh1:
            fh1.write("obj1 contents\n")
        with open(os.path.join(self.testdir, "obj2"), "w") as fh2:
            fh2.write("obj2 contents\n")

        tar_ball = StringIO()
        tar_file = tarfile.TarFile.open(fileobj=tar_ball, mode="w",
                                        format=tarfile.PAX_FORMAT)

        # With GNU tar 1.27.1 or later (possibly 1.27 as well), a file with
        # extended attribute user.thingy = dingy gets put into the tarfile
        # with pax_headers containing key/value pair
        # (SCHILY.xattr.user.thingy, dingy), both unicode strings (py2: type
        # unicode, not type str).
        #
        # With BSD tar (libarchive), you get key/value pair
        # (LIBARCHIVE.xattr.user.thingy, dingy), which strikes me as
        # gratuitous incompatibility.
        #
        # Still, we'll support uploads with both. Just heap more code on the
        # problem until you can forget it's under there.
        with open(os.path.join(self.testdir, "obj1")) as fh1:
            tar_info1 = tar_file.gettarinfo(fileobj=fh1,
                                            arcname="obj1")
            tar_info1.pax_headers[u'SCHILY.xattr.user.mime_type'] = \
                u'application/food-diary'
            tar_info1.pax_headers[u'SCHILY.xattr.user.meta.lunch'] = \
                u'sopa de albóndigas'
            tar_info1.pax_headers[
                u'SCHILY.xattr.user.meta.afternoon-snack'] = \
                u'gigantic bucket of coffee'
            tar_file.addfile(tar_info1, fh1)

        with open(os.path.join(self.testdir, "obj2")) as fh2:
            tar_info2 = tar_file.gettarinfo(fileobj=fh2,
                                            arcname="obj2")
            tar_info2.pax_headers[
                u'LIBARCHIVE.xattr.user.meta.muppet'] = u'bert'
            tar_info2.pax_headers[
                u'LIBARCHIVE.xattr.user.meta.cat'] = u'fluffy'
            tar_info2.pax_headers[
                u'LIBARCHIVE.xattr.user.notmeta'] = u'skipped'
            tar_file.addfile(tar_info2, fh2)

        # rewind so the request reads the archive from the start
        tar_ball.seek(0)

        req = Request.blank('/v1/a/c?extract-archive=tar')
        req.environ['REQUEST_METHOD'] = 'PUT'
        req.environ['wsgi.input'] = tar_ball
        req.headers['transfer-encoding'] = 'chunked'
        req.headers['accept'] = 'application/json;q=1.0'

        resp = req.get_response(self.bulk)
        self.assertEqual(resp.status_int, 200)

        # sanity check to make sure the upload worked
        upload_status = utils.json.loads(resp.body)
        self.assertEqual(upload_status['Number Files Created'], 2)

        # backend calls 1 and 2 are read as the PUTs of obj1 and obj2
        put1_headers = HeaderKeyDict(self.app.calls_with_headers[1][2])
        self.assertEqual(
            put1_headers.get('Content-Type'),
            'application/food-diary')
        self.assertEqual(
            put1_headers.get('X-Object-Meta-Lunch'),
            'sopa de alb\xc3\xb3ndigas')
        self.assertEqual(
            put1_headers.get('X-Object-Meta-Afternoon-Snack'),
            'gigantic bucket of coffee')

        put2_headers = HeaderKeyDict(self.app.calls_with_headers[2][2])
        self.assertEqual(put2_headers.get('X-Object-Meta-Muppet'), 'bert')
        self.assertEqual(put2_headers.get('X-Object-Meta-Cat'), 'fluffy')
        # assertIsNone is an identity check and gives a clearer failure
        self.assertIsNone(put2_headers.get('Content-Type'))
        self.assertIsNone(put2_headers.get('X-Object-Meta-Blah'))
开发者ID:LSBDOpenstackDev,项目名称:swift,代码行数:96,代码来源:test_bulk.py
示例15: setUp
def setUp(self):
    """Build a FakeSwift app wrapped by bulk and create a scratch directory."""
    self.app = FakeSwift()
    make_bulk = bulk.filter_factory({})
    self.bulk = make_bulk(self.app)
    self.testdir = mkdtemp(suffix='tmp_test_bulk')
开发者ID:LSBDOpenstackDev,项目名称:swift,代码行数:4,代码来源:test_bulk.py
示例16: ContainerQuotaCopyingTestCases
class ContainerQuotaCopyingTestCases(unittest.TestCase):
def setUp(self):
    """Stack the copy filter on container_quotas on a FakeSwift app."""
    self.app = FakeSwift()
    make_quota = container_quotas.filter_factory({})
    self.cq_filter = make_quota(self.app)
    make_copy = copy.filter_factory({})
    self.copy_filter = make_copy(self.cq_filter)
def test_exceed_bytes_quota_copy_verb(self):
    """COPY of a 10-byte source with a quota-bytes of '2' returns 413."""
    cache = FakeCache({'bytes': 0, 'meta': {'quota-bytes': '2'}})
    self.app.register('GET', '/v1/a/c2/o2', HTTPOk,
                      {'Content-Length': '10'}, 'passed')

    environ = {'REQUEST_METHOD': 'COPY',
               'swift.cache': cache}
    req = Request.blank('/v1/a/c2/o2',
                        environ=environ,
                        headers={'Destination': '/c/o'})
    res = req.get_response(self.copy_filter)
    self.assertEqual(res.status_int, 413)
    self.assertEqual(res.body, 'Upload exceeds quota.')
def test_not_exceed_bytes_quota_copy_verb(self):
    """COPY of a 10-byte source with a quota-bytes of '100' returns 200."""
    self.app.register('GET', '/v1/a/c2/o2', HTTPOk,
                      {'Content-Length': '10'}, 'passed')
    self.app.register(
        'PUT', '/v1/a/c/o', HTTPOk, {}, 'passed')
    cache = FakeCache({'bytes': 0, 'meta': {'quota-bytes': '100'}})

    environ = {'REQUEST_METHOD': 'COPY',
               'swift.cache': cache}
    req = Request.blank('/v1/a/c2/o2',
                        environ=environ,
                        headers={'Destination': '/c/o'})
    res = req.get_response(self.copy_filter)
    self.assertEqual(res.status_int, 200)
def test_exceed_counts_quota_copy_verb(self):
    """COPY with object_count 1 and a quota-count of '1' returns 413."""
    self.app.register('GET', '/v1/a/c2/o2', HTTPOk, {}, 'passed')
    cache = FakeCache({'object_count': 1, 'meta': {'quota-count': '1'}})

    environ = {'REQUEST_METHOD': 'COPY',
               'swift.cache': cache}
    req = Request.blank('/v1/a/c2/o2',
                        environ=environ,
                        headers={'Destination': '/c/o'})
    res = req.get_response(self.copy_filter)
    self.assertEqual(res.status_int, 413)
    self.assertEqual(res.body, 'Upload exceeds quota.')
def test_exceed_counts_quota_copy_cross_account_verb(self):
self.app.register('GET', '/v1/a/c2/o2', HTTPOk, {}, 'passed')
a_c_cache = {'storage_policy': '0', 'meta': {'quota-count': '2'},
'status': 200, 'object_count': 1}
a2_c_cache = {'storage_po
|
请发表评论