• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils.readconf函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中swift.common.utils.readconf函数的典型用法代码示例。如果您正苦于以下问题:Python readconf函数的具体用法?Python readconf怎么用?Python readconf使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了readconf函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: has_expirer_section

 def has_expirer_section(conf_path):
     try:
         readconf(conf_path, section_name="object-expirer")
     except ValueError:
         return False
     else:
         return True
开发者ID:mahak,项目名称:swift,代码行数:7,代码来源:manager.py


示例2: test_readconf

    def test_readconf(self):
        conf = """[section1]
foo = bar

[section2]
log_name = yarr"""
        # setup a real file
        with open("/tmp/test", "wb") as f:
            f.write(conf)
        make_filename = lambda: "/tmp/test"
        # setup a file stream
        make_fp = lambda: StringIO(conf)
        for conf_object_maker in (make_filename, make_fp):
            result = utils.readconf(conf_object_maker())
            expected = {"log_name": None, "section1": {"foo": "bar"}, "section2": {"log_name": "yarr"}}
            self.assertEquals(result, expected)
            result = utils.readconf(conf_object_maker(), "section1")
            expected = {"log_name": "section1", "foo": "bar"}
            self.assertEquals(result, expected)
            result = utils.readconf(conf_object_maker(), "section2").get("log_name")
            expected = "yarr"
            self.assertEquals(result, expected)
            result = utils.readconf(conf_object_maker(), "section1", log_name="foo").get("log_name")
            expected = "foo"
            self.assertEquals(result, expected)
            result = utils.readconf(conf_object_maker(), "section1", defaults={"bar": "baz"})
            expected = {"log_name": "section1", "foo": "bar", "bar": "baz"}
            self.assertEquals(result, expected)
        self.assertRaises(SystemExit, utils.readconf, "/tmp/test", "section3")
        os.unlink("/tmp/test")
        self.assertRaises(SystemExit, utils.readconf, "/tmp/test")
开发者ID:colecrawford,项目名称:swift,代码行数:31,代码来源:test_utils.py


示例3: test_readconf

    def test_readconf(self):
        conf = '''[section1]
foo = bar

[section2]
log_name = yarr'''
        f = open('/tmp/test', 'wb')
        f.write(conf)
        f.close()
        result = utils.readconf('/tmp/test')
        expected = {'log_name': None,
                    'section1': {'foo': 'bar'},
                    'section2': {'log_name': 'yarr'}}
        self.assertEquals(result, expected)
        result = utils.readconf('/tmp/test', 'section1')
        expected = {'log_name': 'section1', 'foo': 'bar'}
        self.assertEquals(result, expected)
        result = utils.readconf('/tmp/test', 'section2').get('log_name')
        expected = 'yarr'
        self.assertEquals(result, expected)
        result = utils.readconf('/tmp/test', 'section1',
                                log_name='foo').get('log_name')
        expected = 'foo'
        self.assertEquals(result, expected)
        result = utils.readconf('/tmp/test', 'section1',
                                defaults={'bar': 'baz'})
        expected = {'log_name': 'section1', 'foo': 'bar', 'bar': 'baz'}
        self.assertEquals(result, expected)
        os.unlink('/tmp/test')
开发者ID:edwardt,项目名称:swift,代码行数:29,代码来源:test_utils.py


示例4: test_rsync_tempfile_timeout_auto_option

 def test_rsync_tempfile_timeout_auto_option(self):
     # if we don't have access to the replicator config section we'll use
     # our default
     auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                            self.rcache, self.devices)
     self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
     # if the rsync_tempfile_timeout option is set explicitly we use that
     self.conf['rsync_tempfile_timeout'] = '1800'
     auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                            self.rcache, self.devices)
     self.assertEqual(auditor_worker.rsync_tempfile_timeout, 1800)
     # if we have a real config we can be a little smarter
     config_path = os.path.join(self.testdir, 'objserver.conf')
     stub_config = """
     [object-auditor]
     rsync_tempfile_timeout = auto
     """
     with open(config_path, 'w') as f:
         f.write(textwrap.dedent(stub_config))
     # the Daemon loader will hand the object-auditor config to the
     # auditor who will build the workers from it
     conf = readconf(config_path, 'object-auditor')
     auditor_worker = auditor.AuditorWorker(conf, self.logger,
                                            self.rcache, self.devices)
     # if there is no object-replicator section we still have to fall back
     # to default because we can't parse the config for that section!
     self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
     stub_config = """
     [object-replicator]
     [object-auditor]
     rsync_tempfile_timeout = auto
     """
     with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f:
         f.write(textwrap.dedent(stub_config))
     conf = readconf(config_path, 'object-auditor')
     auditor_worker = auditor.AuditorWorker(conf, self.logger,
                                            self.rcache, self.devices)
     # if the object-replicator section will parse but does not override
     # the default rsync_timeout we assume the default rsync_timeout value
     # and add 15mins
     self.assertEqual(auditor_worker.rsync_tempfile_timeout,
                      replicator.DEFAULT_RSYNC_TIMEOUT + 900)
     stub_config = """
     [DEFAULT]
     reclaim_age = 1209600
     [object-replicator]
     rsync_timeout = 3600
     [object-auditor]
     rsync_tempfile_timeout = auto
     """
     with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f:
         f.write(textwrap.dedent(stub_config))
     conf = readconf(config_path, 'object-auditor')
     auditor_worker = auditor.AuditorWorker(conf, self.logger,
                                            self.rcache, self.devices)
     # if there is an object-replicator section with a rsync_timeout
     # configured we'll use that value (3600) + 900
     self.assertEqual(auditor_worker.rsync_tempfile_timeout, 3600 + 900)
开发者ID:Ahiknsr,项目名称:swift,代码行数:58,代码来源:test_auditor.py


示例5: get_config

def get_config():
    """
    Attempt to get a functional config dictionary.
    """
    config_file = 'test/dispatcher_test.conf'
    config = {}
    try:
        try:
            config = readconf(config_file, 'app:dispatcher')
        except MissingSectionHeaderError:
            config_fp = StringIO('[func_test]\n' + open(config_file).read())
            config = readconf(config_fp, 'func_test')
    except SystemExit:
        print >>sys.stderr, 'UNABLE TO READ FUNCTIONAL TESTS CONFIG FILE'
    return config
开发者ID:AsherBond,项目名称:colony,代码行数:15,代码来源:__init__.py


示例6: _get_objects_dir

 def _get_objects_dir(self, onode):
     device = onode["device"]
     node_id = (onode["port"] - 6000) / 10
     obj_server_conf = readconf(self.configs["object-server"][node_id])
     devices = obj_server_conf["app:object-server"]["devices"]
     obj_dir = "%s/%s" % (devices, device)
     return obj_dir
开发者ID:portante,项目名称:swift,代码行数:7,代码来源:test_empty_device_handoff.py


示例7: setUp

 def setUp(self):
     super(TestDbRsyncReplicator, self).setUp()
     cont_configs = [utils.readconf(p, 'container-replicator')
                     for p in self.configs['container-replicator'].values()]
     # Do more than per_diff object PUTs, to force rsync instead of usync
     self.object_puts = 1 + max(int(c.get('per_diff', '1000'))
                                for c in cont_configs)
开发者ID:jgmerritt,项目名称:swift,代码行数:7,代码来源:test_db_replicator.py


示例8: _get_objects_dir

 def _get_objects_dir(self, onode):
     device = onode["device"]
     _, node_id = get_server_number((onode["ip"], onode["port"]), self.ipport2server)
     obj_server_conf = readconf(self.configs["object-server"][node_id])
     devices = obj_server_conf["app:object-server"]["devices"]
     obj_dir = "%s/%s" % (devices, device)
     return obj_dir
开发者ID:iloveyou416068,项目名称:swift-1,代码行数:7,代码来源:test_empty_device_handoff.py


示例9: _get_objects_dir

 def _get_objects_dir(self, onode):
     device = onode['device']
     node_id = (onode['port'] - 6000) / 10
     obj_server_conf = readconf(self.configs['object'] % node_id)
     devices = obj_server_conf['app:object-server']['devices']
     obj_dir = '%s/%s' % (devices, device)
     return obj_dir
开发者ID:BlueSkyChina,项目名称:swift,代码行数:7,代码来源:test_empty_device_handoff.py


示例10: __init__

    def __init__(self, app, conf):
        self.app = app

        keymaster_config_path = conf.get('keymaster_config_path')
        if keymaster_config_path:
            if any(opt in conf for opt in ('encryption_root_secret',)):
                raise ValueError('keymaster_config_path is set, but there '
                                 'are other config options specified!')
            conf = readconf(keymaster_config_path, 'keymaster')

        self.root_secret = conf.get('encryption_root_secret')
        try:
            # b64decode will silently discard bad characters, but we should
            # treat them as an error
            if not isinstance(self.root_secret, six.string_types) or any(
                    c not in string.digits + string.ascii_letters + '/+\r\n'
                    for c in self.root_secret.strip('\r\n=')):
                raise ValueError
            self.root_secret = base64.b64decode(self.root_secret)
            if len(self.root_secret) < 32:
                raise ValueError
        except (TypeError, ValueError):
            raise ValueError(
                'encryption_root_secret option in %s must be a base64 '
                'encoding of at least 32 raw bytes' % (
                    keymaster_config_path or 'proxy-server.conf'))
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:26,代码来源:keymaster.py


示例11: load_server_conf

def load_server_conf(conf, sections):
    server_conf_file = conf.get('__file__', None)
    if server_conf_file:
        server_conf = readconf(server_conf_file)
        for sect in sections:
            if server_conf.get(sect, None):
                conf.update(server_conf[sect])
开发者ID:Prosunjit,项目名称:zerocloud,代码行数:7,代码来源:common.py


示例12: setUp

    def setUp(self):
        resetswift()
        try:
            self.ipport2server = {}
            self.configs = defaultdict(dict)
            self.account_ring = get_ring(
                'account',
                self.acct_cont_required_replicas,
                self.acct_cont_required_devices,
                ipport2server=self.ipport2server,
                config_paths=self.configs)
            self.container_ring = get_ring(
                'container',
                self.acct_cont_required_replicas,
                self.acct_cont_required_devices,
                ipport2server=self.ipport2server,
                config_paths=self.configs)
            self.policy = get_policy(**self.policy_requirements)
            self.object_ring = get_ring(
                self.policy.ring_name,
                self.obj_required_replicas,
                self.obj_required_devices,
                server='object',
                ipport2server=self.ipport2server,
                config_paths=self.configs)

            self.servers_per_port = any(
                int(readconf(c, section_name='object-replicator').get(
                    'servers_per_port', '0'))
                for c in self.configs['object-replicator'].values())

            Manager(['main']).start(wait=False)
            for ipport in self.ipport2server:
                check_server(ipport, self.ipport2server)
            proxy_ipport = ('127.0.0.1', 8080)
            self.ipport2server[proxy_ipport] = 'proxy'
            self.url, self.token, self.account = check_server(
                proxy_ipport, self.ipport2server)
            self.account_1 = {
                'url': self.url, 'token': self.token, 'account': self.account}

            url2, token2 = get_auth(
                'http://%s:%d/auth/v1.0' % proxy_ipport,
                'test2:tester2', 'testing2')
            self.account_2 = {
                'url': url2, 'token': token2, 'account': url2.split('/')[-1]}
            head_account(url2, token2)  # sanity check

            self.replicators = Manager(
                ['account-replicator', 'container-replicator',
                 'object-replicator'])
            self.updaters = Manager(['container-updater', 'object-updater'])
        except BaseException:
            try:
                raise
            finally:
                try:
                    Manager(['all']).kill()
                except Exception:
                    pass
开发者ID:prashanthpai,项目名称:swift,代码行数:60,代码来源:common.py


示例13: device_dir

 def device_dir(self, server, node):
     server_type, config_number = get_server_number(
         (node['ip'], node['port']), self.ipport2server)
     repl_server = '%s-replicator' % server_type
     conf = readconf(self.configs[repl_server][config_number],
                     section_name=repl_server)
     return os.path.join(conf['devices'], node['device'])
开发者ID:Ahiknsr,项目名称:swift,代码行数:7,代码来源:common.py


示例14: _get_root_secret

    def _get_root_secret(self, conf):
        """
        This keymaster requires its ``encryption_root_secret`` option to be
        set. This must be set before first use to a value that is a base64
        encoding of at least 32 bytes. The encryption root secret is stored
        in either proxy-server.conf, or in an external file referenced from
        proxy-server.conf using ``keymaster_config_path``.

        :param conf: the keymaster config section from proxy-server.conf
        :type conf: dict

        :return: the encryption root secret binary bytes
        :rtype: bytearray
        """
        if self.keymaster_config_path:
            keymaster_opts = ['encryption_root_secret']
            if any(opt in conf for opt in keymaster_opts):
                raise ValueError('keymaster_config_path is set, but there '
                                 'are other config options specified: %s' %
                                 ", ".join(list(
                                     set(keymaster_opts).intersection(conf))))
            conf = readconf(self.keymaster_config_path, 'keymaster')
        b64_root_secret = conf.get('encryption_root_secret')
        try:
            binary_root_secret = strict_b64decode(b64_root_secret,
                                                  allow_line_breaks=True)
            if len(binary_root_secret) < 32:
                raise ValueError
            return binary_root_secret
        except ValueError:
            raise ValueError(
                'encryption_root_secret option in %s must be a base64 '
                'encoding of at least 32 raw bytes' % (
                    self.keymaster_config_path or 'proxy-server.conf'))
开发者ID:chenzhongtao,项目名称:swift,代码行数:34,代码来源:keymaster.py


示例15: get_lfs

def get_lfs(conf, ring, datadir, default_port, logger):
    """
    Returns LFS for current node

    :param conf: server configuration
    :param ring: server ring file
    :param datadir: server data directory
    :param default_port: default server port
    :param logger: server logger
    :returns : LFS storage class
    :raises SwiftConfigurationError: if fs is invalid
    """
    fs = conf.get('fs', 'xfs')
    try:
        module_name = 'swift_lfs.fs.%s' % fs
        cls_name = 'LFS%s' % fs.upper()
        module = __import__(module_name, fromlist=[cls_name])
        cls = getattr(module, cls_name)
        if '__file__' in conf and fs in conf:
            fs_conf = readconf(conf['__file__'], fs)
            conf = dict(conf, **fs_conf)
        return cls(conf, ring, datadir, default_port, logger)
    except ImportError, e:
        raise SwiftConfigurationError(
            _('Cannot load LFS. Invalid FS: %s. %s') % (fs, e))
开发者ID:Nexenta,项目名称:lfs,代码行数:25,代码来源:__init__.py


示例16: get_config

def get_config():
    """
    Attempt to get a functional config dictionary.
    """
    config_file = os.environ.get('SWIFT_TEST_CONFIG_FILE',
                                 '/etc/gluster-object/func_test.conf')
    config = {}
    try:
        try:
            config = readconf(config_file, 'func_test')
        except MissingSectionHeaderError:
            config_fp = StringIO('[func_test]\n' + open(config_file).read())
            config = readconf(config_fp, 'func_test')
    except SystemExit:
        print >>sys.stderr, 'UNABLE TO READ FUNCTIONAL TESTS CONFIG FILE'
    return config
开发者ID:Gaurav-Gangalwar,项目名称:UFO,代码行数:16,代码来源:__init__.py


示例17: get_config

def get_config(section_name=None, defaults=None):
    """
    Attempt to get a test config dictionary.

    :param section_name: the section to read (all sections if not defined)
    :param defaults: an optional dictionary namespace of defaults
    """
    config = {}
    if defaults is not None:
        config.update(defaults)

    config_file = os.environ.get('SWIFT_TEST_CONFIG_FILE',
                                 '/etc/swift/test.conf')
    try:
        config = readconf(config_file, section_name)
    except IOError:
        if not os.path.exists(config_file):
            print('Unable to read test config %s - file not found'
                  % config_file, file=sys.stderr)
        elif not os.access(config_file, os.R_OK):
            print('Unable to read test config %s - permission denied'
                  % config_file, file=sys.stderr)
    except ValueError as e:
        print(e)
    return config
开发者ID:bebule,项目名称:swift,代码行数:25,代码来源:__init__.py


示例18: run_daemon

def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
    """
    Loads settings from conf, then instantiates daemon "klass" and runs the
    daemon with the specified once kwarg.  The section_name will be derived
    from the daemon "klass" if not provided (e.g. ObjectReplicator =>
    object-replicator).

    :param klass: Class to instantiate, subclass of common.daemon.Daemon
    :param conf_file: Path to configuration file
    :param section_name: Section name from conf file to load config from
    :param once: Passed to daemon run method
    """
    # very often the config section_name is based on the class name
    # the None singleton will be passed through to readconf as is
    if section_name is '':
        section_name = sub(r'([a-z])([A-Z])', r'\1-\2',
                           klass.__name__).lower()
    conf = utils.readconf(conf_file, section_name,
                          log_name=kwargs.get('log_name'))

    # once on command line (i.e. daemonize=false) will over-ride config
    once = once or \
            conf.get('daemonize', 'true').lower() not in utils.TRUE_VALUES

    # pre-configure logger
    if 'logger' in kwargs:
        logger = kwargs.pop('logger')
    else:
        logger = utils.get_logger(conf, conf.get('log_name', section_name),
           log_to_console=kwargs.pop('verbose', False), log_route=section_name)
    try:
        klass(conf).run(once=once, **kwargs)
    except KeyboardInterrupt:
        logger.info('User quit')
    logger.info('Exited')
开发者ID:BillTheBest,项目名称:swift,代码行数:35,代码来源:daemon.py


示例19: collect_tasks

 def collect_tasks(self):
     tasks = {"account": {}, "container": {}, "object": {}}
     for t in tasks:
         for level in ("partition", "suffix", "hsh", "item"):
             tasks[t][level] = []
     for section in self.conf:
         try:
             if type(self.conf[section]) is not dict:
                 continue
             if "data_type" in self.conf[section] and self.conf[section]:
                 datatype = self.conf[section]["data_type"]
                 level = self.conf[section]["level"]
                 task_module = self.conf[section]["task_module"]
                 task_class = self.conf[section]["task_class"]
                 task_conf = self.conf[section]["task_conf"]
                 task_method = self.conf[section].get("task_method", "process_%s" % level)
                 _module = __import__(task_module)
                 splits = task_module.split(".")
                 for split in splits[1:]:
                     _module = getattr(_module, split)
                 _class = getattr(_module, task_class)
                 conf = utils.readconf(task_conf, section)
                 _object = _class(conf)
                 self.conf[section]["func"] = getattr(_object, task_method)
                 self.conf[section]["cycles"] = int(self.conf[section].get("cycles", 1))
                 tasks[datatype][level].append(section)
         except:
             print "Failed to parse config file section %s - %s" % (section, sys.exc_info())
             traceback.print_exc()
     return tasks
开发者ID:shaolinjr,项目名称:SwiftCrawler,代码行数:30,代码来源:crawl.py


示例20: test_locked_container_dbs

    def test_locked_container_dbs(self):

        def run_test(num_locks, catch_503):
            container = 'container-%s' % uuid4()
            client.put_container(self.url, self.token, container)
            db_files = self._get_container_db_files(container)
            db_conns = []
            for i in range(num_locks):
                db_conn = connect(db_files[i])
                db_conn.execute('begin exclusive transaction')
                db_conns.append(db_conn)
            if catch_503:
                try:
                    client.delete_container(self.url, self.token, container)
                except client.ClientException as err:
                    self.assertEqual(err.http_status, 503)
                else:
                    self.fail("Expected ClientException but didn't get it")
            else:
                client.delete_container(self.url, self.token, container)

        proxy_conf = readconf(self.configs['proxy-server'],
                              section_name='app:proxy-server')
        node_timeout = int(proxy_conf.get('node_timeout', 10))
        pool = GreenPool()
        try:
            with Timeout(node_timeout + 5):
                pool.spawn(run_test, 1, False)
                pool.spawn(run_test, 2, True)
                pool.spawn(run_test, 3, True)
                pool.waitall()
        except Timeout as err:
            raise Exception(
                "The server did not return a 503 on container db locks, "
                "it just hangs: %s" % err)
开发者ID:mahak,项目名称:swift,代码行数:35,代码来源:test_container_failures.py



注:本文中的swift.common.utils.readconf函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.register_swift_info函数代码示例发布时间:2022-05-27
下一篇:
Python utils.read_conf_dir函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap