• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python unit.FakeLogger类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中test.unit.FakeLogger的典型用法代码示例。如果您正苦于以下问题:Python FakeLogger类的具体用法?Python FakeLogger怎么用?Python FakeLogger使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了FakeLogger类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_two_realms_and_change_a_default

    def test_two_realms_and_change_a_default(self):
        fname = 'container-sync-realms.conf'
        fcontents = '''
[DEFAULT]
mtime_check_interval = 60

[US]
key = 9ff3b71c849749dbaec4ccdd3cbab62b
cluster_dfw1 = http://dfw1.host/v1/

[UK]
key = e9569809dc8b4951accc1487aa788012
key2 = f6351bd1cc36413baa43f7ba1b45e51d
cluster_lon3 = http://lon3.host/v1/
'''
        with temptree([fname], [fcontents]) as tempdir:
            logger = FakeLogger()
            fpath = os.path.join(tempdir, fname)
            csr = ContainerSyncRealms(fpath, logger)
            self.assertEqual(logger.all_log_lines(), {})
            self.assertEqual(csr.mtime_check_interval, 60)
            self.assertEqual(sorted(csr.realms()), ['UK', 'US'])
            self.assertEqual(csr.key('US'), '9ff3b71c849749dbaec4ccdd3cbab62b')
            self.assertEqual(csr.key2('US'), None)
            self.assertEqual(csr.clusters('US'), ['DFW1'])
            self.assertEqual(
                csr.endpoint('US', 'DFW1'), 'http://dfw1.host/v1/')
            self.assertEqual(csr.key('UK'), 'e9569809dc8b4951accc1487aa788012')
            self.assertEqual(
                csr.key2('UK'), 'f6351bd1cc36413baa43f7ba1b45e51d')
            self.assertEqual(csr.clusters('UK'), ['LON3'])
            self.assertEqual(
                csr.endpoint('UK', 'LON3'), 'http://lon3.host/v1/')
开发者ID:Joyezhu,项目名称:swift,代码行数:33,代码来源:test_container_sync_realms.py


示例2: test_empty

 def test_empty(self):
     # An empty realms file loads without logging anything, leaves
     # mtime_check_interval at 300 and defines no realms.
     conf_name = 'container-sync-realms.conf'
     with temptree([conf_name], ['']) as tmp_root:
         fake_logger = FakeLogger()
         conf_path = os.path.join(tmp_root, conf_name)
         realms = ContainerSyncRealms(conf_path, fake_logger)
         self.assertEqual(fake_logger.all_log_lines(), {})
         self.assertEqual(realms.mtime_check_interval, 300)
         self.assertEqual(realms.realms(), [])
开发者ID:Joyezhu,项目名称:swift,代码行数:10,代码来源:test_container_sync_realms.py


示例3: test_no_file_there

 def test_no_file_there(self):
     # A path that does not exist produces exactly one debug-level line
     # and leaves the object with no realms and the 300 interval.
     missing_path = uuid.uuid4().hex
     fake_logger = FakeLogger()
     realms = ContainerSyncRealms(missing_path, fake_logger)
     not_found_line = (
         "Could not load '%s': [Errno 2] No such file or directory: "
         "'%s'" % (missing_path, missing_path))
     self.assertEqual(
         fake_logger.all_log_lines(), {'debug': [not_found_line]})
     self.assertEqual(realms.mtime_check_interval, 300)
     self.assertEqual(realms.realms(), [])
开发者ID:Joyezhu,项目名称:swift,代码行数:11,代码来源:test_container_sync_realms.py


示例4: test_object_run_logging

    def test_object_run_logging(self):
        # The first info-level line written by audit_all_objects must name
        # the audit type and the device list, for both a regular audit and
        # one created with zero_byte_only_at_fps set.
        logger = FakeLogger()
        auditor_worker = auditor.AuditorWorker(
            self.conf, logger, self.rcache, self.devices)
        auditor_worker.audit_all_objects(device_dirs=["sda"])
        log_lines = logger.get_lines_for_level("info")
        self.assertGreater(len(log_lines), 0)
        # assertIn instead of assertTrue(str.index(...)): index() returns 0
        # (falsy) when the text is at the very start of the line, and raises
        # ValueError rather than failing the assertion when it is absent.
        self.assertIn("ALL - parallel, sda", log_lines[0])

        logger = FakeLogger()
        auditor_worker = auditor.AuditorWorker(
            self.conf, logger, self.rcache, self.devices,
            zero_byte_only_at_fps=50)
        auditor_worker.audit_all_objects(device_dirs=["sda"])
        log_lines = logger.get_lines_for_level("info")
        self.assertGreater(len(log_lines), 0)
        self.assertIn("ZBF - sda", log_lines[0])
开发者ID:renanalan,项目名称:swift,代码行数:14,代码来源:test_auditor.py


示例5: test_error_parsing

 def test_error_parsing(self):
     # A file with no section headers is reported once at error level;
     # the object then has no realms and the 300 interval.
     conf_name = 'container-sync-realms.conf'
     with temptree([conf_name], ['invalid']) as tmp_root:
         fake_logger = FakeLogger()
         conf_path = os.path.join(tmp_root, conf_name)
         realms = ContainerSyncRealms(conf_path, fake_logger)
         parse_error_line = (
             "Could not load '%s': File contains no section headers.\n"
             "file: %s, line: 1\n"
             "'invalid'" % (conf_path, conf_path))
         self.assertEqual(
             fake_logger.all_log_lines(), {'error': [parse_error_line]})
         self.assertEqual(realms.mtime_check_interval, 300)
         self.assertEqual(realms.realms(), [])
开发者ID:Joyezhu,项目名称:swift,代码行数:15,代码来源:test_container_sync_realms.py


示例6: test_sweep_logs_multiple_policies

    def test_sweep_logs_multiple_policies(self):
        # Write three async pendings for every mocked policy, sweep the
        # device once, and check that a single "sweep complete" info line
        # reports the combined totals.
        pendings = (('abc', 123), ('def', 234), ('ghi', 345))
        for policy in _mocked_policies:
            prefix_dir = os.path.join(
                self.sda1, get_async_dir(policy.idx), 'abc')
            mkpath(prefix_dir)

            container = 'container%d' % policy.idx
            for obj_name, timestamp in pendings:
                obj_hash = hash_path('account', container, obj_name)
                pending_name = obj_hash + '-' + normalize_timestamp(timestamp)
                write_pickle({}, os.path.join(prefix_dir, pending_name))

        class MockObjectUpdater(object_updater.ObjectUpdater):
            # Treat every update as an immediate success: remove the
            # pending file and bump the counters the log line reports.
            def process_object_update(self, update_path, device, policy):
                os.unlink(update_path)
                self.stats.successes += 1
                self.stats.unlinks += 1

        fake_logger = FakeLogger()
        updater_conf = {
            'devices': self.devices_dir,
            'mount_check': 'false',
            'swift_dir': self.testdir,
            'interval': '1',
            'concurrency': '1',
            'report_interval': '10.0',
            'node_timeout': '5'}
        updater = MockObjectUpdater(updater_conf, logger=fake_logger)

        def _clock(start, step):
            # Endless readings: start, start + step, start + 2 * step, ...
            reading = start
            while True:
                yield reading
                reading += step

        readings = _clock(time(), 0.01)

        def mock_time():
            return next(readings)

        fake_time_module = mock.MagicMock(time=mock_time)
        with mock.patch('swift.obj.updater.time', fake_time_module):
            updater.object_sweep(self.sda1)

        completion_lines = [
            line for line in fake_logger.get_lines_for_level('info')
            if "sweep complete" in line]

        self.assertEqual(len(completion_lines), 1)
        summary = completion_lines[0]
        self.assertIn("sweep complete", summary)
        self.assertIn(
            "6 successes, 0 failures, 0 quarantines, 6 unlinks, 0 errors, "
            "0 redirects",
            summary)
开发者ID:jgmerritt,项目名称:swift,代码行数:48,代码来源:test_updater.py


示例7: test_empty_realm

    def test_empty_realm(self):
        fname = 'container-sync-realms.conf'
        fcontents = '''
[US]
'''
        with temptree([fname], [fcontents]) as tempdir:
            logger = FakeLogger()
            fpath = os.path.join(tempdir, fname)
            csr = ContainerSyncRealms(fpath, logger)
            self.assertEqual(logger.all_log_lines(), {})
            self.assertEqual(csr.mtime_check_interval, 300)
            self.assertEqual(csr.realms(), ['US'])
            self.assertEqual(csr.key('US'), None)
            self.assertEqual(csr.key2('US'), None)
            self.assertEqual(csr.clusters('US'), [])
            self.assertEqual(csr.endpoint('US', 'JUST_TESTING'), None)
开发者ID:Joyezhu,项目名称:swift,代码行数:16,代码来源:test_container_sync_realms.py


示例8: test_bad_mtime_check_interval

    def test_bad_mtime_check_interval(self):
        fname = 'container-sync-realms.conf'
        fcontents = '''
[DEFAULT]
mtime_check_interval = invalid
'''
        with temptree([fname], [fcontents]) as tempdir:
            logger = FakeLogger()
            fpath = os.path.join(tempdir, fname)
            csr = ContainerSyncRealms(fpath, logger)
            self.assertEqual(
                logger.all_log_lines(),
                {'error': [
                    "Error in '%s' with mtime_check_interval: invalid literal "
                    "for int() with base 10: 'invalid'" % fpath]})
            self.assertEqual(csr.mtime_check_interval, 300)
开发者ID:Joyezhu,项目名称:swift,代码行数:16,代码来源:test_container_sync_realms.py


示例9: setUp

    def setUp(self):
        # Build the CryptoWSGIContext under test around a stub filter and
        # a FakeLogger kept on self so tests can inspect what was logged.
        class FakeFilter(object):
            # Only the two attributes set here are provided by the stub.
            crypto = Crypto({})
            app = None

        logger = FakeLogger()
        self.fake_logger = logger
        self.crypto_context = CryptoWSGIContext(FakeFilter(), 'object', logger)
开发者ID:mahak,项目名称:swift,代码行数:8,代码来源:test_crypto_utils.py


示例10: test_os_error

 def test_os_error(self):
     # Strip all permissions from the directory holding the realms file
     # and check that loading logs a "Permission denied" error and leaves
     # the object with no realms and the 300 interval.
     fname = 'container-sync-realms.conf'
     fcontents = ''
     with temptree([fname], [fcontents]) as tempdir:
         logger = FakeLogger()
         fpath = os.path.join(tempdir, fname)
         os.chmod(tempdir, 0)
         try:
             # Constructed inside the try so the directory mode is put
             # back even if loading raises instead of logging.
             csr = ContainerSyncRealms(fpath, logger)
             self.assertEqual(
                 logger.all_log_lines(),
                 {'error': [
                     "Could not load '%s': [Errno 13] Permission denied: "
                     "'%s'" % (fpath, fpath)]})
             self.assertEqual(csr.mtime_check_interval, 300)
             self.assertEqual(csr.realms(), [])
         finally:
             # 0o700 is valid on both Python 2.6+ and Python 3; the old
             # "0700" spelling is a syntax error on Python 3.
             os.chmod(tempdir, 0o700)
开发者ID:Joyezhu,项目名称:swift,代码行数:18,代码来源:test_container_sync_realms.py


示例11: test_one_realm

    def test_one_realm(self):
        fname = 'container-sync-realms.conf'
        fcontents = '''
[US]
key = 9ff3b71c849749dbaec4ccdd3cbab62b
cluster_dfw1 = http://dfw1.host/v1/
'''
        with temptree([fname], [fcontents]) as tempdir:
            logger = FakeLogger()
            fpath = os.path.join(tempdir, fname)
            csr = ContainerSyncRealms(fpath, logger)
            self.assertEqual(logger.all_log_lines(), {})
            self.assertEqual(csr.mtime_check_interval, 300)
            self.assertEqual(csr.realms(), ['US'])
            self.assertEqual(csr.key('US'), '9ff3b71c849749dbaec4ccdd3cbab62b')
            self.assertEqual(csr.key2('US'), None)
            self.assertEqual(csr.clusters('US'), ['DFW1'])
            self.assertEqual(
                csr.endpoint('US', 'DFW1'), 'http://dfw1.host/v1/')
开发者ID:Joyezhu,项目名称:swift,代码行数:19,代码来源:test_container_sync_realms.py


示例12: setUp

    def setUp(self):
        # Remember the real internal_client.loadapp/sleep on self, then
        # replace them with a do-nothing loadapp and the module's
        # not_sleep; also create a temp dir and a FakeLogger for the tests.
        self.old_loadapp, self.old_sleep = (
            internal_client.loadapp, internal_client.sleep)

        internal_client.loadapp = lambda *a, **kw: None
        internal_client.sleep = not_sleep

        self.rcache = mkdtemp()
        self.logger = FakeLogger()
开发者ID:701,项目名称:swift,代码行数:11,代码来源:test_expirer.py


示例13: test_os_error

    def test_os_error(self):
        # Make os.path.getmtime raise EACCES while the realms object is
        # constructed; the failure must be logged at error level and the
        # object left with no realms and the 300 interval.
        conf_name = 'container-sync-realms.conf'
        with temptree([conf_name], ['']) as tmp_root:
            fake_logger = FakeLogger()
            conf_path = os.path.join(tmp_root, conf_name)

            def _mock_getmtime(path):
                denied = "%s: '%s'" % (os.strerror(errno.EACCES), conf_path)
                raise OSError(errno.EACCES, denied)

            with patch('os.path.getmtime', _mock_getmtime):
                realms = ContainerSyncRealms(conf_path, fake_logger)

            permission_line = (
                "Could not load '%s': [Errno 13] Permission denied: "
                "'%s'" % (conf_path, conf_path))
            self.assertEqual(
                fake_logger.all_log_lines(), {'error': [permission_line]})
            self.assertEqual(realms.mtime_check_interval, 300)
            self.assertEqual(realms.realms(), [])
开发者ID:AfonsoFGarcia,项目名称:swift,代码行数:21,代码来源:test_container_sync_realms.py


示例14: setUp

    def setUp(self):
        # Build <testdir>/node/sda1/objects holding one object file for
        # 'a/c/o', a six-device ring builder, and a single storage policy;
        # record where the object file is expected under next_part.
        skip_if_no_xattrs()
        self.logger = FakeLogger()

        # The directory made by mkdtemp is removed and recreated before
        # anything is placed in it.
        self.testdir = tempfile.mkdtemp()
        self.devices = os.path.join(self.testdir, 'node')
        shutil.rmtree(self.testdir, ignore_errors=1)
        for directory in (self.testdir, self.devices):
            os.mkdir(directory)

        self.rb = ring.RingBuilder(8, 6.0, 1)
        for dev_id in range(6):
            address = "127.0.0.%s" % dev_id
            self.rb.add_dev({
                'id': dev_id, 'region': 0, 'zone': 0, 'weight': 1,
                'ip': address, 'port': 10000, 'device': 'sda1'})
        self.rb.rebalance(seed=1)

        self.existing_device = 'sda1'
        device_dir = os.path.join(self.devices, self.existing_device)
        os.mkdir(device_dir)
        self.objects = os.path.join(device_dir, 'objects')
        os.mkdir(self.objects)

        self._hash = utils.hash_path('a/c/o')
        # Leading 32 bits of the hash as a big-endian unsigned int: its top
        # 8 bits name the directory the file is created in, its top 9 bits
        # the directory used for expected_dir.
        (hash_head,) = struct.unpack_from(
            '>I', binascii.unhexlify(self._hash))
        part = hash_head >> 24
        self.next_part = hash_head >> 23
        suffix = self._hash[-3:]

        self.objdir = os.path.join(
            self.objects, str(part), suffix, self._hash)
        os.makedirs(self.objdir)
        self.object_fname = "1278553064.00000.data"
        self.objname = os.path.join(self.objdir, self.object_fname)
        with open(self.objname, "wb") as data_file:
            data_file.write(b"Hello World!")
            write_metadata(
                data_file, {'name': '/a/c/o', 'Content-Length': '12'})

        storage_policy._POLICIES = StoragePolicyCollection(
            [StoragePolicy(0, 'platin', True)])

        self.expected_dir = os.path.join(
            self.objects, str(self.next_part), suffix, self._hash)
        self.expected_file = os.path.join(
            self.expected_dir, self.object_fname)
开发者ID:matthewoliver,项目名称:swift,代码行数:41,代码来源:test_relinker.py


示例15: TestObjectExpirer

class TestObjectExpirer(TestCase):
    """Tests for expirer.ObjectExpirer.

    setUp replaces internal_client.loadapp and internal_client.sleep with
    stubs for the duration of each test; tearDown puts the originals back
    and removes the temporary directory.
    """
    maxDiff = None

    def setUp(self):
        # Keep the real functions so tearDown can restore them.
        self.old_loadapp = internal_client.loadapp
        self.old_sleep = internal_client.sleep

        internal_client.loadapp = lambda *a, **kw: None
        internal_client.sleep = not_sleep

        self.rcache = mkdtemp()
        self.logger = FakeLogger()

    def tearDown(self):
        # Spelled tearDown so unittest actually calls it (a method named
        # "teardown" is never run), and restoring from old_loadapp, which
        # is the attribute setUp really sets.
        rmtree(self.rcache)
        internal_client.sleep = self.old_sleep
        internal_client.loadapp = self.old_loadapp

    def test_get_process_values_from_kwargs(self):
        # processes/process given as kwargs are returned as a tuple.
        x = expirer.ObjectExpirer({})
        vals = {"processes": 5, "process": 1}
        self.assertEqual((5, 1), x.get_process_values(vals))

    def test_get_process_values_from_config(self):
        # processes/process given in the conf are used when kwargs are
        # empty.
        vals = {"processes": 5, "process": 1}
        x = expirer.ObjectExpirer(vals)
        self.assertEqual((5, 1), x.get_process_values({}))

    def test_get_process_values_negative_process(self):
        vals = {"processes": 5, "process": -1}
        # from config
        x = expirer.ObjectExpirer(vals)
        self.assertRaises(ValueError, x.get_process_values, {})
        # from kwargs
        x = expirer.ObjectExpirer({})
        self.assertRaises(ValueError, x.get_process_values, vals)

    def test_get_process_values_negative_processes(self):
        vals = {"processes": -5, "process": 1}
        # from config
        x = expirer.ObjectExpirer(vals)
        self.assertRaises(ValueError, x.get_process_values, {})
        # from kwargs
        x = expirer.ObjectExpirer({})
        self.assertRaises(ValueError, x.get_process_values, vals)

    def test_get_process_values_process_greater_than_processes(self):
        vals = {"processes": 5, "process": 7}
        # from config
        x = expirer.ObjectExpirer(vals)
        self.assertRaises(ValueError, x.get_process_values, {})
        # from kwargs
        x = expirer.ObjectExpirer({})
        self.assertRaises(ValueError, x.get_process_values, vals)

    def test_init_concurrency_too_small(self):
        # Zero and negative concurrency are both rejected at construction.
        conf = {"concurrency": 0}
        self.assertRaises(ValueError, expirer.ObjectExpirer, conf)
        conf = {"concurrency": -1}
        self.assertRaises(ValueError, expirer.ObjectExpirer, conf)

    def test_process_based_concurrency(self):
        class ObjectExpirer(expirer.ObjectExpirer):
            # Records deletions per container instead of performing them.
            def __init__(self, conf):
                super(ObjectExpirer, self).__init__(conf)
                self.processes = 3
                self.deleted_objects = {}

            def delete_object(self, actual_obj, timestamp, container, obj):
                if container not in self.deleted_objects:
                    self.deleted_objects[container] = set()
                self.deleted_objects[container].add(obj)

        class InternalClient(object):
            # In-memory stand-in backed by a {container: objects} mapping.
            def __init__(self, containers):
                self.containers = containers

            def get_account_info(self, *a, **kw):
                # (number of containers, total number of objects)
                return (len(self.containers),
                        sum(len(objs) for objs in self.containers.values()))

            def iter_containers(self, *a, **kw):
                return [{"name": x} for x in self.containers.keys()]

            def iter_objects(self, account, container):
                return [{"name": x} for x in self.containers[container]]

            def delete_container(*a, **kw):
                pass

        ukey = u"3"
        containers = {
            0: set("1-one 2-two 3-three".split()),
            1: set("2-two 3-three 4-four".split()),
            2: set("5-five 6-six".split()),
            ukey: set(u"7-seven\u2661".split()),
        }
        x = ObjectExpirer({})
        x.swift = InternalClient(containers)
#.........这里部分代码省略.........
开发者ID:AsherBond,项目名称:swift,代码行数:101,代码来源:test_expirer.py


示例16: TestCryptoWsgiContext

class TestCryptoWsgiContext(unittest.TestCase):
    """Tests for CryptoWSGIContext.get_keys and its failure reporting."""

    def setUp(self):
        # Context under test, built around a stub filter and a FakeLogger
        # kept on self so tests can read the logged error lines.
        class FakeFilter(object):
            crypto = Crypto({})
            app = None

        logger = FakeLogger()
        self.fake_logger = logger
        self.crypto_context = CryptoWSGIContext(FakeFilter(), 'object', logger)

    def _assert_key_retrieval_failed(self, raised, log_fragment):
        # Shared checks for every failing get_keys call: a 500 status, the
        # given fragment in the first error log line, and the fixed body.
        self.assertIn('500 Internal Error', raised.exception.status)
        self.assertIn(log_fragment,
                      self.fake_logger.get_lines_for_level('error')[0])
        self.assertIn(b'Unable to retrieve encryption keys.',
                      raised.exception.body)

    def test_get_keys(self):
        def env_returning(value):
            # env dict whose key callback hands back `value` regardless of
            # the arguments it is called with
            return {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: value}

        # ok
        fetched = self.crypto_context.get_keys(
            {CRYPTO_KEY_CALLBACK: fetch_crypto_keys})
        self.assertDictEqual(fetch_crypto_keys(), fetched)

        # only default required keys are checked
        object_only = {'object': fetch_crypto_keys()['object']}
        fetched = self.crypto_context.get_keys(env_returning(object_only))
        self.assertDictEqual(object_only, fetched)

        # only specified required keys are checked
        container_only = {'container': fetch_crypto_keys()['container']}
        fetched = self.crypto_context.get_keys(
            env_returning(container_only), required=['container'])
        self.assertDictEqual(container_only, fetched)

        object_and_container = {
            'object': fetch_crypto_keys()['object'],
            'container': fetch_crypto_keys()['container']}
        fetched = self.crypto_context.get_keys(
            env_returning(object_and_container),
            required=['object', 'container'])
        self.assertDictEqual(object_and_container, fetched)

    def test_get_keys_with_crypto_meta(self):
        # verify that key_id from crypto_meta is passed to fetch_crypto_keys
        callback = mock.MagicMock(return_value=fetch_crypto_keys())
        env = {CRYPTO_KEY_CALLBACK: callback}

        # (kwargs given to get_keys, key_id the callback must receive);
        # an empty key_id and an omitted key_id are both acceptable.
        cases = (
            ({'key_id': {'secret_id': '123'}}, {'secret_id': '123'}),
            ({'key_id': {}}, {}),
            ({}, None),
        )
        for get_keys_kwargs, expected_key_id in cases:
            fetched = self.crypto_context.get_keys(env, **get_keys_kwargs)
            self.assertDictEqual(fetch_crypto_keys(), fetched)
            callback.assert_called_with(key_id=expected_key_id)

    def test_get_keys_missing_callback(self):
        with self.assertRaises(HTTPException) as raised:
            self.crypto_context.get_keys({})
        self._assert_key_retrieval_failed(raised, 'missing callback')

    def test_get_keys_callback_exception(self):
        def exploding(*args, **kwargs):
            raise Exception('boom')

        with self.assertRaises(HTTPException) as raised:
            self.crypto_context.get_keys({CRYPTO_KEY_CALLBACK: exploding})
        self._assert_key_retrieval_failed(raised, 'from callback: boom')

    def test_get_keys_missing_key_for_default_required_list(self):
        without_object = dict(fetch_crypto_keys())
        without_object.pop('object')
        env = {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: without_object}
        with self.assertRaises(HTTPException) as raised:
            self.crypto_context.get_keys(env)
        self._assert_key_retrieval_failed(raised, "Missing key for 'object'")

    def test_get_keys_missing_object_key_for_specified_required_list(self):
        without_object = dict(fetch_crypto_keys())
        without_object.pop('object')
        env = {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: without_object}
        with self.assertRaises(HTTPException) as raised:
            self.crypto_context.get_keys(
                env, required=['object', 'container'])
        self._assert_key_retrieval_failed(raised, "Missing key for 'object'")

    def test_get_keys_missing_container_key_for_specified_required_list(self):
        bad_keys = dict(fetch_crypto_keys())
#.........这里部分代码省略.........
开发者ID:mahak,项目名称:swift,代码行数:101,代码来源:test_crypto_utils.py


示例17: TestCryptoWsgiContext

class TestCryptoWsgiContext(unittest.TestCase):
    """Tests for CryptoWSGIContext.get_keys and its failure reporting."""

    def setUp(self):
        # Context under test, built around a stub filter and a FakeLogger
        # kept on self so tests can read the logged error lines.
        class FakeFilter(object):
            crypto = Crypto({})
            app = None

        logger = FakeLogger()
        self.fake_logger = logger
        self.crypto_context = CryptoWSGIContext(FakeFilter(), 'object', logger)

    def _assert_key_retrieval_failed(self, raised, log_fragment):
        # Shared checks for every failing get_keys call: a 500 in the
        # exception message, the given fragment in the first error log
        # line, and the fixed response body.
        self.assertIn('500 Internal Error', raised.exception.message)
        self.assertIn(log_fragment,
                      self.fake_logger.get_lines_for_level('error')[0])
        self.assertIn('Unable to retrieve encryption keys.',
                      raised.exception.body)

    def _assert_get_keys_fails(self, keys, log_fragment, **get_keys_kwargs):
        # Run get_keys with a zero-argument callback returning `keys` and
        # check that it fails in the way described above.
        with self.assertRaises(HTTPException) as raised:
            self.crypto_context.get_keys(
                {CRYPTO_KEY_CALLBACK: lambda: keys}, **get_keys_kwargs)
        self._assert_key_retrieval_failed(raised, log_fragment)

    def test_get_keys(self):
        def env_returning(value):
            # env dict whose zero-argument key callback returns `value`
            return {CRYPTO_KEY_CALLBACK: lambda: value}

        # ok
        fetched = self.crypto_context.get_keys(
            {CRYPTO_KEY_CALLBACK: fetch_crypto_keys})
        self.assertDictEqual(fetch_crypto_keys(), fetched)

        # only default required keys are checked
        object_only = {'object': fetch_crypto_keys()['object']}
        fetched = self.crypto_context.get_keys(env_returning(object_only))
        self.assertDictEqual(object_only, fetched)

        # only specified required keys are checked
        container_only = {'container': fetch_crypto_keys()['container']}
        fetched = self.crypto_context.get_keys(
            env_returning(container_only), required=['container'])
        self.assertDictEqual(container_only, fetched)

        object_and_container = {
            'object': fetch_crypto_keys()['object'],
            'container': fetch_crypto_keys()['container']}
        fetched = self.crypto_context.get_keys(
            env_returning(object_and_container),
            required=['object', 'container'])
        self.assertDictEqual(object_and_container, fetched)

    def test_get_keys_missing_callback(self):
        with self.assertRaises(HTTPException) as raised:
            self.crypto_context.get_keys({})
        self._assert_key_retrieval_failed(raised, 'missing callback')

    def test_get_keys_callback_exception(self):
        def exploding():
            raise Exception('boom')

        with self.assertRaises(HTTPException) as raised:
            self.crypto_context.get_keys({CRYPTO_KEY_CALLBACK: exploding})
        self._assert_key_retrieval_failed(raised, 'from callback: boom')

    def test_get_keys_missing_key_for_default_required_list(self):
        without_object = dict(fetch_crypto_keys())
        without_object.pop('object')
        self._assert_get_keys_fails(
            without_object, "Missing key for 'object'")

    def test_get_keys_missing_object_key_for_specified_required_list(self):
        without_object = dict(fetch_crypto_keys())
        without_object.pop('object')
        self._assert_get_keys_fails(
            without_object, "Missing key for 'object'",
            required=['object', 'container'])

    def test_get_keys_missing_container_key_for_specified_required_list(self):
        without_container = dict(fetch_crypto_keys())
        without_container.pop('container')
        self._assert_get_keys_fails(
            without_container, "Missing key for 'container'",
            required=['object', 'container'])

    def test_bad_object_key_for_default_required_list(self):
        with_bad_object = dict(fetch_crypto_keys())
        with_bad_object['object'] = 'the minor key'
        self._assert_get_keys_fails(
            with_bad_object, "Bad key for 'object'")

    def test_bad_container_key_for_default_required_list(self):
#.........这里部分代码省略.........
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:101,代码来源:test_crypto_utils.py


示例18: test_sweep_logs

    def test_sweep_logs(self):
        # Sweep five async pendings under a clock that jumps 5 on every
        # reading and check the start, progress and complete info lines.
        prefix_dir = os.path.join(self.sda1, ASYNCDIR_BASE, 'abc')
        mkpath(prefix_dir)

        pendings = (('abc', 123), ('def', 234), ('ghi', 345),
                    ('jkl', 456), ('mno', 567))
        for obj_name, timestamp in pendings:
            obj_hash = hash_path('account', 'container', obj_name)
            pending_name = obj_hash + '-' + normalize_timestamp(timestamp)
            write_pickle({}, os.path.join(prefix_dir, pending_name))

        class MockObjectUpdater(object_updater.ObjectUpdater):
            # Treat every update as an immediate success: remove the
            # pending file and bump the counters the log lines report.
            def process_object_update(self, update_path, device, policy):
                os.unlink(update_path)
                self.stats.successes += 1
                self.stats.unlinks += 1

        fake_logger = FakeLogger()
        updater_conf = {
            'devices': self.devices_dir,
            'mount_check': 'false',
            'swift_dir': self.testdir,
            'interval': '1',
            'concurrency': '1',
            'report_interval': '10.0',
            'node_timeout': '5'}
        updater = MockObjectUpdater(updater_conf, logger=fake_logger)

        def _clock(start, step):
            # Endless readings: start, start + step, start + 2 * step, ...
            reading = start
            while True:
                yield reading
                reading += step

        readings = _clock(time(), 5)

        def mock_time_function():
            return next(readings)

        # With a report_interval of 10, time() advancing 5 on every
        # reading, and 5 async_pendings on disk, we should get at least
        # two progress lines.
        fake_time_module = mock.MagicMock(time=mock_time_function)
        with mock.patch('swift.obj.updater.time', fake_time_module):
            with mock.patch.object(object_updater, 'ContextPool', MockPool):
                updater.object_sweep(self.sda1)

        info_lines = fake_logger.get_lines_for_level('info')
        self.assertEqual(4, len(info_lines))

        # (phase text, stats text or None) expected in each line, in order;
        # every line must also mention the swept device path.
        expected_lines = (
            ("sweep starting", None),
            ("sweep progress",
             "2 successes, 0 failures, 0 quarantines, 2 unlinks, 0 errors, "
             "0 redirects"),
            ("sweep progress",
             "4 successes, 0 failures, 0 quarantines, 4 unlinks, 0 errors, "
             "0 redirects"),
            ("sweep complete",
             "5 successes, 0 failures, 0 quarantines, 5 unlinks, 0 errors, "
             "0 redirects"),
        )
        for position, (phase, stats) in enumerate(expected_lines):
            line = info_lines[position]
            self.assertIn(phase, line)
            if stats is not None:
                self.assertIn(stats, line)
            self.assertIn(self.sda1, line)
开发者ID:jgmerritt,项目名称:swift,代码行数:69,代码来源:test_updater.py


示例19: TestObjectExpirer

class TestObjectExpirer(TestCase):
    """Tests for expirer.ObjectExpirer.

    setUp replaces internal_client.loadapp and internal_client.sleep with
    stubs for the duration of each test; tearDown puts the originals back
    and removes the temporary directory.
    """
    maxDiff = None

    def setUp(self):
        # Keep the real functions so tearDown can restore them.
        self.old_loadapp = internal_client.loadapp
        self.old_sleep = internal_client.sleep

        internal_client.loadapp = lambda *a, **kw: None
        internal_client.sleep = not_sleep

        self.rcache = mkdtemp()
        self.logger = FakeLogger()

    def tearDown(self):
        # Spelled tearDown so unittest actually calls it (a method named
        # "teardown" is never run), and restoring from old_loadapp, which
        # is the attribute setUp really sets.
        rmtree(self.rcache)
        internal_client.sleep = self.old_sleep
        internal_client.loadapp = self.old_loadapp

    def test_get_process_values_from_kwargs(self):
        # processes/process given as kwargs are returned as a tuple.
        x = expirer.ObjectExpirer({})
        vals = {
            'processes': 5,
            'process': 1,
        }
        self.assertEqual((5, 1), x.get_process_values(vals))

    def test_get_process_values_from_config(self):
        # processes/process given in the conf are used when kwargs are
        # empty.
        vals = {
            'processes': 5,
            'process': 1,
        }
        x = expirer.ObjectExpirer(vals)
        self.assertEqual((5, 1), x.get_process_values({}))

    def test_get_process_values_negative_process(self):
        vals = {
            'processes': 5,
            'process': -1,
        }
        # from config
        x = expirer.ObjectExpirer(vals)
        self.assertRaises(ValueError, x.get_process_values, {})
        # from kwargs
        x = expirer.ObjectExpirer({})
        self.assertRaises(ValueError, x.get_process_values, vals)

    def test_get_process_values_negative_processes(self):
        vals = {
            'processes': -5,
            'process': 1,
        }
        # from config
        x = expirer.ObjectExpirer(vals)
        self.assertRaises(ValueError, x.get_process_values, {})
        # from kwargs
        x = expirer.ObjectExpirer({})
        self.assertRaises(ValueError, x.get_process_values, vals)

    def test_get_process_values_process_greater_than_processes(self):
        vals = {
            'processes': 5,
            'process': 7,
        }
        # from config
        x = expirer.ObjectExpirer(vals)
        self.assertRaises(ValueError, x.get_process_values, {})
        # from kwargs
        x = expirer.ObjectExpirer({})
        self.assertRaises(ValueError, x.get_process_values, vals)

    def test_init_concurrency_too_small(self):
        # Zero and negative concurrency are both rejected at construction.
        conf = {
            'concurrency': 0,
        }
        self.assertRaises(ValueError, expirer.ObjectExpirer, conf)
        conf = {
            'concurrency': -1,
        }
        self.assertRaises(ValueError, expirer.ObjectExpirer, conf)

    def test_process_based_concurrency(self):

        class ObjectExpirer(expirer.ObjectExpirer):
            # Records deletions per container instead of performing them.

            def __init__(self, conf):
                super(ObjectExpirer, self).__init__(conf)
                self.processes = 3
                self.deleted_objects = {}

            def delete_object(self, actual_obj, timestamp, container, obj):
                if container not in self.deleted_objects:
                    self.deleted_objects[container] = set()
                self.deleted_objects[container].add(obj)

        class InternalClient(object):
            # In-memory stand-in holding the given containers mapping.

            def __init__(self, containers):
                self.containers = containers
#.........这里部分代码省略.........
开发者ID:701,项目名称:swift,代码行数:101,代码来源:test_expirer.py


示例20: TestRelinker

class TestRelinker(unittest.TestCase):
    def setUp(self):
        """Create the fixture shared by the relinker tests.

        Builds a fresh temporary directory containing a ``node/sda1/objects``
        tree, a ring builder with six devices, one object data file for
        ``a/c/o`` with metadata attached, and a single-policy storage policy
        collection. Also records where the object is expected to appear
        (``expected_dir`` / ``expected_file``).
        """
        skip_if_no_xattrs()
        self.logger = FakeLogger()

        # Recreate the temp directory so the test starts from an empty tree.
        self.testdir = tempfile.mkdtemp()
        self.devices = os.path.join(self.testdir, 'node')
        shutil.rmtree(self.testdir, ignore_errors=True)
        os.mkdir(self.testdir)
        os.mkdir(self.devices)

        self.rb = ring.RingBuilder(8, 6.0, 1)
        for dev_id in range(6):
            self.rb.add_dev({
                'id': dev_id,
                'region': 0,
                'zone': 0,
                'weight': 1,
                'ip': "127.0.0.%s" % dev_id,
                'port': 10000,
                'device': 'sda1',
            })
        self.rb.rebalance(seed=1)

        self.existing_device = 'sda1'
        device_path = os.path.join(self.devices, self.existing_device)
        os.mkdir(device_path)
        self.objects = os.path.join(device_path, 'objects')
        os.mkdir(self.objects)

        self._hash = utils.hash_path('a/c/o')
        digest = binascii.unhexlify(self._hash)
        (leading_word,) = struct.unpack_from('>I', digest)
        # Top 8 bits of the hash select the current partition directory.
        part = leading_word >> 24
        # One extra bit is kept here; presumably the partition after the
        # partition power is increased by one -- confirm against relinker.
        self.next_part = leading_word >> 23

        suffix = self._hash[-3:]
        self.objdir = os.path.join(
            self.objects, str(part), suffix, self._hash)
        os.makedirs(self.objdir)
        self.object_fname = "1278553064.00000.data"
        self.objname = os.path.join(self.objdir, self.object_fname)
        with open(self.objname, "wb") as fobj:
            fobj.write(b"Hello World!")
            write_metadata(fobj, {'name': '/a/c/o', 'Content-Length': '12'})

        # Replace the module-level policies with a single test policy.
        storage_policy._POLICIES = StoragePolicyCollection(
            [StoragePolicy(0, 'platin', True)])

        self.expected_dir = os.path.join(
            self.objects, str(self.next_part), suffix, self._hash)
        self.expected_file = os.path.join(self.expected_dir, self.object_fname)

    def _save_ring(self):
        """Save the builder's current ring into testdir for every policy."""
        ring_data = self.rb.get_ring()
        for policy in POLICIES:
            ring_file = '%s.ring.gz' % policy.ring_name
            ring_data.save(os.path.join(self.testdir, ring_file))
            # Clear the policy's ring so the relinker is forced to reload it.
            policy.object_ring = None

    def tearDown(self):
        """Delete the test directory and reload the storage policies."""
        # Removal is best effort: errors while deleting are ignored.
        shutil.rmtree(self.testdir, ignore_errors=True)
        storage_policy.reload_storage_policies()

    def test_relink(self):
        """After relink the object is hard-linked at the expected location."""
        self.rb.prepare_increase_partition_power()
        self._save_ring()
        relinker.relink(self.testdir, self.devices, True)

        self.assertTrue(os.path.isdir(self.expected_dir))
        self.assertTrue(os.path.isfile(self.expected_file))

        # Old and new names must share an inode, i.e. be the same file.
        old_path = os.path.join(self.objdir, self.object_fname)
        old_inode = os.stat(old_path).st_ino
        new_inode = os.stat(self.expected_file).st_ino
        self.assertEqual(old_inode, new_inode)

    def _common_test_cleanup(self, relink=True):
        """Prepare ring and directories shared by the cleanup tests.

        :param relink: when true, also hard-link the object into
                       ``expected_file``, which is the state expected after
                       a normal relinker run.
        """
        # The saved ring needs prev_part_power set, so run both steps.
        self.rb.prepare_increase_partition_power()
        self.rb.increase_partition_power()
        self._save_ring()

        os.makedirs(self.expected_dir)

        if not relink:
            return

        # Link the original object name to its new location.
        original = os.path.join(self.objdir, self.object_fname)
        os.link(original, self.expected_file)

    def test_cleanup(self):
        """Cleanup of a relinked object returns 0 and drops the old name."""
        self._common_test_cleanup()
        status = relinker.cleanup(self.testdir, self.devices, True)
        self.assertEqual(0, status)

        # The new location must still be there ...
        self.assertTrue(os.path.isdir(self.expected_dir))
        self.assertTrue(os.path.isfile(self.expected_file))
        # ... while the old object name must be gone.
        old_path = os.path.join(self.objdir, self.object_fname)
        self.assertFalse(os.path.isfile(old_path))

    def test_cleanup_not_yet_relinked(self):
        """Cleanup returns 1 and keeps the old file when it was not linked."""
        self._common_test_cleanup(relink=False)
        status = relinker.cleanup(self.testdir, self.devices, True)
        self.assertEqual(1, status)

        # The original object file must not have been removed.
        old_path = os.path.join(self.objdir, self.object_fname)
        self.assertTrue(os.path.isfile(old_path))

    def test_cleanup_deleted(self):
        self._common_test_cleanup()
#.........这里部分代码省略.........
开发者ID:matthewoliver,项目名称:swift,代码行数:101,代码来源:test_relinker.py



注:本文中的 test.unit.FakeLogger 类示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python crypto_helpers.fetch_crypto_keys函数代码示例发布时间:2022-05-27
下一篇:
Python unit.temptree函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap