• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python yaml.load_all函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中yaml.load_all函数的典型用法代码示例。如果您正苦于以下问题:Python load_all函数的具体用法?Python load_all怎么用?Python load_all使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了load_all函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: parse_systemdir

def parse_systemdir(systemdir):
    """
    """
    files = set(iter(filter(lambda f: f.endswith('.yaml'), os.listdir(systemdir))))
    systemfiles = list()
    # parse default system file
    if 'system.yaml' in files:
        files.remove('system.yaml')
        path = os.path.join(systemdir, 'system.yaml')
        with open(path, 'r') as f:
            streams = list(yaml.load_all(f))
            if len(streams) > 0:
                for i in range(len(streams)):
                    systemfiles.append(SystemFile(path, i, streams[i]))
            else:
                logger.warning("no systems found in system.yaml")
    else:
        logger.info("no system.yaml system file was found")
    # parse all remaining system files
    for systemfile in files:
        path = os.path.join(systemdir, systemfile)
        with open(path, 'r') as f:
            streams = list(yaml.load_all(f))
            if len(streams) > 0:
                for i in range(len(streams)):
                    systemfiles.append(SystemFile(path, i, streams[i]))
            else:
                logger.warning("no systems found in %s", systemfile)
    return systemfiles
开发者ID:mikefort,项目名称:mandelbrot,代码行数:29,代码来源:systemfile.py


示例2: __init__

    def __init__(self, args):
        # Load ballot records from yaml file
        self.input = args[0]
        self.precs = 0
        try:
            stream = open(self.input + '.yaml', 'r')
        except:
            print('Unable to open ' + self.input + '\n')
            exit(0)
        else:
            a = audit_header.AuditHeader()
            a.load_from_file(stream)
            self.b = list(yaml.load_all(stream))

        # If a jurisdiction_slate filename was given, then load its
        #  contents
        if len(args) == 2:
            try:
                stream = open(args[1] + '.yaml', 'r')
            except:
                print('Unable to open ' + self.input + '\n')
                exit(0)
            else:
                a = audit_header.AuditHeader()
                a.load_from_file(stream)
                self.j = yaml.load_all(stream)
        else:
            self.j = False

        # Add the vote counts of candidates with the same ID# using
        #  sumation(). Write the vote totals for each candidate to the
        #  report stream.
        self.serialize_csv(self.sumation())
开发者ID:damicof,项目名称:tabulator,代码行数:33,代码来源:tabulator.py


示例3: __init__

 def __init__(self, fn=None, content=None):
     IDocumentFormatter.__init__(self)
     if fn is not None:
         with file(fn, "r") as f:
             self.__content = load_all(f, Loader=Loader)
     else:
         self.__content = load_all(content, Loader=Loader)
开发者ID:reyoung,项目名称:SlideGen2,代码行数:7,代码来源:yaml_formatter.py


示例4: validate

    def validate(self, **kwargs):
        """
        Validates a submission file

        :param file_path: path to file to be loaded.
        :param data: pre loaded YAML object (optional).
        :return: Bool to indicate the validity of the file.
        """
        try:
            submission_file_schema = json.load(open(self.default_schema_file, 'r'))

            additional_file_section_schema = json.load(open(self.additional_info_schema, 'r'))

            # even though we are using the yaml package to load,
            # it supports JSON and YAML
            data = kwargs.pop("data", None)
            file_path = kwargs.pop("file_path", None)

            if file_path is None:
                raise LookupError("file_path argument must be supplied")

            if data is None:
                try:
                    # We try to load using the CLoader for speed improvements.
                    data = yaml.load_all(open(file_path, 'r'), Loader=yaml.CLoader)
                except Exception as e: #pragma: no cover
                    data = yaml.load_all(open(file_path, 'r')) #pragma: no cover

            for data_item in data:
                if data_item is None:
                    continue
                try:
                    if 'comment' in data_item:
                        validate(data_item, additional_file_section_schema)
                    else:
                        validate(data_item, submission_file_schema)

                except ValidationError as ve:
                    self.add_validation_message(
                            ValidationMessage(file=file_path,
                                                message=ve.message + ' in ' + str(ve.instance)))

            if self.has_errors(file_path):
                return False
            else:
                return True

        except ScannerError as se:  # pragma: no cover
            self.add_validation_message(  # pragma: no cover
                ValidationMessage(file=file_path, message=
                    'There was a problem parsing the file.  '
                    'This can be because you forgot spaces '
                    'after colons in your YAML file for instance.  '
                    'Diagnostic information follows.\n' + str(se)))
            return False
开发者ID:HEPData,项目名称:hepdata-validator,代码行数:55,代码来源:submission_file_validator.py


示例5: __init__

    def __init__(self, election, record1, record2,
                 merge_output):
        self.rstream = open(''.join([merge_output,'.log']), 'w')
        # Load ballot records from yaml file
        try:
            stream = open(''.join([record1,'.yml']), 'r')
        except IOError:
            self.rstream.write(''.join(['Unable to open ',record1,'\n']))
            print(''.join(['Unable to open ',record1,'\n']))
            raise
        else:
            a = audit_header.AuditHeader()
            a.load_from_file(stream)
            guid1 = a.file_id
            prov1 = a.provenance
            self.b1 = list(yaml.load_all(stream))
            stream.close()
        try:
            stream = open(''.join([record2,'.yml']), 'r')
        except IOError:
            self.rstream.write(''.join(['Unable to open ',record2,'\n']))
            print(''.join(['Unable to open ',record2,'\n']))
            raise
        else:
            a = audit_header.AuditHeader()
            a.load_from_file(stream)
            guid2 = a.file_id
            prov2 = a.provenance
            self.b2 = list(yaml.load_all(stream))
            stream.close()

        # Get the election specs from file
        try:
            stream = open(''.join([election,'.yml']), 'r')
        except IOError:
            self.rstream.write(''.join(['Unable to open ',record2,'\n']))
            print(''.join(['Unable to open ',record2,'\n']))
            raise
        else:
            a = audit_header.AuditHeader()
            a.load_from_file(stream)
            self.templ_type = a.type
            self.e = yaml.load(stream)
            stream.close()
            if self.e.has_key('precinct_list'):
                del self.e['precinct_list']

        # Combine provenances and guids from input files
        self.new_prov = []
        self.new_prov.extend(prov1)
        self.new_prov.extend(prov2)
        self.new_prov.append(guid1)
        self.new_prov.append(guid2)
开发者ID:trustthevote,项目名称:Tabulator-v1,代码行数:53,代码来源:merger.py


示例6: test_generating_simple_init

    def test_generating_simple_init(self, mock_conf):
        mock_conf.yml_dir_path = RC_PATH
        yml_path = _render_yml(self.init_variables, "generic-init.yml.j2")
        with open(yml_path, 'r') as stream:
            generated_files = yaml.load_all(stream.read())
        with open("./fake_templates/simple-init.yml", 'r') as stream:
            expected_files = yaml.load_all(stream.read())
        try:
            # compare pod
            self.assertEqual(expected_files.next(), generated_files.next())

        except StopIteration:
            self.fail("Incorrect type of generated yml")
开发者ID:celebdor,项目名称:stackanetes,代码行数:13,代码来源:test_service.py


示例7: __init__

	def __init__(self,repo):
		DataBase.__init__(self,"freecad",repo)
		self.bases = []

		self.base_classes = Links()
		self.collection_bases = Links()

		if not exists(self.backend_root):
			e = MalformedRepositoryError("freecad directory does not exist")
			e.set_repo_path(repo.path)
			raise e

		for coll in listdir(self.backend_root):
			basefilename = join(self.backend_root,coll,"%s.base" % coll)
			if not exists(basefilename):
				#skip directory that is no collection
				continue
			try:
				base_info =  list(yaml.load_all(open(basefilename,"r","utf8"), Loader=yaml.FullLoader))
				# FullLoader is not implemented in pyyaml < 5.1
			except AttributeError:
				# this is deprecated for newer pyyaml versions
				base_info =  list(yaml.load_all(open(basefilename,"r","utf8")))
			if len(base_info) != 1:
				raise MalformedCollectionError(
						"Not exactly one YAML document found in file %s" % basefilename)
			base_info = base_info[0]
			for basefile in base_info:
				if basefile["type"] == "function":
					basepath = join(self.backend_root,coll,basefile["filename"])
					if not exists(basepath):
						raise MalformedBaseError("Python module %s does not exist" % basepath)
					for func in basefile["functions"]:
						try:
							function = BaseFunction(func,basefile,coll,self.backend_root)
							self.bases.append(function)
							self.collection_bases.add_link(repo.collections[coll],function)
							for id in func["classids"]:
								if not id in repo.classes:
									raise MalformedBaseError(
										"Unknown class %s" % id)
								if self.base_classes.contains_dst(repo.classes[id]):
									raise NonUniqueBaseError(id)
								self.base_classes.add_link(function,repo.classes[id])
						except ParsingError as e:
							e.set_base(basefile["filename"])
							e.set_collection(coll)
							raise e
				else:
					raise MalformedBaseError("Unknown base type %s" % basefile["type"])
开发者ID:jreinhardt,项目名称:BOLTS,代码行数:50,代码来源:freecad.py


示例8: test_generating_without_ports

    def test_generating_without_ports(self, mock_conf):
        self.control_variables["ports"] = ['']
        mock_conf.yml_dir_path = RC_PATH
        yml_path = _render_yml(self.control_variables)
        with open(yml_path, 'r') as stream:
            generated_files = yaml.load_all(stream.read())
        with open("./fake_templates/no_ports.yml", 'r') as stream:
            expected_files = yaml.load_all(stream.read())
        try:
            # compare ReplicationController
            self.assertEqual(expected_files.next(), generated_files.next())

        except StopIteration:
            self.fail("Incorrect type of generated yml")
开发者ID:celebdor,项目名称:stackanetes,代码行数:14,代码来源:test_service.py


示例9: test_constructor

def test_constructor(data_filename, canonical_filename, verbose=False):
    _make_loader()
    _make_canonical_loader()
    native1 = None
    native2 = None
    try:
        native1 = list(yaml.load_all(open(data_filename, 'rb'), Loader=MyLoader))
        native2 = list(yaml.load_all(open(canonical_filename, 'rb'), Loader=MyCanonicalLoader))
        assert native1 == native2, (native1, native2)
    finally:
        if verbose:
            print("NATIVE1:")
            pprint.pprint(native1)
            print("NATIVE2:")
            pprint.pprint(native2)
开发者ID:15580056814,项目名称:hue,代码行数:15,代码来源:test_structure.py


示例10: test_generating_with_emprtDir_path

    def test_generating_with_emprtDir_path(self, mock_conf):
        self.control_variables["host_path"] = None
        self.control_variables["container_path"] = "/foo/bar/container"
        mock_conf.yml_dir_path = RC_PATH
        yml_path = _render_yml(self.control_variables)
        with open(yml_path, 'r') as stream:
            generated_files = yaml.load_all(stream.read())
        with open("./fake_templates/emptyDir.yml", 'r') as stream:
            expected_files = yaml.load_all(stream.read())
        try:
            # compare ReplicationController
            self.assertEqual(expected_files.next(), generated_files.next())

        except StopIteration:
            self.fail("Incorrect type of generated yml")
开发者ID:celebdor,项目名称:stackanetes,代码行数:15,代码来源:test_service.py


示例11: load_djs

def load_djs(path, db):
    with open(os.path.join(path, "djs.yaml")) as f:
        for dj in yaml.load_all(f):
            _populate_slug_and_name(dj)
            _store_replacement(db["dj"], dj)
            db["dj"][dj["slug"]] = dj
            if "city" in dj:
                city = dj["city"]
                cityslug = slugify(city)
                if cityslug not in db["city"]:
                    db["city"][cityslug] = dict(name=city, slug=cityslug, lint_created_by_dj=dj["slug"])
                dj["city"] = cityslug
                city_djs = db["city"][cityslug].setdefault("djs", [])
                city_djs.append(dj["slug"])
            # Normalize 'genre' to 'genres'.
            genres = dj.setdefault("genres", [])
            if "genre" in dj:
                genres.append(dj.pop("genre"))
            genreslugs = []
            for genre in genres:
                genreslug = slugify(genre)
                if genreslug not in db["genre"]:
                    db["genre"][genreslug] = dict(name=genre, slug=genreslug, lint_created_by_dj=dj["slug"])
                genreslugs.append(genreslug)
                genredjs = db["genre"][genreslug].setdefault("djs", [])
                genredjs.append(dj["slug"])
            dj["genres"] = genreslugs
开发者ID:gldnspud,项目名称:fosmc,代码行数:27,代码来源:db.py


示例12: read_yaml

def read_yaml(yaml_filename):
    docs=[]
    with open(yaml_filename) as f:
        docs=yaml.load_all(f)
        docs=list(docs)
#        docs=yaml.dump_all(ya, encoding='utf8', allow_unicode=True)
    return docs
开发者ID:buzmakov,项目名称:fastlib,代码行数:7,代码来源:yamlutils.py


示例13: read_data

    def read_data(self, datafile):
        with open(datafile, "rt") as f:
            parsed = yaml.load_all(f)
            layout = dict2obj(parsed.next())
            meta = dict2obj(parsed.next())
            content = dict2obj(parsed.next())

        if layout.section != 'layout':
            exit('layout document in "' + datafile + '" is malformed.')
        elif meta.section != 'meta':
            exit('meta document in "' + datafile + '" is malformed.')
        elif content.section != 'content':
            exit('content document in "' + datafile + '" is malformed.')

        rows = { 'rows': [] }

        if layout.header:
            header = []
            for cell in layout.header:
                header.append([eval(cell)])
        else:
            header = None

        for rownum in layout.rows:
            parsed_cell = []
            for cell in rownum.items()[0][1]:
                parsed_cell.append(eval(cell))
            rows['rows'].append( dict(zip(rownum.keys(), [parsed_cell])) )

        # return header, rows
        self.header = header
        self.rows = rows
开发者ID:KensoDev,项目名称:docs,代码行数:32,代码来源:table_builder.py


示例14: construct_case

def construct_case(filename, name):
    """
    Parse a definition of a test case from a yaml file and construct the
    TestCase subclass dynamically.
    """
    def make_test(test_name, definition, i):
        def m(self):
            if name in SKIP_TESTS.get(self.es_version, ()):
                raise SkipTest()
            self.run_code(definition)
        m.__doc__ = '%s:%s.test_from_yaml_%d (%s): %s' % (
            __name__, name, i, '/'.join(filename.split('/')[-2:]), test_name)
        m.__name__ = 'test_from_yaml_%d' % i
        return m

    with open(filename) as f:
        tests = list(yaml.load_all(f))

    attrs = {
        '_yaml_file': filename
    }
    i = 0
    for test in tests:
        for test_name, definition in test.items():
            if test_name == 'setup':
                attrs['_setup_code'] = definition
                continue

            attrs['test_from_yaml_%d' % i] = make_test(test_name, definition, i)
            i += 1

    return type(name, (YamlTestCase, ), attrs)
开发者ID:davinirjr,项目名称:elasticsearch-py,代码行数:32,代码来源:test_common.py


示例15: update_all

    def update_all(self, form, force_rollback=False):        
        form = self._quote_element_names(form)
        form_dict_list = yaml.load_all(form)

        self.session.begin()
            
        is_modified = False
        for form_dict in form_dict_list:
            element_name = form_dict['element_name']
            ename = ObjectSpec.parse(element_name, expected=ElementName)
                        
            # update_all() does not look in the instance_cache, must exist in DB
            instance = self.session.resolve_element_spec(ename)
            self._update_instance(instance, form_dict)
            is_modified = is_modified or self.session.is_modified(instance)
        
        desc = self.session.create_change_description()

        if force_rollback:
            self.log.info("Forced Rollback")
            self.session.rollback()
                    
        elif is_modified:
            self.log.fine("Instance Modified: Commit")                                            
            self.session.commit()
            
        else:
            self.log.fine("Instance Unchanged: Rollback")                                            
            self.session.rollback()
        
        return desc
开发者ID:nveeser,项目名称:dino,代码行数:31,代码来源:element_form.py


示例16: parse_post

def parse_post(f):
    parsed = yaml.load_all(f)
    post_header = next(parsed)
    if 'postready' in post_header and (not post_header['postready']):
        print("Post not ready")
        return None
    f.seek(0)
    l = f.readline()
    while (l != '...\n') and (l != ''):
        l = f.readline()
    if l == '':
        return None
    comment = f.read()
    post_header['comment'] = comment
    imgs = []
    if 'image1' in post_header and post_header['image1']:
        with open(post_header['image1'], 'rb') as f:
            imgs += [f.read()]
    if 'image2' in post_header and post_header['image2']:
        with open(post_header['image2'], 'rb') as f:
            imgs += [f.read()]
    if 'image3' in post_header and post_header['image3']:
        with open(post_header['image3'], 'rb') as f:
            imgs += [f.read()]
    if 'image4' in post_header and post_header['image4']:
        with open(post_header['image4'], 'rb') as f:
            imgs += [f.read()]
    post_header['imgs'] = imgs
    return post_header
开发者ID:gordon-quad,项目名称:sosuch-cli,代码行数:29,代码来源:sosuch.py


示例17: prepare_reporting

def prepare_reporting(mongo_addr, dbname, config_path):
    from citizendesk.common.utils import get_logger
    from citizendesk.common.dbc import mongo_dbs
    from citizendesk.feeds.config import set_config
    import citizendesk.outgest.dispatch as dispatch

    logger = get_logger()

    mongo_dbs.set_dbname(dbname)
    DbHolder = namedtuple('DbHolder', 'db')
    mongo_dbs.set_db(DbHolder(db=MongoClient(mongo_addr[0], mongo_addr[1])[mongo_dbs.get_dbname()]))

    config_data = None

    if config_path:
        try:
            cfghf = open(config_path)
            config_data = cfghf.read()
            cfghf.close()
            config_yaml = yaml.load_all(config_data)
            config_data = config_yaml.next()
            config_yaml.close()
        except:
            config_data = None
            logger.error('can not read config file: ' + str(config_path))
            return False

    dispatch.setup_blueprints(app, config_data)

    return True
开发者ID:Flowdeeps,项目名称:citizendesk-core,代码行数:30,代码来源:run.py


示例18: take_action

    def take_action(self, parsed_args):
        self.log.debug('take_action(%s)' % parsed_args)
        # set default max-width
        if parsed_args.max_width == 0:
            parsed_args.max_width = 80
        client = self.app.client_manager.congressclient
        with open(parsed_args.policy_file_path, "r") as stream:
            policies = yaml.load_all(stream)
            try:
                body = next(policies)
            except StopIteration:
                raise Exception('No policy found in file.')
            try:
                body = next(policies)
                raise Exception(
                    'More than one policy found in file. None imported.')
            except StopIteration:
                pass
        data = client.create_policy(body)

        def rule_dict_to_string(rules):
            rule_str_list = [rule['rule'] for rule in rules]
            return "\n".join(rule_str_list)

        data['rules'] = rule_dict_to_string(data['rules'])
        return zip(*sorted(six.iteritems(data)))
开发者ID:openstack,项目名称:python-congressclient,代码行数:26,代码来源:policy.py


示例19: _ImportBackupResources

  def _ImportBackupResources(self, file_name):
    f = file(os.path.dirname(__file__) + file_name, 'r')
    for res in yaml.load_all(f):
      try:
        author_key = models.Author.get_by_key_name(res['author_id'])
        author_key2 = None
        if 'author_id2' in res:
          author_key2 = models.Author.get_by_key_name(res['author_id2'])

        resource = models.Resource(
          title=res['title'],
          subtitle=res.get('subtitle') or None,
          description=res.get('description') or None,
          author=author_key,
          second_author=author_key2,
          url=unicode(res['url']),
          social_url=unicode(res.get('social_url') or ''),
          browser_support=res.get('browser_support') or [],
          update_date=res.get('update_date'),
          publication_date=res['publication_date'],
          tags=res['tags'],
          draft=False # These are previous resources. Mark them as published.
          )
        resource.put()

      except TypeError:
        pass # TODO(ericbidelman): Not sure why this is throwing an error, but
             # ignore it, whatever it is.
    f.close()
开发者ID:mikewest,项目名称:www.html5rocks.com,代码行数:29,代码来源:main.py


示例20: decomposeRedux

def decomposeRedux(osTemplateFilename):
    output = []
    with open(osTemplateFilename, 'r') as f:
        for data in yaml.load_all(f, Loader=yaml.Loader):
            objVals = data['objects']
            for obj in objVals:
                filename = 'kube-' + data['metadata']['name']
                fileType = None
                outValue = {}
                for objkey, objval in obj.items():
                    if objkey == 'kind' and objval == 'Service':
                        fileType = 'svc'
                        outValue[objkey] = objval
                    elif objkey == 'kind' and objval == 'ReplicationController':
                        fileType = 'rc'
                        outValue[objkey] = objval
                    elif objkey == 'kind' and objval == 'Pod':
                        fileType = 'po'
                        outValue[objkey] = objval
                    elif objkey == 'id':
                        filename = filename + '-' + objval
                    else:
                        outValue[objkey] = objval
                filename = filename + '-' + fileType + '.yaml'
                with open(filename, 'w') as outfile:
                    outfile.write(
                        yaml.dump(outValue, default_flow_style=False))
                    output.append(filename)
    return output
开发者ID:heckdevice,项目名称:os-kube-tesseract,代码行数:29,代码来源:osaka.py



注:本文中的yaml.load_all函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python yaml.load_yaml函数代码示例发布时间:2022-05-26
下一篇:
Python yaml.load函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap