本文整理汇总了Python中yaml.load_all函数的典型用法代码示例。如果您正苦于以下问题：Python load_all函数的具体用法？Python load_all怎么用？Python load_all使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了load_all函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: parse_systemdir
def parse_systemdir(systemdir):
    """
    Parse every ``*.yaml`` file found in *systemdir*.

    ``system.yaml``, when present, is parsed first; the remaining files are
    parsed afterwards in arbitrary (set iteration) order.  Every YAML
    document in a file produces one ``SystemFile(path, index, document)``.

    :param systemdir: directory to scan for system files.
    :returns: list of SystemFile instances, possibly empty.
    """
    systemfiles = list()

    def _parse(filename):
        # Load all documents of one file and wrap each one in a SystemFile.
        path = os.path.join(systemdir, filename)
        with open(path, 'r') as f:
            # NOTE(review): load_all() without an explicit Loader can build
            # arbitrary Python objects on old PyYAML releases and is rejected
            # by PyYAML >= 6.  If system files only hold plain data,
            # yaml.safe_load_all() is the better call -- confirm first.
            streams = list(yaml.load_all(f))
        if not streams:
            logger.warning("no systems found in %s", filename)
        for index, stream in enumerate(streams):
            systemfiles.append(SystemFile(path, index, stream))

    files = set(name for name in os.listdir(systemdir)
                if name.endswith('.yaml'))
    # parse default system file
    if 'system.yaml' in files:
        files.remove('system.yaml')
        _parse('system.yaml')
    else:
        logger.info("no system.yaml system file was found")
    # parse all remaining system files
    for systemfile in files:
        _parse(systemfile)
    return systemfiles
开发者ID:mikefort,项目名称:mandelbrot,代码行数:29,代码来源:systemfile.py
示例2: __init__
def __init__(self, args):
    """
    Load the ballot records (and optionally a jurisdiction slate), then
    write the summed vote totals through ``serialize_csv``.

    :param args: sequence whose first item is the ballot-record file name
        without its ``.yaml`` extension; an optional second item names a
        jurisdiction_slate file, also without extension.
    """
    # Load ballot records from yaml file
    self.input = args[0]
    self.precs = 0
    try:
        stream = open(self.input + '.yaml', 'r')
    except IOError:
        print('Unable to open ' + self.input + '\n')
        # NOTE(review): exits with status 0 although loading failed; left
        # unchanged because callers may depend on it.
        exit(0)
    else:
        try:
            a = audit_header.AuditHeader()
            a.load_from_file(stream)
            self.b = list(yaml.load_all(stream))
        finally:
            # The documents are fully materialised, so the file can go.
            stream.close()

    # If a jurisdiction_slate filename was given, then load its
    # contents
    if len(args) == 2:
        try:
            stream = open(args[1] + '.yaml', 'r')
        except IOError:
            # Report the file that actually failed to open.
            print('Unable to open ' + args[1] + '\n')
            exit(0)
        else:
            a = audit_header.AuditHeader()
            a.load_from_file(stream)
            # load_all() is lazy: the stream has to stay open until
            # self.j has been consumed, so it is not closed here.
            self.j = yaml.load_all(stream)
    else:
        self.j = False

    # Add the vote counts of candidates with the same ID# using
    # sumation(). Write the vote totals for each candidate to the
    # report stream.
    self.serialize_csv(self.sumation())
开发者ID:damicof,项目名称:tabulator,代码行数:33,代码来源:tabulator.py
示例3: __init__
def __init__(self, fn=None, content=None):
    """
    Load YAML documents either from a file or from *content*.

    :param fn: path of a YAML file; used whenever it is not None.
    :param content: passed straight to ``load_all`` when *fn* is None.
    """
    IDocumentFormatter.__init__(self)
    if fn is not None:
        # open() works on Python 2 and 3; the file() builtin is 2-only.
        with open(fn, "r") as f:
            # load_all() returns a lazy generator.  Parse everything while
            # the file is still open, then hand out an iterator so that
            # consumers keep seeing a single-pass iterable as before.
            self.__content = iter(list(load_all(f, Loader=Loader)))
    else:
        self.__content = load_all(content, Loader=Loader)
开发者ID:reyoung,项目名称:SlideGen2,代码行数:7,代码来源:yaml_formatter.py
示例4: validate
def validate(self, **kwargs):
    """
    Validates a submission file

    :param file_path: path to file to be loaded.
    :param data: pre loaded YAML object (optional).
    :return: Bool to indicate the validity of the file.
    :raises LookupError: if ``file_path`` is not supplied.
    """
    data_file = None
    try:
        with open(self.default_schema_file, 'r') as schema_file:
            submission_file_schema = json.load(schema_file)
        with open(self.additional_info_schema, 'r') as schema_file:
            additional_file_section_schema = json.load(schema_file)

        # even though we are using the yaml package to load,
        # it supports JSON and YAML
        data = kwargs.pop("data", None)
        file_path = kwargs.pop("file_path", None)

        if file_path is None:
            raise LookupError("file_path argument must be supplied")

        if data is None:
            # load_all() is lazy, so the file must stay open while the
            # documents are iterated below; it is closed in ``finally``.
            data_file = open(file_path, 'r')
            # We try to load using the CLoader for speed improvements.
            # yaml.CLoader only exists when PyYAML was built against
            # libyaml; yaml.Loader is its pure-Python equivalent.
            loader = getattr(yaml, 'CLoader', yaml.Loader)
            data = yaml.load_all(data_file, Loader=loader)

        for data_item in data:
            if data_item is None:
                continue
            try:
                if 'comment' in data_item:
                    validate(data_item, additional_file_section_schema)
                else:
                    validate(data_item, submission_file_schema)
            except ValidationError as ve:
                self.add_validation_message(
                    ValidationMessage(file=file_path,
                                      message=ve.message + ' in ' + str(ve.instance)))

        return not self.has_errors(file_path)
    except ScannerError as se:  # pragma: no cover
        self.add_validation_message(  # pragma: no cover
            ValidationMessage(file=file_path, message=
                              'There was a problem parsing the file. '
                              'This can be because you forgot spaces '
                              'after colons in your YAML file for instance. '
                              'Diagnostic information follows.\n' + str(se)))
        return False
    finally:
        if data_file is not None:
            data_file.close()
开发者ID:HEPData,项目名称:hepdata-validator,代码行数:55,代码来源:submission_file_validator.py
示例5: __init__
def __init__(self, election, record1, record2,
             merge_output):
    """
    Load two ballot-record files and the election spec for merging.

    Each argument is a file name without extension: the inputs are read
    from ``<name>.yml`` and a log is opened at ``<merge_output>.log``.

    :param election: base name of the election spec file.
    :param record1: base name of the first ballot-record file.
    :param record2: base name of the second ballot-record file.
    :param merge_output: base name used for the log file.
    :raises IOError: if any input file cannot be opened (the failure is
        also written to the log and printed).
    """
    # The log stream is stored on the instance and is not closed here.
    self.rstream = open(''.join([merge_output,'.log']), 'w')

    def _open_with_header(name):
        # Open "<name>.yml" and read its audit header; report and re-raise
        # when the file cannot be opened.
        try:
            stream = open(''.join([name,'.yml']), 'r')
        except IOError:
            message = ''.join(['Unable to open ',name,'\n'])
            self.rstream.write(message)
            print(message)
            raise
        try:
            header = audit_header.AuditHeader()
            header.load_from_file(stream)
        except Exception:
            # Do not leak the handle when the header cannot be read.
            stream.close()
            raise
        return stream, header

    # Load ballot records from yaml file
    stream, a = _open_with_header(record1)
    try:
        guid1 = a.file_id
        prov1 = a.provenance
        self.b1 = list(yaml.load_all(stream))
    finally:
        stream.close()

    stream, a = _open_with_header(record2)
    try:
        guid2 = a.file_id
        prov2 = a.provenance
        self.b2 = list(yaml.load_all(stream))
    finally:
        stream.close()

    # Get the election specs from file
    # (a failure here is now reported against the election file itself)
    stream, a = _open_with_header(election)
    try:
        self.templ_type = a.type
        # NOTE(review): yaml.load() without a Loader is unsafe on
        # untrusted input and rejected by PyYAML >= 6 -- confirm whether
        # yaml.safe_load() is sufficient for election specs.
        self.e = yaml.load(stream)
    finally:
        stream.close()
    # ``in`` replaces dict.has_key(), which only exists on Python 2.
    if 'precinct_list' in self.e:
        del self.e['precinct_list']

    # Combine provenances and guids from input files
    self.new_prov = []
    self.new_prov.extend(prov1)
    self.new_prov.extend(prov2)
    self.new_prov.append(guid1)
    self.new_prov.append(guid2)
开发者ID:trustthevote,项目名称:Tabulator-v1,代码行数:53,代码来源:merger.py
示例6: test_generating_simple_init
def test_generating_simple_init(self, mock_conf):
    # Render the generic init template and compare its first YAML document
    # with the first document of the simple-init fixture.
    mock_conf.yml_dir_path = RC_PATH
    yml_path = _render_yml(self.init_variables, "generic-init.yml.j2")
    with open(yml_path, 'r') as stream:
        generated_files = yaml.load_all(stream.read())
    with open("./fake_templates/simple-init.yml", 'r') as stream:
        expected_files = yaml.load_all(stream.read())
    try:
        # compare pod
        # The next() builtin works on Python 2.6+ and 3; the generator
        # method .next() exists on Python 2 only.
        self.assertEqual(next(expected_files), next(generated_files))
    except StopIteration:
        self.fail("Incorrect type of generated yml")
开发者ID:celebdor,项目名称:stackanetes,代码行数:13,代码来源:test_service.py
示例7: __init__
def __init__(self,repo):
    """
    Load the FreeCAD base definitions of every collection below
    ``self.backend_root``.

    :param repo: repository object; its ``path``, ``collections`` and
        ``classes`` attributes are used.
    :raises MalformedRepositoryError: if the backend directory is missing.
    :raises MalformedCollectionError: if a ``.base`` file does not contain
        exactly one YAML document.
    :raises MalformedBaseError: for an unknown base type, a missing Python
        module or an unknown class id.
    :raises NonUniqueBaseError: if a class is assigned to a second base.
    """
    DataBase.__init__(self,"freecad",repo)
    self.bases = []
    self.base_classes = Links()
    self.collection_bases = Links()

    if not exists(self.backend_root):
        e = MalformedRepositoryError("freecad directory does not exist")
        e.set_repo_path(repo.path)
        raise e

    # FullLoader is not implemented in pyyaml < 5.1.  Looking it up once
    # avoids wrapping the whole parse in ``except AttributeError``, which
    # could hide unrelated AttributeErrors raised while parsing.
    loader = getattr(yaml, "FullLoader", None)

    for coll in listdir(self.backend_root):
        basefilename = join(self.backend_root,coll,"%s.base" % coll)
        if not exists(basefilename):
            #skip directory that is no collection
            continue
        # NOTE(review): the three-argument open(path, mode, "utf8") call
        # implies ``open`` is codecs.open rather than the builtin -- confirm
        # against the file's imports.
        with open(basefilename,"r","utf8") as base_file:
            if loader is not None:
                base_info = list(yaml.load_all(base_file, Loader=loader))
            else:
                # this is deprecated for newer pyyaml versions
                base_info = list(yaml.load_all(base_file))
        if len(base_info) != 1:
            raise MalformedCollectionError(
                "Not exactly one YAML document found in file %s" % basefilename)
        base_info = base_info[0]
        for basefile in base_info:
            if basefile["type"] != "function":
                raise MalformedBaseError("Unknown base type %s" % basefile["type"])
            basepath = join(self.backend_root,coll,basefile["filename"])
            if not exists(basepath):
                raise MalformedBaseError("Python module %s does not exist" % basepath)
            for func in basefile["functions"]:
                try:
                    function = BaseFunction(func,basefile,coll,self.backend_root)
                    self.bases.append(function)
                    self.collection_bases.add_link(repo.collections[coll],function)
                    # ``classid`` avoids shadowing the id() builtin.
                    for classid in func["classids"]:
                        if classid not in repo.classes:
                            raise MalformedBaseError(
                                "Unknown class %s" % classid)
                        if self.base_classes.contains_dst(repo.classes[classid]):
                            raise NonUniqueBaseError(classid)
                        self.base_classes.add_link(function,repo.classes[classid])
                except ParsingError as e:
                    # Attach location details before passing the error on.
                    e.set_base(basefile["filename"])
                    e.set_collection(coll)
                    raise
开发者ID:jreinhardt,项目名称:BOLTS,代码行数:50,代码来源:freecad.py
示例8: test_generating_without_ports
def test_generating_without_ports(self, mock_conf):
    # Render the control template with an empty port entry and compare its
    # first YAML document with the first document of the no_ports fixture.
    self.control_variables["ports"] = ['']
    mock_conf.yml_dir_path = RC_PATH
    yml_path = _render_yml(self.control_variables)
    with open(yml_path, 'r') as stream:
        generated_files = yaml.load_all(stream.read())
    with open("./fake_templates/no_ports.yml", 'r') as stream:
        expected_files = yaml.load_all(stream.read())
    try:
        # compare ReplicationController
        # The next() builtin works on Python 2.6+ and 3; the generator
        # method .next() exists on Python 2 only.
        self.assertEqual(next(expected_files), next(generated_files))
    except StopIteration:
        self.fail("Incorrect type of generated yml")
开发者ID:celebdor,项目名称:stackanetes,代码行数:14,代码来源:test_service.py
示例9: test_constructor
def test_constructor(data_filename, canonical_filename, verbose=False):
    """
    Check that a data file and its canonical form load to equal objects.

    :param data_filename: file parsed with ``MyLoader``.
    :param canonical_filename: file parsed with ``MyCanonicalLoader``.
    :param verbose: when true, pretty-print both results, even on failure.
    """
    _make_loader()
    _make_canonical_loader()
    native1 = None
    native2 = None
    try:
        # ``with`` closes each file once its documents are materialised.
        with open(data_filename, 'rb') as data_file:
            native1 = list(yaml.load_all(data_file, Loader=MyLoader))
        with open(canonical_filename, 'rb') as canonical_file:
            native2 = list(yaml.load_all(canonical_file, Loader=MyCanonicalLoader))
        assert native1 == native2, (native1, native2)
    finally:
        if verbose:
            print("NATIVE1:")
            pprint.pprint(native1)
            print("NATIVE2:")
            pprint.pprint(native2)
开发者ID:15580056814,项目名称:hue,代码行数:15,代码来源:test_structure.py
示例10: test_generating_with_emprtDir_path
def test_generating_with_emprtDir_path(self, mock_conf):
    # Render the control template with no host path and compare its first
    # YAML document with the first document of the emptyDir fixture.
    self.control_variables["host_path"] = None
    self.control_variables["container_path"] = "/foo/bar/container"
    mock_conf.yml_dir_path = RC_PATH
    yml_path = _render_yml(self.control_variables)
    with open(yml_path, 'r') as stream:
        generated_files = yaml.load_all(stream.read())
    with open("./fake_templates/emptyDir.yml", 'r') as stream:
        expected_files = yaml.load_all(stream.read())
    try:
        # compare ReplicationController
        # The next() builtin works on Python 2.6+ and 3; the generator
        # method .next() exists on Python 2 only.
        self.assertEqual(next(expected_files), next(generated_files))
    except StopIteration:
        self.fail("Incorrect type of generated yml")
开发者ID:celebdor,项目名称:stackanetes,代码行数:15,代码来源:test_service.py
示例11: load_djs
def load_djs(path, db):
    """
    Read ``djs.yaml`` from *path* and register every DJ document in *db*.

    Each DJ is stored under ``db["dj"][slug]``.  Its city and genres are
    replaced by slugs, missing city/genre entries are created on the fly
    (tagged with ``lint_created_by_dj``), and the DJ's slug is appended to
    the ``djs`` list of every city/genre it references.

    :param path: directory containing ``djs.yaml``.
    :param db: mapping with at least the ``dj``, ``city`` and ``genre``
        sub-mappings.
    """
    djs_file = os.path.join(path, "djs.yaml")
    with open(djs_file) as f:
        for dj in yaml.load_all(f):
            _populate_slug_and_name(dj)
            _store_replacement(db["dj"], dj)
            db["dj"][dj["slug"]] = dj

            if "city" in dj:
                city_name = dj["city"]
                city_slug = slugify(city_name)
                if city_slug not in db["city"]:
                    db["city"][city_slug] = {
                        "name": city_name,
                        "slug": city_slug,
                        "lint_created_by_dj": dj["slug"],
                    }
                dj["city"] = city_slug
                db["city"][city_slug].setdefault("djs", []).append(dj["slug"])

            # Normalize 'genre' to 'genres'.
            genre_names = dj.setdefault("genres", [])
            if "genre" in dj:
                genre_names.append(dj.pop("genre"))

            genre_slugs = []
            for genre_name in genre_names:
                genre_slug = slugify(genre_name)
                if genre_slug not in db["genre"]:
                    db["genre"][genre_slug] = {
                        "name": genre_name,
                        "slug": genre_slug,
                        "lint_created_by_dj": dj["slug"],
                    }
                genre_slugs.append(genre_slug)
                db["genre"][genre_slug].setdefault("djs", []).append(dj["slug"])
            dj["genres"] = genre_slugs
开发者ID:gldnspud,项目名称:fosmc,代码行数:27,代码来源:db.py
示例12: read_yaml
def read_yaml(yaml_filename):
    """
    Return every YAML document stored in *yaml_filename* as a list.

    :param yaml_filename: path of the file to read.
    :returns: list of parsed documents; empty when the file holds none.
    """
    with open(yaml_filename) as f:
        # load_all() is a lazy generator, so it has to be exhausted while
        # the file is still open.
        return list(yaml.load_all(f))
开发者ID:buzmakov,项目名称:fastlib,代码行数:7,代码来源:yamlutils.py
示例13: read_data
def read_data(self, datafile):
    """
    Read the layout, meta and content documents from *datafile* and build
    ``self.header`` and ``self.rows`` from the layout.

    The file must start with three YAML documents whose ``section`` values
    are ``layout``, ``meta`` and ``content``, in that order; otherwise the
    process exits with an error message.

    :param datafile: path of the YAML table description.
    """
    with open(datafile, "rt") as f:
        parsed = yaml.load_all(f)
        # The next() builtin works on Python 2.6+ and 3; generator.next()
        # exists on Python 2 only.
        layout = dict2obj(next(parsed))
        meta = dict2obj(next(parsed))
        content = dict2obj(next(parsed))

    if layout.section != 'layout':
        exit('layout document in "' + datafile + '" is malformed.')
    elif meta.section != 'meta':
        exit('meta document in "' + datafile + '" is malformed.')
    elif content.section != 'content':
        exit('content document in "' + datafile + '" is malformed.')

    rows = { 'rows': [] }

    # SECURITY NOTE(review): eval() runs arbitrary expressions taken from
    # the data file; only use this with trusted input.  The cells are
    # evaluated in this function's scope, so the local names (layout, meta,
    # content, ...) are deliberately left unchanged and the loops are not
    # turned into comprehensions, which would hide those names from eval().
    if layout.header:
        header = []
        for cell in layout.header:
            header.append([eval(cell)])
    else:
        header = None

    for rownum in layout.rows:
        parsed_cell = []
        # list() makes the items view indexable on Python 3 as well.
        for cell in list(rownum.items())[0][1]:
            parsed_cell.append(eval(cell))
        rows['rows'].append( dict(zip(rownum.keys(), [parsed_cell])) )

    # return header, rows
    self.header = header
    self.rows = rows
开发者ID:KensoDev,项目名称:docs,代码行数:32,代码来源:table_builder.py
示例14: construct_case
def construct_case(filename, name):
    """
    Build a ``YamlTestCase`` subclass called *name* from a yaml file.

    Every key of every YAML document in *filename* becomes a numbered
    ``test_from_yaml_<n>`` method, except ``setup``, whose value is stored
    as the ``_setup_code`` class attribute.
    """
    def make_test(test_name, definition, i):
        def m(self):
            skipped = SKIP_TESTS.get(self.es_version, ())
            if name in skipped:
                raise SkipTest()
            self.run_code(definition)

        # Last two path components identify the yaml file in the docstring.
        short_path = '/'.join(filename.split('/')[-2:])
        m.__name__ = 'test_from_yaml_%d' % i
        m.__doc__ = '%s:%s.test_from_yaml_%d (%s): %s' % (
            __name__, name, i, short_path, test_name)
        return m

    with open(filename) as f:
        tests = list(yaml.load_all(f))

    attrs = {'_yaml_file': filename}
    i = 0
    for test in tests:
        for test_name, definition in test.items():
            if test_name == 'setup':
                attrs['_setup_code'] = definition
            else:
                method_name = 'test_from_yaml_%d' % i
                attrs[method_name] = make_test(test_name, definition, i)
                i += 1

    return type(name, (YamlTestCase, ), attrs)
开发者ID:davinirjr,项目名称:elasticsearch-py,代码行数:32,代码来源:test_common.py
示例15: update_all
def update_all(self, form, force_rollback=False):
    """
    Apply every YAML document in *form* to the element it names.

    The session is committed when at least one instance was modified and
    rolled back otherwise, or always rolled back when *force_rollback* is
    set.

    :param form: YAML text; each document needs an ``element_name`` key.
    :param force_rollback: roll back regardless of modifications.
    :returns: the change description created by the session.
    """
    quoted_form = self._quote_element_names(form)
    documents = yaml.load_all(quoted_form)

    self.session.begin()

    modified = False
    for document in documents:
        spec = ObjectSpec.parse(document['element_name'], expected=ElementName)
        # update_all() does not look in the instance_cache, must exist in DB
        target = self.session.resolve_element_spec(spec)
        self._update_instance(target, document)
        # Once a modification was seen the session is not asked again.
        if not modified:
            modified = self.session.is_modified(target)

    description = self.session.create_change_description()

    if force_rollback:
        self.log.info("Forced Rollback")
        self.session.rollback()
        return description

    if modified:
        self.log.fine("Instance Modified: Commit")
        self.session.commit()
    else:
        self.log.fine("Instance Unchanged: Rollback")
        self.session.rollback()
    return description
开发者ID:nveeser,项目名称:dino,代码行数:31,代码来源:element_form.py
示例16: parse_post
def parse_post(f):
    """
    Parse a post file: a YAML header, a ``...`` line, then the comment.

    :param f: seekable text file object positioned at the start.
    :returns: the header mapping extended with ``comment`` (text after the
        ``...`` line) and ``imgs`` (bytes of each ``image1``..``image4``
        file that is set), or None when the header has a false
        ``postready`` value or no ``...`` line is found.
    """
    parsed = yaml.load_all(f)
    post_header = next(parsed)
    if 'postready' in post_header and (not post_header['postready']):
        print("Post not ready")
        return None

    # Rewind and skip past the header's end marker; what follows is the
    # comment text.
    f.seek(0)
    line = f.readline()
    while (line != '...\n') and (line != ''):
        line = f.readline()
    if line == '':
        return None
    post_header['comment'] = f.read()

    imgs = []
    for key in ('image1', 'image2', 'image3', 'image4'):
        if key in post_header and post_header[key]:
            # A separate name keeps the ``f`` parameter from being rebound.
            with open(post_header[key], 'rb') as image_file:
                imgs.append(image_file.read())
    post_header['imgs'] = imgs
    return post_header
开发者ID:gordon-quad,项目名称:sosuch-cli,代码行数:29,代码来源:sosuch.py
示例17: prepare_reporting
def prepare_reporting(mongo_addr, dbname, config_path):
    """
    Connect to MongoDB, read the optional config file and set up the
    dispatch blueprints.

    :param mongo_addr: sequence whose first two items are passed to
        ``MongoClient`` as its positional arguments.
    :param dbname: name of the database to select.
    :param config_path: path of a YAML config file; falsy to skip it.
    :returns: False when the config file cannot be read, True otherwise.
    """
    from citizendesk.common.utils import get_logger
    from citizendesk.common.dbc import mongo_dbs
    from citizendesk.feeds.config import set_config
    import citizendesk.outgest.dispatch as dispatch

    logger = get_logger()

    mongo_dbs.set_dbname(dbname)
    DbHolder = namedtuple('DbHolder', 'db')
    mongo_dbs.set_db(DbHolder(db=MongoClient(mongo_addr[0], mongo_addr[1])[mongo_dbs.get_dbname()]))

    config_data = None

    if config_path:
        try:
            # ``with`` closes the file even when reading fails.
            with open(config_path) as cfghf:
                config_yaml = yaml.load_all(cfghf.read())
            # Only the first document is used; next() also works on
            # Python 3, unlike generator.next().
            config_data = next(config_yaml)
            config_yaml.close()
        except Exception:
            # ``except Exception`` keeps KeyboardInterrupt and SystemExit
            # from being swallowed as "can not read".
            logger.error('can not read config file: ' + str(config_path))
            return False

    dispatch.setup_blueprints(app, config_data)

    return True
开发者ID:Flowdeeps,项目名称:citizendesk-core,代码行数:30,代码来源:run.py
示例18: take_action
def take_action(self, parsed_args):
    """
    Create a policy from the single YAML document in the policy file.

    :param parsed_args: parsed arguments; ``policy_file_path`` and
        ``max_width`` are used.
    :returns: the created policy's fields as ``zip(*sorted(items))``, with
        its rules joined into one newline-separated string.
    :raises Exception: if the file holds no policy or more than one.
    """
    self.log.debug('take_action(%s)' % parsed_args)
    # set default max-width
    if parsed_args.max_width == 0:
        parsed_args.max_width = 80
    client = self.app.client_manager.congressclient

    with open(parsed_args.policy_file_path, "r") as stream:
        policies = yaml.load_all(stream)
        try:
            body = next(policies)
        except StopIteration:
            raise Exception('No policy found in file.')
        # A second document means the file is ambiguous.
        try:
            next(policies)
        except StopIteration:
            pass
        else:
            raise Exception(
                'More than one policy found in file. None imported.')

    data = client.create_policy(body)
    # Flatten the list of rule dicts into one newline-separated string.
    data['rules'] = "\n".join([rule['rule'] for rule in data['rules']])
    return zip(*sorted(six.iteritems(data)))
开发者ID:openstack,项目名称:python-congressclient,代码行数:26,代码来源:policy.py
示例19: _ImportBackupResources
def _ImportBackupResources(self, file_name):
    """
    Import every resource document of a YAML backup file as a published
    ``models.Resource``.

    Documents that raise ``TypeError`` while being imported are skipped
    (and logged) so the remaining ones still get imported.

    :param file_name: file name appended to this module's directory.
    """
    import logging

    # ``with`` closes the file even if an unexpected error aborts the
    # import; open() replaces the Python-2-only file() builtin.
    with open(os.path.dirname(__file__) + file_name, 'r') as f:
        for res in yaml.load_all(f):
            try:
                author_key = models.Author.get_by_key_name(res['author_id'])
                author_key2 = None
                if 'author_id2' in res:
                    author_key2 = models.Author.get_by_key_name(res['author_id2'])

                resource = models.Resource(
                    title=res['title'],
                    subtitle=res.get('subtitle') or None,
                    description=res.get('description') or None,
                    author=author_key,
                    second_author=author_key2,
                    url=unicode(res['url']),
                    social_url=unicode(res.get('social_url') or ''),
                    browser_support=res.get('browser_support') or [],
                    update_date=res.get('update_date'),
                    publication_date=res['publication_date'],
                    tags=res['tags'],
                    draft=False # These are previous resources. Mark them as published.
                )
                resource.put()
            except TypeError:
                # TODO(ericbidelman): Not sure why this is throwing an error.
                # Still best-effort, but leave a trace instead of silently
                # dropping the resource.
                logging.warning('Skipping backup resource %r', res,
                                exc_info=True)
开发者ID:mikewest,项目名称:www.html5rocks.com,代码行数:29,代码来源:main.py
示例20: decomposeRedux
def decomposeRedux(osTemplateFilename):
    """
    Split the objects of a template file into one YAML file per object.

    For every entry of each document's ``objects`` list a file named
    ``kube-<metadata name>[-<id>]-<svc|rc|po>.yaml`` is written; it holds
    all of the object's keys except ``id``.

    :param osTemplateFilename: path of the template to decompose.
    :returns: list of the file names written, in order.
    """
    # (kind value, file-name suffix) pairs for the supported object kinds.
    kind_suffixes = (
        ('Service', 'svc'),
        ('ReplicationController', 'rc'),
        ('Pod', 'po'),
    )
    output = []
    with open(osTemplateFilename, 'r') as f:
        for data in yaml.load_all(f, Loader=yaml.Loader):
            for obj in data['objects']:
                filename = 'kube-' + data['metadata']['name']
                fileType = None
                outValue = {}
                for objkey, objval in obj.items():
                    if objkey == 'id':
                        # The id only contributes to the file name.
                        filename = filename + '-' + objval
                        continue
                    outValue[objkey] = objval
                    if objkey == 'kind':
                        for kind, suffix in kind_suffixes:
                            if objval == kind:
                                fileType = suffix
                                break
                filename = filename + '-' + fileType + '.yaml'
                dumped = yaml.dump(outValue, default_flow_style=False)
                with open(filename, 'w') as outfile:
                    outfile.write(dumped)
                output.append(filename)
    return output
开发者ID:heckdevice,项目名称:os-kube-tesseract,代码行数:29,代码来源:osaka.py
注：本文中的yaml.load_all函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论