本文整理汇总了Python中test.unit.FakeLogger类的典型用法代码示例。如果您正苦于以下问题:Python FakeLogger类的具体用法?Python FakeLogger怎么用?Python FakeLogger使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了FakeLogger类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_two_realms_and_change_a_default
def test_two_realms_and_change_a_default(self):
    """Load a config with two realms and an overridden [DEFAULT] option.

    The file sets ``mtime_check_interval`` to 60 and declares the ``US``
    and ``UK`` realms. Nothing is expected to be logged, and every
    accessor is checked against the values written in the file.
    """
    conf_name = 'container-sync-realms.conf'
    conf_body = '''
[DEFAULT]
mtime_check_interval = 60
[US]
key = 9ff3b71c849749dbaec4ccdd3cbab62b
cluster_dfw1 = http://dfw1.host/v1/
[UK]
key = e9569809dc8b4951accc1487aa788012
key2 = f6351bd1cc36413baa43f7ba1b45e51d
cluster_lon3 = http://lon3.host/v1/
'''
    with temptree([conf_name], [conf_body]) as tempdir:
        fake_logger = FakeLogger()
        conf_path = os.path.join(tempdir, conf_name)
        realms_conf = ContainerSyncRealms(conf_path, fake_logger)

        # A well-formed file is expected to load silently.
        self.assertEqual(fake_logger.all_log_lines(), {})
        self.assertEqual(realms_conf.mtime_check_interval, 60)
        self.assertEqual(sorted(realms_conf.realms()), ['UK', 'US'])

        # US realm: only ``key`` is configured, so ``key2`` is None.
        self.assertEqual(
            realms_conf.key('US'), '9ff3b71c849749dbaec4ccdd3cbab62b')
        self.assertEqual(realms_conf.key2('US'), None)
        self.assertEqual(realms_conf.clusters('US'), ['DFW1'])
        self.assertEqual(
            realms_conf.endpoint('US', 'DFW1'), 'http://dfw1.host/v1/')

        # UK realm: both keys are configured.
        self.assertEqual(
            realms_conf.key('UK'), 'e9569809dc8b4951accc1487aa788012')
        self.assertEqual(
            realms_conf.key2('UK'), 'f6351bd1cc36413baa43f7ba1b45e51d')
        self.assertEqual(realms_conf.clusters('UK'), ['LON3'])
        self.assertEqual(
            realms_conf.endpoint('UK', 'LON3'), 'http://lon3.host/v1/')
开发者ID:Joyezhu,项目名称:swift,代码行数:33,代码来源:test_container_sync_realms.py
示例2: test_empty
def test_empty(self):
    """An empty config file is expected to load silently with defaults."""
    conf_name = 'container-sync-realms.conf'
    conf_body = ''
    with temptree([conf_name], [conf_body]) as tempdir:
        fake_logger = FakeLogger()
        conf_path = os.path.join(tempdir, conf_name)
        realms_conf = ContainerSyncRealms(conf_path, fake_logger)

        self.assertEqual(fake_logger.all_log_lines(), {})
        # 300 is the interval this test expects when none is configured.
        self.assertEqual(realms_conf.mtime_check_interval, 300)
        self.assertEqual(realms_conf.realms(), [])
开发者ID:Joyezhu,项目名称:swift,代码行数:10,代码来源:test_container_sync_realms.py
示例3: test_no_file_there
def test_no_file_there(self):
    """A path that does not exist is expected to log one debug line.

    A random hex name is used as the path so that it cannot collide with a
    real file; the defaults are expected to stay in place afterwards.
    """
    missing_path = uuid.uuid4().hex
    fake_logger = FakeLogger()
    realms_conf = ContainerSyncRealms(missing_path, fake_logger)

    expected_debug = (
        "Could not load '%s': [Errno 2] No such file or directory: "
        "'%s'" % (missing_path, missing_path))
    self.assertEqual(
        fake_logger.all_log_lines(), {'debug': [expected_debug]})
    self.assertEqual(realms_conf.mtime_check_interval, 300)
    self.assertEqual(realms_conf.realms(), [])
开发者ID:Joyezhu,项目名称:swift,代码行数:11,代码来源:test_container_sync_realms.py
示例4: test_object_run_logging
def test_object_run_logging(self):
    """Check that the first info line of an audit names the audit type.

    Two passes over device ``sda`` are run: a default worker, whose first
    info line must mention ``ALL - parallel, sda``, and a worker created
    with ``zero_byte_only_at_fps=50``, whose first info line must mention
    ``ZBF - sda``.
    """
    logger = FakeLogger()
    auditor_worker = auditor.AuditorWorker(
        self.conf, logger, self.rcache, self.devices)
    auditor_worker.audit_all_objects(device_dirs=["sda"])
    log_lines = logger.get_lines_for_level("info")
    self.assertTrue(len(log_lines) > 0)
    # assertIn rather than assertTrue(str.index(...)): index() returns 0
    # (falsy) when the text starts the line, and raises ValueError instead
    # of producing a normal assertion failure when the text is absent.
    self.assertIn("ALL - parallel, sda", log_lines[0])

    logger = FakeLogger()
    auditor_worker = auditor.AuditorWorker(
        self.conf, logger, self.rcache, self.devices,
        zero_byte_only_at_fps=50)
    auditor_worker.audit_all_objects(device_dirs=["sda"])
    log_lines = logger.get_lines_for_level("info")
    self.assertTrue(len(log_lines) > 0)
    self.assertIn("ZBF - sda", log_lines[0])
开发者ID:renanalan,项目名称:swift,代码行数:14,代码来源:test_auditor.py
示例5: test_error_parsing
def test_error_parsing(self):
    """A file with no section headers is expected to log one error line.

    The defaults are expected to remain in effect after the failed load.
    """
    conf_name = 'container-sync-realms.conf'
    conf_body = 'invalid'
    with temptree([conf_name], [conf_body]) as tempdir:
        fake_logger = FakeLogger()
        conf_path = os.path.join(tempdir, conf_name)
        realms_conf = ContainerSyncRealms(conf_path, fake_logger)

        expected_error = (
            "Could not load '%s': File contains no section headers.\n"
            "file: %s, line: 1\n"
            "'invalid'" % (conf_path, conf_path))
        self.assertEqual(
            fake_logger.all_log_lines(), {'error': [expected_error]})
        self.assertEqual(realms_conf.mtime_check_interval, 300)
        self.assertEqual(realms_conf.realms(), [])
开发者ID:Joyezhu,项目名称:swift,代码行数:15,代码来源:test_container_sync_realms.py
示例6: test_sweep_logs_multiple_policies
def test_sweep_logs_multiple_policies(self):
    """Sweep async pendings of every mocked policy; expect one summary.

    Three pending updates are written per policy in ``_mocked_policies``.
    The updater's clock is patched to advance 0.01 per call, and exactly
    one "sweep complete" info line is expected, with the totals asserted
    at the end.
    """
    pending = [('abc', 123), ('def', 234), ('ghi', 345)]
    for policy in _mocked_policies:
        async_root = os.path.join(self.sda1, get_async_dir(policy.idx))
        prefix_dir = os.path.join(async_root, 'abc')
        mkpath(prefix_dir)
        for obj_name, stamp in pending:
            obj_hash = hash_path(
                'account', 'container%d' % policy.idx, obj_name)
            pending_name = obj_hash + '-' + normalize_timestamp(stamp)
            write_pickle({}, os.path.join(prefix_dir, pending_name))

    class MockObjectUpdater(object_updater.ObjectUpdater):
        # Replaces the real update with "delete the file and count it".
        def process_object_update(self, update_path, device, policy):
            os.unlink(update_path)
            self.stats.successes += 1
            self.stats.unlinks += 1

    fake_logger = FakeLogger()
    updater_conf = {
        'devices': self.devices_dir,
        'mount_check': 'false',
        'swift_dir': self.testdir,
        'interval': '1',
        'concurrency': '1',
        'report_interval': '10.0',
        'node_timeout': '5'}
    updater = MockObjectUpdater(updater_conf, logger=fake_logger)

    # One-element list so the nested function can mutate the value.
    clock = [time()]

    def mock_time():
        current = clock[0]
        clock[0] += 0.01
        return current

    fake_time_module = mock.MagicMock(time=mock_time)
    with mock.patch('swift.obj.updater.time', fake_time_module):
        updater.object_sweep(self.sda1)

    completion_lines = []
    for line in fake_logger.get_lines_for_level('info'):
        if "sweep complete" in line:
            completion_lines.append(line)

    self.assertEqual(len(completion_lines), 1)
    summary = completion_lines[0]
    self.assertIn("sweep complete", summary)
    self.assertIn(
        "6 successes, 0 failures, 0 quarantines, 6 unlinks, 0 errors, "
        "0 redirects",
        summary)
开发者ID:jgmerritt,项目名称:swift,代码行数:48,代码来源:test_updater.py
示例7: test_empty_realm
def test_empty_realm(self):
    """A realm section with no options is expected to yield empty values.

    The realm is still listed, but its keys are None, it has no clusters,
    and an endpoint lookup returns None.
    """
    conf_name = 'container-sync-realms.conf'
    conf_body = '''
[US]
'''
    with temptree([conf_name], [conf_body]) as tempdir:
        fake_logger = FakeLogger()
        conf_path = os.path.join(tempdir, conf_name)
        realms_conf = ContainerSyncRealms(conf_path, fake_logger)

        self.assertEqual(fake_logger.all_log_lines(), {})
        self.assertEqual(realms_conf.mtime_check_interval, 300)
        self.assertEqual(realms_conf.realms(), ['US'])
        self.assertEqual(realms_conf.key('US'), None)
        self.assertEqual(realms_conf.key2('US'), None)
        self.assertEqual(realms_conf.clusters('US'), [])
        self.assertEqual(realms_conf.endpoint('US', 'JUST_TESTING'), None)
开发者ID:Joyezhu,项目名称:swift,代码行数:16,代码来源:test_container_sync_realms.py
示例8: test_bad_mtime_check_interval
def test_bad_mtime_check_interval(self):
    """A non-integer mtime_check_interval is expected to log an error.

    The interval is expected to remain at 300 afterwards.
    """
    conf_name = 'container-sync-realms.conf'
    conf_body = '''
[DEFAULT]
mtime_check_interval = invalid
'''
    with temptree([conf_name], [conf_body]) as tempdir:
        fake_logger = FakeLogger()
        conf_path = os.path.join(tempdir, conf_name)
        realms_conf = ContainerSyncRealms(conf_path, fake_logger)

        expected_error = (
            "Error in '%s' with mtime_check_interval: invalid literal "
            "for int() with base 10: 'invalid'" % conf_path)
        self.assertEqual(
            fake_logger.all_log_lines(), {'error': [expected_error]})
        self.assertEqual(realms_conf.mtime_check_interval, 300)
开发者ID:Joyezhu,项目名称:swift,代码行数:16,代码来源:test_container_sync_realms.py
示例9: setUp
def setUp(self):
    """Build a CryptoWSGIContext around a stub filter and a FakeLogger."""

    class FakeFilter(object):
        # Only the two attributes this fixture provides: no wrapped app
        # and a Crypto instance built from an empty config.
        app = None
        crypto = Crypto({})

    fake_logger = FakeLogger()
    self.fake_logger = fake_logger
    stub_filter = FakeFilter()
    self.crypto_context = CryptoWSGIContext(
        stub_filter, 'object', fake_logger)
开发者ID:mahak,项目名称:swift,代码行数:8,代码来源:test_crypto_utils.py
示例10: test_os_error
def test_os_error(self):
    """A config in an unreadable directory is expected to log EACCES.

    All permissions are removed from the temp directory before the realms
    object is built. The test then expects a single "Permission denied"
    error line and the default interval and realm list.
    """
    fname = 'container-sync-realms.conf'
    fcontents = ''
    with temptree([fname], [fcontents]) as tempdir:
        logger = FakeLogger()
        fpath = os.path.join(tempdir, fname)
        os.chmod(tempdir, 0)
        try:
            # Built inside the try so the directory mode is restored even
            # if the constructor itself raises.
            csr = ContainerSyncRealms(fpath, logger)
            self.assertEqual(
                logger.all_log_lines(),
                {'error': [
                    "Could not load '%s': [Errno 13] Permission denied: "
                    "'%s'" % (fpath, fpath)]})
            self.assertEqual(csr.mtime_check_interval, 300)
            self.assertEqual(csr.realms(), [])
        finally:
            # ``0o700`` is valid on Python 2.6+ and Python 3; the bare
            # ``0700`` form is a syntax error on Python 3.
            os.chmod(tempdir, 0o700)
开发者ID:Joyezhu,项目名称:swift,代码行数:18,代码来源:test_container_sync_realms.py
示例11: test_one_realm
def test_one_realm(self):
    """Load a config with a single ``US`` realm with one key and cluster."""
    conf_name = 'container-sync-realms.conf'
    conf_body = '''
[US]
key = 9ff3b71c849749dbaec4ccdd3cbab62b
cluster_dfw1 = http://dfw1.host/v1/
'''
    with temptree([conf_name], [conf_body]) as tempdir:
        fake_logger = FakeLogger()
        conf_path = os.path.join(tempdir, conf_name)
        realms_conf = ContainerSyncRealms(conf_path, fake_logger)

        self.assertEqual(fake_logger.all_log_lines(), {})
        self.assertEqual(realms_conf.mtime_check_interval, 300)
        self.assertEqual(realms_conf.realms(), ['US'])
        self.assertEqual(
            realms_conf.key('US'), '9ff3b71c849749dbaec4ccdd3cbab62b')
        # No ``key2`` option in the file.
        self.assertEqual(realms_conf.key2('US'), None)
        self.assertEqual(realms_conf.clusters('US'), ['DFW1'])
        self.assertEqual(
            realms_conf.endpoint('US', 'DFW1'), 'http://dfw1.host/v1/')
开发者ID:Joyezhu,项目名称:swift,代码行数:19,代码来源:test_container_sync_realms.py
示例12: setUp
def setUp(self):
    """Stub out internal_client's loadapp/sleep and create scratch state.

    The original ``loadapp`` and ``sleep`` callables are kept on the
    instance as ``old_loadapp`` and ``old_sleep``.
    """
    global not_sleep

    def _no_app(*a, **kw):
        # Stand-in for loadapp: accepts anything, returns None.
        return None

    self.old_loadapp, self.old_sleep = (
        internal_client.loadapp, internal_client.sleep)
    internal_client.loadapp = _no_app
    internal_client.sleep = not_sleep

    self.rcache = mkdtemp()
    self.logger = FakeLogger()
开发者ID:701,项目名称:swift,代码行数:11,代码来源:test_expirer.py
示例13: test_os_error
def test_os_error(self):
    """An EACCES from os.path.getmtime is expected to log one error line.

    ``os.path.getmtime`` is patched to raise ``OSError(EACCES)`` while the
    realms object is constructed, so the permission failure is simulated
    rather than produced with a real chmod.
    """
    conf_name = 'container-sync-realms.conf'
    conf_body = ''
    with temptree([conf_name], [conf_body]) as tempdir:
        fake_logger = FakeLogger()
        conf_path = os.path.join(tempdir, conf_name)

        def _mock_getmtime(path):
            denied_text = os.strerror(errno.EACCES) + ": '%s'" % (conf_path)
            raise OSError(errno.EACCES, denied_text)

        with patch('os.path.getmtime', _mock_getmtime):
            realms_conf = ContainerSyncRealms(conf_path, fake_logger)

        expected_error = (
            "Could not load '%s': [Errno 13] Permission denied: "
            "'%s'" % (conf_path, conf_path))
        self.assertEqual(
            fake_logger.all_log_lines(), {'error': [expected_error]})
        self.assertEqual(realms_conf.mtime_check_interval, 300)
        self.assertEqual(realms_conf.realms(), [])
开发者ID:AfonsoFGarcia,项目名称:swift,代码行数:21,代码来源:test_container_sync_realms.py
示例14: setUp
def setUp(self):
    """Create a temp device tree, a ring builder and one object file.

    Builds ``<testdir>/node/sda1/objects``, a six-device ring builder, a
    data file for ``a/c/o`` under a directory derived from its hash, and
    records the directory and file expected at ``next_part``.
    """
    skip_if_no_xattrs()
    self.logger = FakeLogger()

    # Scratch directory layout: <testdir>/node
    self.testdir = tempfile.mkdtemp()
    self.devices = os.path.join(self.testdir, 'node')
    shutil.rmtree(self.testdir, ignore_errors=True)
    os.mkdir(self.testdir)
    os.mkdir(self.devices)

    # Six equally weighted devices on 127.0.0.0 through 127.0.0.5.
    self.rb = ring.RingBuilder(8, 6.0, 1)
    for dev_id in range(6):
        self.rb.add_dev({
            'id': dev_id, 'region': 0, 'zone': 0, 'weight': 1,
            'ip': "127.0.0.%s" % dev_id, 'port': 10000,
            'device': 'sda1'})
    self.rb.rebalance(seed=1)

    self.existing_device = 'sda1'
    device_dir = os.path.join(self.devices, self.existing_device)
    os.mkdir(device_dir)
    self.objects = os.path.join(
        self.devices, self.existing_device, 'objects')
    os.mkdir(self.objects)

    # The first 32 bits of the hash, shifted by 24 and by 23, give the
    # directory used now and the one stored as ``next_part``.
    self._hash = utils.hash_path('a/c/o')
    digest = binascii.unhexlify(self._hash)
    leading_word = struct.unpack_from('>I', digest)[0]
    part = leading_word >> 24
    self.next_part = leading_word >> 23
    hash_suffix = self._hash[-3:]

    self.objdir = os.path.join(
        self.objects, str(part), hash_suffix, self._hash)
    os.makedirs(self.objdir)
    self.object_fname = "1278553064.00000.data"
    self.objname = os.path.join(self.objdir, self.object_fname)
    with open(self.objname, "wb") as obj_file:
        obj_file.write(b"Hello World!")
        write_metadata(
            obj_file, {'name': '/a/c/o', 'Content-Length': '12'})

    # Replace the global policy collection with a single policy.
    storage_policy._POLICIES = StoragePolicyCollection(
        [StoragePolicy(0, 'platin', True)])

    self.expected_dir = os.path.join(
        self.objects, str(self.next_part), hash_suffix, self._hash)
    self.expected_file = os.path.join(
        self.expected_dir, self.object_fname)
开发者ID:matthewoliver,项目名称:swift,代码行数:41,代码来源:test_relinker.py
示例15: TestObjectExpirer
class TestObjectExpirer(TestCase):
maxDiff = None
def setUp(self):
    """Stub out internal_client's loadapp/sleep and create scratch state.

    The original ``loadapp`` and ``sleep`` callables are kept on the
    instance as ``old_loadapp`` and ``old_sleep``.
    """
    global not_sleep

    def _no_app(*a, **kw):
        # Stand-in for loadapp: accepts anything, returns None.
        return None

    self.old_loadapp, self.old_sleep = (
        internal_client.loadapp, internal_client.sleep)
    internal_client.loadapp = _no_app
    internal_client.sleep = not_sleep

    self.rcache = mkdtemp()
    self.logger = FakeLogger()
def tearDown(self):
    """Remove the temp directory and restore the internal_client hooks.

    unittest only invokes the camel-case ``tearDown`` hook, so under the
    lowercase name this cleanup never ran. The original also read
    ``self.loadapp``, which setUp never assigns; the saved callable is
    stored as ``self.old_loadapp``.
    """
    rmtree(self.rcache)
    internal_client.sleep = self.old_sleep
    internal_client.loadapp = self.old_loadapp


# Keep the old (misspelled) name available for any explicit callers.
teardown = tearDown
def test_get_process_values_from_kwargs(self):
    """Values passed to get_process_values come back as a pair."""
    daemon = expirer.ObjectExpirer({})
    kwargs = {'processes': 5, 'process': 1}
    result = daemon.get_process_values(kwargs)
    self.assertEqual((5, 1), result)
def test_get_process_values_from_config(self):
    """Values given in the daemon config are used when kwargs are empty."""
    conf = {'processes': 5, 'process': 1}
    daemon = expirer.ObjectExpirer(conf)
    result = daemon.get_process_values({})
    self.assertEqual((5, 1), result)
def test_get_process_values_negative_process(self):
    """A negative ``process`` is expected to raise ValueError."""
    bad_values = {'processes': 5, 'process': -1}

    # from config
    daemon = expirer.ObjectExpirer(bad_values)
    with self.assertRaises(ValueError):
        daemon.get_process_values({})

    # from kwargs
    daemon = expirer.ObjectExpirer({})
    with self.assertRaises(ValueError):
        daemon.get_process_values(bad_values)
def test_get_process_values_negative_processes(self):
    """A negative ``processes`` is expected to raise ValueError."""
    bad_values = {'processes': -5, 'process': 1}

    # from config
    daemon = expirer.ObjectExpirer(bad_values)
    with self.assertRaises(ValueError):
        daemon.get_process_values({})

    # from kwargs
    daemon = expirer.ObjectExpirer({})
    with self.assertRaises(ValueError):
        daemon.get_process_values(bad_values)
def test_get_process_values_process_greater_than_processes(self):
    """``process`` above ``processes`` is expected to raise ValueError."""
    bad_values = {'processes': 5, 'process': 7}

    # from config
    daemon = expirer.ObjectExpirer(bad_values)
    with self.assertRaises(ValueError):
        daemon.get_process_values({})

    # from kwargs
    daemon = expirer.ObjectExpirer({})
    with self.assertRaises(ValueError):
        daemon.get_process_values(bad_values)
def test_init_concurrency_too_small(self):
    """A concurrency of 0 or -1 is expected to raise ValueError."""
    for too_small in (0, -1):
        conf = {'concurrency': too_small}
        self.assertRaises(ValueError, expirer.ObjectExpirer, conf)
def test_process_based_concurrency(self):
class ObjectExpirer(expirer.ObjectExpirer):
def __init__(self, conf):
super(ObjectExpirer, self).__init__(conf)
self.processes = 3
self.deleted_objects = {}
def delete_object(self, actual_obj, timestamp, container, obj):
if container not in self.deleted_objects:
self.deleted_objects[container] = set()
self.deleted_objects[container].add(obj)
class InternalClient(object):
def __init__(self, containers):
self.containers = containers
def get_account_info(self, *a, **kw):
return len(self.containers.keys()), sum([len(self.containers[x]) for x in self.containers])
def iter_containers(self, *a, **kw):
return [{"name": x} for x in self.containers.keys()]
def iter_objects(self, account, container):
return [{"name": x} for x in self.containers[container]]
def delete_container(*a, **kw):
pass
ukey = u"3"
containers = {
0: set("1-one 2-two 3-three".split()),
1: set("2-two 3-three 4-four".split()),
2: set("5-five 6-six".split()),
ukey: set(u"7-seven\u2661".split()),
}
x = ObjectExpirer({})
x.swift = InternalClient(containers)
#.........这里部分代码省略.........
开发者ID:AsherBond,项目名称:swift,代码行数:101,代码来源:test_expirer.py
示例16: TestCryptoWsgiContext
class TestCryptoWsgiContext(unittest.TestCase):
def setUp(self):
    """Build a CryptoWSGIContext around a stub filter and a FakeLogger."""

    class FakeFilter(object):
        # Only the two attributes this fixture provides: no wrapped app
        # and a Crypto instance built from an empty config.
        app = None
        crypto = Crypto({})

    fake_logger = FakeLogger()
    self.fake_logger = fake_logger
    stub_filter = FakeFilter()
    self.crypto_context = CryptoWSGIContext(
        stub_filter, 'object', fake_logger)
def test_get_keys(self):
    """get_keys returns what the callback supplies for the required keys."""
    get_keys = self.crypto_context.get_keys

    # ok
    fetched = get_keys({CRYPTO_KEY_CALLBACK: fetch_crypto_keys})
    self.assertDictEqual(fetch_crypto_keys(), fetched)

    # only default required keys are checked
    object_only = {'object': fetch_crypto_keys()['object']}
    fetched = get_keys(
        {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: object_only})
    self.assertDictEqual(object_only, fetched)

    # only specified required keys are checked
    container_only = {'container': fetch_crypto_keys()['container']}
    fetched = get_keys(
        {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: container_only},
        required=['container'])
    self.assertDictEqual(container_only, fetched)

    both_keys = {'object': fetch_crypto_keys()['object'],
                 'container': fetch_crypto_keys()['container']}
    fetched = get_keys(
        {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: both_keys},
        required=['object', 'container'])
    self.assertDictEqual(both_keys, fetched)
def test_get_keys_with_crypto_meta(self):
    """The key_id given to get_keys is forwarded to the key callback."""
    # verify that key_id from crypto_meta is passed to fetch_crypto_keys
    callback_keys = fetch_crypto_keys()
    mock_fetch_crypto_keys = mock.MagicMock(return_value=callback_keys)
    env = {CRYPTO_KEY_CALLBACK: mock_fetch_crypto_keys}
    key_id = {'secret_id': '123'}

    fetched = self.crypto_context.get_keys(env, key_id=key_id)
    self.assertDictEqual(fetch_crypto_keys(), fetched)
    mock_fetch_crypto_keys.assert_called_with(key_id={'secret_id': '123'})

    # but it's ok for there to be no crypto_meta
    fetched = self.crypto_context.get_keys(env, key_id={})
    self.assertDictEqual(fetch_crypto_keys(), fetched)
    mock_fetch_crypto_keys.assert_called_with(key_id={})

    fetched = self.crypto_context.get_keys(env)
    self.assertDictEqual(fetch_crypto_keys(), fetched)
    mock_fetch_crypto_keys.assert_called_with(key_id=None)
def test_get_keys_missing_callback(self):
    """An env with no key callback is expected to produce a 500 error."""
    with self.assertRaises(HTTPException) as caught:
        self.crypto_context.get_keys({})

    failure = caught.exception
    self.assertIn('500 Internal Error', failure.status)
    first_error = self.fake_logger.get_lines_for_level('error')[0]
    self.assertIn('missing callback', first_error)
    self.assertIn(b'Unable to retrieve encryption keys.', failure.body)
def test_get_keys_callback_exception(self):
    """A callback that raises is expected to produce a 500 error."""

    def callback(*args, **kwargs):
        raise Exception('boom')

    env = {CRYPTO_KEY_CALLBACK: callback}
    with self.assertRaises(HTTPException) as caught:
        self.crypto_context.get_keys(env)

    failure = caught.exception
    self.assertIn('500 Internal Error', failure.status)
    first_error = self.fake_logger.get_lines_for_level('error')[0]
    self.assertIn('from callback: boom', first_error)
    self.assertIn(b'Unable to retrieve encryption keys.', failure.body)
def test_get_keys_missing_key_for_default_required_list(self):
    """Keys lacking 'object' are rejected under the default requirements."""
    incomplete = dict(fetch_crypto_keys())
    del incomplete['object']

    def callback(*args, **kwargs):
        return incomplete

    with self.assertRaises(HTTPException) as caught:
        self.crypto_context.get_keys({CRYPTO_KEY_CALLBACK: callback})

    failure = caught.exception
    self.assertIn('500 Internal Error', failure.status)
    first_error = self.fake_logger.get_lines_for_level('error')[0]
    self.assertIn("Missing key for 'object'", first_error)
    self.assertIn(b'Unable to retrieve encryption keys.', failure.body)
def test_get_keys_missing_object_key_for_specified_required_list(self):
    """Keys lacking 'object' are rejected when 'object' is required."""
    incomplete = dict(fetch_crypto_keys())
    del incomplete['object']

    def callback(*args, **kwargs):
        return incomplete

    with self.assertRaises(HTTPException) as caught:
        self.crypto_context.get_keys(
            {CRYPTO_KEY_CALLBACK: callback},
            required=['object', 'container'])

    failure = caught.exception
    self.assertIn('500 Internal Error', failure.status)
    first_error = self.fake_logger.get_lines_for_level('error')[0]
    self.assertIn("Missing key for 'object'", first_error)
    self.assertIn(b'Unable to retrieve encryption keys.', failure.body)
def test_get_keys_missing_container_key_for_specified_required_list(self):
bad_keys = dict(fetch_crypto_keys())
#.........这里部分代码省略.........
开发者ID:mahak,项目名称:swift,代码行数:101,代码来源:test_crypto_utils.py
示例17: TestCryptoWsgiContext
class TestCryptoWsgiContext(unittest.TestCase):
def setUp(self):
    """Build a CryptoWSGIContext around a stub filter and a FakeLogger."""

    class FakeFilter(object):
        # Only the two attributes this fixture provides: no wrapped app
        # and a Crypto instance built from an empty config.
        app = None
        crypto = Crypto({})

    fake_logger = FakeLogger()
    self.fake_logger = fake_logger
    stub_filter = FakeFilter()
    self.crypto_context = CryptoWSGIContext(
        stub_filter, 'object', fake_logger)
def test_get_keys(self):
    """get_keys returns what the callback supplies for the required keys."""
    get_keys = self.crypto_context.get_keys

    # ok
    fetched = get_keys({CRYPTO_KEY_CALLBACK: fetch_crypto_keys})
    self.assertDictEqual(fetch_crypto_keys(), fetched)

    # only default required keys are checked
    object_only = {'object': fetch_crypto_keys()['object']}
    fetched = get_keys({CRYPTO_KEY_CALLBACK: lambda: object_only})
    self.assertDictEqual(object_only, fetched)

    # only specified required keys are checked
    container_only = {'container': fetch_crypto_keys()['container']}
    fetched = get_keys(
        {CRYPTO_KEY_CALLBACK: lambda: container_only},
        required=['container'])
    self.assertDictEqual(container_only, fetched)

    both_keys = {'object': fetch_crypto_keys()['object'],
                 'container': fetch_crypto_keys()['container']}
    fetched = get_keys(
        {CRYPTO_KEY_CALLBACK: lambda: both_keys},
        required=['object', 'container'])
    self.assertDictEqual(both_keys, fetched)
def test_get_keys_missing_callback(self):
    """An env with no key callback is expected to produce a 500 error."""
    with self.assertRaises(HTTPException) as caught:
        self.crypto_context.get_keys({})

    failure = caught.exception
    self.assertIn('500 Internal Error', failure.message)
    first_error = self.fake_logger.get_lines_for_level('error')[0]
    self.assertIn('missing callback', first_error)
    self.assertIn('Unable to retrieve encryption keys.', failure.body)
def test_get_keys_callback_exception(self):
    """A callback that raises is expected to produce a 500 error."""

    def callback():
        raise Exception('boom')

    env = {CRYPTO_KEY_CALLBACK: callback}
    with self.assertRaises(HTTPException) as caught:
        self.crypto_context.get_keys(env)

    failure = caught.exception
    self.assertIn('500 Internal Error', failure.message)
    first_error = self.fake_logger.get_lines_for_level('error')[0]
    self.assertIn('from callback: boom', first_error)
    self.assertIn('Unable to retrieve encryption keys.', failure.body)
def test_get_keys_missing_key_for_default_required_list(self):
    """Keys lacking 'object' are rejected under the default requirements."""
    incomplete = dict(fetch_crypto_keys())
    del incomplete['object']

    def callback():
        return incomplete

    with self.assertRaises(HTTPException) as caught:
        self.crypto_context.get_keys({CRYPTO_KEY_CALLBACK: callback})

    failure = caught.exception
    self.assertIn('500 Internal Error', failure.message)
    first_error = self.fake_logger.get_lines_for_level('error')[0]
    self.assertIn("Missing key for 'object'", first_error)
    self.assertIn('Unable to retrieve encryption keys.', failure.body)
def test_get_keys_missing_object_key_for_specified_required_list(self):
    """Keys lacking 'object' are rejected when 'object' is required."""
    incomplete = dict(fetch_crypto_keys())
    del incomplete['object']

    def callback():
        return incomplete

    with self.assertRaises(HTTPException) as caught:
        self.crypto_context.get_keys(
            {CRYPTO_KEY_CALLBACK: callback},
            required=['object', 'container'])

    failure = caught.exception
    self.assertIn('500 Internal Error', failure.message)
    first_error = self.fake_logger.get_lines_for_level('error')[0]
    self.assertIn("Missing key for 'object'", first_error)
    self.assertIn('Unable to retrieve encryption keys.', failure.body)
def test_get_keys_missing_container_key_for_specified_required_list(self):
    """Keys lacking 'container' are rejected when 'container' is required."""
    incomplete = dict(fetch_crypto_keys())
    del incomplete['container']

    def callback():
        return incomplete

    with self.assertRaises(HTTPException) as caught:
        self.crypto_context.get_keys(
            {CRYPTO_KEY_CALLBACK: callback},
            required=['object', 'container'])

    failure = caught.exception
    self.assertIn('500 Internal Error', failure.message)
    first_error = self.fake_logger.get_lines_for_level('error')[0]
    self.assertIn("Missing key for 'container'", first_error)
    self.assertIn('Unable to retrieve encryption keys.', failure.body)
def test_bad_object_key_for_default_required_list(self):
    """An 'object' key replaced by an arbitrary string is rejected."""
    tampered = dict(fetch_crypto_keys())
    tampered['object'] = 'the minor key'

    def callback():
        return tampered

    with self.assertRaises(HTTPException) as caught:
        self.crypto_context.get_keys({CRYPTO_KEY_CALLBACK: callback})

    failure = caught.exception
    self.assertIn('500 Internal Error', failure.message)
    first_error = self.fake_logger.get_lines_for_level('error')[0]
    self.assertIn("Bad key for 'object'", first_error)
    self.assertIn('Unable to retrieve encryption keys.', failure.body)
def test_bad_container_key_for_default_required_list(self):
#.........这里部分代码省略.........
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:101,代码来源:test_crypto_utils.py
示例18: test_sweep_logs
def test_sweep_logs(self):
    """Sweep five async pendings; expect start, progress and end lines.

    The updater's clock is patched to advance 5 seconds per call while
    ``report_interval`` is 10.0. Four info lines are expected in total:
    one start line, two progress lines and one completion line.
    """
    async_root = os.path.join(self.sda1, ASYNCDIR_BASE)
    prefix_dir = os.path.join(async_root, 'abc')
    mkpath(prefix_dir)

    pending = [('abc', 123), ('def', 234), ('ghi', 345),
               ('jkl', 456), ('mno', 567)]
    for obj_name, stamp in pending:
        obj_hash = hash_path('account', 'container', obj_name)
        pending_name = obj_hash + '-' + normalize_timestamp(stamp)
        write_pickle({}, os.path.join(prefix_dir, pending_name))

    class MockObjectUpdater(object_updater.ObjectUpdater):
        # Replaces the real update with "delete the file and count it".
        def process_object_update(self, update_path, device, policy):
            os.unlink(update_path)
            self.stats.successes += 1
            self.stats.unlinks += 1

    fake_logger = FakeLogger()
    updater_conf = {
        'devices': self.devices_dir,
        'mount_check': 'false',
        'swift_dir': self.testdir,
        'interval': '1',
        'concurrency': '1',
        'report_interval': '10.0',
        'node_timeout': '5'}
    updater = MockObjectUpdater(updater_conf, logger=fake_logger)

    # One-element list so the nested function can mutate the value.
    clock = [time()]

    def mock_time_function():
        current = clock[0]
        clock[0] += 5
        return current

    # With 10s between updates, time() advancing 5s every time we look,
    # and 5 async_pendings on disk, we should get at least two progress
    # lines.
    fake_time_module = mock.MagicMock(time=mock_time_function)
    with mock.patch('swift.obj.updater.time', fake_time_module), \
            mock.patch.object(object_updater, 'ContextPool', MockPool):
        updater.object_sweep(self.sda1)

    info_lines = fake_logger.get_lines_for_level('info')
    self.assertEqual(4, len(info_lines))

    start_line = info_lines[0]
    self.assertIn("sweep starting", start_line)
    self.assertIn(self.sda1, start_line)

    first_progress = info_lines[1]
    self.assertIn("sweep progress", first_progress)
    # the space ensures it's a positive number
    self.assertIn(
        "2 successes, 0 failures, 0 quarantines, 2 unlinks, 0 errors, "
        "0 redirects",
        first_progress)
    self.assertIn(self.sda1, first_progress)

    second_progress = info_lines[2]
    self.assertIn("sweep progress", second_progress)
    self.assertIn(
        "4 successes, 0 failures, 0 quarantines, 4 unlinks, 0 errors, "
        "0 redirects",
        second_progress)
    self.assertIn(self.sda1, second_progress)

    final_line = info_lines[3]
    self.assertIn("sweep complete", final_line)
    self.assertIn(
        "5 successes, 0 failures, 0 quarantines, 5 unlinks, 0 errors, "
        "0 redirects",
        final_line)
    self.assertIn(self.sda1, final_line)
开发者ID:jgmerritt,项目名称:swift,代码行数:69,代码来源:test_updater.py
示例19: TestObjectExpirer
class TestObjectExpirer(TestCase):
maxDiff = None
def setUp(self):
    """Stub out internal_client's loadapp/sleep and create scratch state.

    The original ``loadapp`` and ``sleep`` callables are kept on the
    instance as ``old_loadapp`` and ``old_sleep``.
    """
    global not_sleep

    def _no_app(*a, **kw):
        # Stand-in for loadapp: accepts anything, returns None.
        return None

    self.old_loadapp, self.old_sleep = (
        internal_client.loadapp, internal_client.sleep)
    internal_client.loadapp = _no_app
    internal_client.sleep = not_sleep

    self.rcache = mkdtemp()
    self.logger = FakeLogger()
def tearDown(self):
    """Remove the temp directory and restore the internal_client hooks.

    unittest only invokes the camel-case ``tearDown`` hook, so under the
    lowercase name this cleanup never ran. The original also read
    ``self.loadapp``, which setUp never assigns; the saved callable is
    stored as ``self.old_loadapp``.
    """
    rmtree(self.rcache)
    internal_client.sleep = self.old_sleep
    internal_client.loadapp = self.old_loadapp


# Keep the old (misspelled) name available for any explicit callers.
teardown = tearDown
def test_get_process_values_from_kwargs(self):
    """Values passed to get_process_values come back as a pair."""
    daemon = expirer.ObjectExpirer({})
    kwargs = {'processes': 5, 'process': 1}
    result = daemon.get_process_values(kwargs)
    self.assertEqual((5, 1), result)
def test_get_process_values_from_config(self):
    """Values given in the daemon config are used when kwargs are empty."""
    conf = {'processes': 5, 'process': 1}
    daemon = expirer.ObjectExpirer(conf)
    result = daemon.get_process_values({})
    self.assertEqual((5, 1), result)
def test_get_process_values_negative_process(self):
    """A negative ``process`` is expected to raise ValueError."""
    bad_values = {'processes': 5, 'process': -1}

    # from config
    daemon = expirer.ObjectExpirer(bad_values)
    with self.assertRaises(ValueError):
        daemon.get_process_values({})

    # from kwargs
    daemon = expirer.ObjectExpirer({})
    with self.assertRaises(ValueError):
        daemon.get_process_values(bad_values)
def test_get_process_values_negative_processes(self):
    """A negative ``processes`` is expected to raise ValueError."""
    bad_values = {'processes': -5, 'process': 1}

    # from config
    daemon = expirer.ObjectExpirer(bad_values)
    with self.assertRaises(ValueError):
        daemon.get_process_values({})

    # from kwargs
    daemon = expirer.ObjectExpirer({})
    with self.assertRaises(ValueError):
        daemon.get_process_values(bad_values)
def test_get_process_values_process_greater_than_processes(self):
    """``process`` above ``processes`` is expected to raise ValueError."""
    bad_values = {'processes': 5, 'process': 7}

    # from config
    daemon = expirer.ObjectExpirer(bad_values)
    with self.assertRaises(ValueError):
        daemon.get_process_values({})

    # from kwargs
    daemon = expirer.ObjectExpirer({})
    with self.assertRaises(ValueError):
        daemon.get_process_values(bad_values)
def test_init_concurrency_too_small(self):
    """A concurrency of 0 or -1 is expected to raise ValueError."""
    for too_small in (0, -1):
        conf = {'concurrency': too_small}
        self.assertRaises(ValueError, expirer.ObjectExpirer, conf)
def test_process_based_concurrency(self):
class ObjectExpirer(expirer.ObjectExpirer):
def __init__(self, conf):
super(ObjectExpirer, self).__init__(conf)
self.processes = 3
self.deleted_objects = {}
def delete_object(self, actual_obj, timestamp, container, obj):
if container not in self.deleted_objects:
self.deleted_objects[container] = set()
self.deleted_objects[container].add(obj)
class InternalClient(object):
def __init__(self, containers):
self.containers = containers
#.........这里部分代码省略.........
开发者ID:701,项目名称:swift,代码行数:101,代码来源:test_expirer.py
示例20: TestRelinker
class TestRelinker(unittest.TestCase):
def setUp(self):
    """Create a temp device tree, a ring builder and one object file.

    Builds ``<testdir>/node/sda1/objects``, a six-device ring builder, a
    data file for ``a/c/o`` under a directory derived from its hash, and
    records the directory and file expected at ``next_part``.
    """
    skip_if_no_xattrs()
    self.logger = FakeLogger()

    # Scratch directory layout: <testdir>/node
    self.testdir = tempfile.mkdtemp()
    self.devices = os.path.join(self.testdir, 'node')
    shutil.rmtree(self.testdir, ignore_errors=True)
    os.mkdir(self.testdir)
    os.mkdir(self.devices)

    # Six equally weighted devices on 127.0.0.0 through 127.0.0.5.
    self.rb = ring.RingBuilder(8, 6.0, 1)
    for dev_id in range(6):
        self.rb.add_dev({
            'id': dev_id, 'region': 0, 'zone': 0, 'weight': 1,
            'ip': "127.0.0.%s" % dev_id, 'port': 10000,
            'device': 'sda1'})
    self.rb.rebalance(seed=1)

    self.existing_device = 'sda1'
    device_dir = os.path.join(self.devices, self.existing_device)
    os.mkdir(device_dir)
    self.objects = os.path.join(
        self.devices, self.existing_device, 'objects')
    os.mkdir(self.objects)

    # The first 32 bits of the hash, shifted by 24 and by 23, give the
    # directory used now and the one stored as ``next_part``.
    self._hash = utils.hash_path('a/c/o')
    digest = binascii.unhexlify(self._hash)
    leading_word = struct.unpack_from('>I', digest)[0]
    part = leading_word >> 24
    self.next_part = leading_word >> 23
    hash_suffix = self._hash[-3:]

    self.objdir = os.path.join(
        self.objects, str(part), hash_suffix, self._hash)
    os.makedirs(self.objdir)
    self.object_fname = "1278553064.00000.data"
    self.objname = os.path.join(self.objdir, self.object_fname)
    with open(self.objname, "wb") as obj_file:
        obj_file.write(b"Hello World!")
        write_metadata(
            obj_file, {'name': '/a/c/o', 'Content-Length': '12'})

    # Replace the global policy collection with a single policy.
    storage_policy._POLICIES = StoragePolicyCollection(
        [StoragePolicy(0, 'platin', True)])

    self.expected_dir = os.path.join(
        self.objects, str(self.next_part), hash_suffix, self._hash)
    self.expected_file = os.path.join(
        self.expected_dir, self.object_fname)
def _save_ring(self):
    """Write the builder's ring to testdir once per storage policy.

    Each file is named ``<ring_name>.ring.gz``, and the policy's
    ``object_ring`` is reset to None after its ring is saved.
    """
    ring_data = self.rb.get_ring()
    for policy in POLICIES:
        ring_file = os.path.join(
            self.testdir, '%s.ring.gz' % policy.ring_name)
        ring_data.save(ring_file)
        # Enforce ring reloading in relinker
        policy.object_ring = None
def tearDown(self):
    """Delete the scratch directory and reload the storage policies."""
    # Best-effort removal: errors while deleting are ignored.
    shutil.rmtree(self.testdir, ignore_errors=True)
    storage_policy.reload_storage_policies()
def test_relink(self):
    """After relink the object is expected at the new path, same inode."""
    self.rb.prepare_increase_partition_power()
    self._save_ring()
    relinker.relink(self.testdir, self.devices, True)

    self.assertTrue(os.path.isdir(self.expected_dir))
    self.assertTrue(os.path.isfile(self.expected_file))

    # Equal inode numbers mean both names refer to the same file.
    original_path = os.path.join(self.objdir, self.object_fname)
    original_stat = os.stat(original_path)
    relinked_stat = os.stat(self.expected_file)
    self.assertEqual(original_stat.st_ino, relinked_stat.st_ino)
def _common_test_cleanup(self, relink=True):
    """Prepare the on-disk state shared by the cleanup tests.

    :param relink: when true, also hard-link the object file to
                   ``expected_file``.
    """
    # Create a ring that has prev_part_power set
    self.rb.prepare_increase_partition_power()
    self.rb.increase_partition_power()
    self._save_ring()

    os.makedirs(self.expected_dir)

    if not relink:
        return
    # Create a hardlink to the original object name. This is expected
    # after a normal relinker run
    original_path = os.path.join(self.objdir, self.object_fname)
    os.link(original_path, self.expected_file)
def test_cleanup(self):
    """cleanup is expected to return 0 and remove only the old name."""
    self._common_test_cleanup()
    status = relinker.cleanup(self.testdir, self.devices, True)
    self.assertEqual(0, status)

    # Old objectname should be removed, new should still exist
    self.assertTrue(os.path.isdir(self.expected_dir))
    self.assertTrue(os.path.isfile(self.expected_file))
    old_path = os.path.join(self.objdir, self.object_fname)
    self.assertFalse(os.path.isfile(old_path))
def test_cleanup_not_yet_relinked(self):
    """Without a prior relink, cleanup returns 1 and keeps the old file."""
    self._common_test_cleanup(relink=False)
    status = relinker.cleanup(self.testdir, self.devices, True)
    self.assertEqual(1, status)

    old_path = os.path.join(self.objdir, self.object_fname)
    self.assertTrue(os.path.isfile(old_path))
def test_cleanup_deleted(self):
self._common_test_cleanup()
#.........这里部分代码省略.........
开发者ID:matthewoliver,项目名称:swift,代码行数:101,代码来源:test_relinker.py
注:本文中的test.unit.FakeLogger类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论