本文整理汇总了Python中xmltodict.parse函数的典型用法代码示例。如果您正苦于以下问题：Python parse函数的具体用法？Python parse怎么用？Python parse使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了parse函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _process_file
def _process_file(self, f, attr_prefix='ATTR_'):
    """Parse the XML in ``f`` with xmltodict and yield its records.

    xmltodict can return node attributes as prefixed keys (the prefix
    avoids collisions with child element names) or ignore them; per the
    original note, a prefix of False ignores attributes.

    NOTE(review): the ``attr_prefix`` argument is accepted but never read;
    the prefix actually passed to xmltodict is ``self.attr_prefix``.
    Left untouched here so behaviour does not change.
    """
    import xmltodict

    # Forward a postprocessor only when one is configured.
    parse_options = {'attr_prefix': self.attr_prefix}
    if self.postprocessor:
        parse_options['postprocessor'] = self.postprocessor
    tree = xmltodict.parse(f, **parse_options)

    # If a node list was given, walk down the tree one key at a time.
    if self.node_list:
        for key in self.node_list:
            tree = tree[key]

    # A list at this level is emitted element by element; anything else
    # is emitted as a single record.
    records = tree if isinstance(tree, list) else [tree]
    for record in records:
        yield record
开发者ID:hpetru,项目名称:saucebrush,代码行数:30,代码来源:sources.py
示例2: test_disable_entities_true_attempts_external_dtd
def test_disable_entities_true_attempts_external_dtd(self):
    """Parsing with entities enabled must attempt the external entity.

    ``expat.ParserCreate`` is temporarily replaced by a factory that
    installs an ``ExternalEntityRefHandler`` returning 0. The document
    references the external entity ``&ee;``, so ``parse`` is expected to
    raise ``expat.ExpatError``.
    """
    # The body must reference the declared entity as "&ee;". The scraped
    # copy contained the single character U+2147 instead (an HTML-decoded
    # "&ee;"), which never triggers the external entity handler.
    xml = """
<!DOCTYPE external [
<!ENTITY ee SYSTEM "http://www.python.org/">
]>
<root>&ee;</root>
"""

    def raising_external_ref_handler(*args, **kwargs):
        parser = ParserCreate(*args, **kwargs)
        parser.ExternalEntityRefHandler = lambda *x: 0
        try:
            feature = "http://apache.org/xml/features/disallow-doctype-decl"
            parser._reader.setFeature(feature, True)
        except AttributeError:
            # This parser object has no _reader; nothing to configure.
            pass
        return parser

    expat.ParserCreate = raising_external_ref_handler
    # Using this try/except because a TypeError is thrown before
    # the ExpatError, and Python 2.6 is confused by that.
    try:
        parse(xml, disable_entities=False, expat=expat)
    except expat.ExpatError:
        self.assertTrue(True)
    else:
        self.assertTrue(False)
    finally:
        # Always undo the monkeypatch, even when an assertion fails,
        # so later tests see the real ParserCreate.
        expat.ParserCreate = ParserCreate
开发者ID:lindsay-stevens,项目名称:xmltodict,代码行数:26,代码来源:test_xmltodict.py
示例3: test_nested
def test_nested(self):
    """Nested dicts, with and without attributes, survive a round trip."""
    samples = (
        {'a': {'b': '1', 'c': '2'}},
        {'a': {'b': {'c': {'@a': 'x', '#text': 'y'}}}},
    )
    for obj in samples:
        # dict -> XML -> dict returns the same mapping ...
        self.assertEqual(obj, parse(unparse(obj)))
        # ... and XML -> dict -> XML returns the same document.
        self.assertEqual(unparse(obj), unparse(parse(unparse(obj))))
开发者ID:zhanglei002,项目名称:xmltodict,代码行数:7,代码来源:test_dicttoxml.py
示例4: convert_results
def convert_results(self, results, output_format,
                    return_format, inherit_from):
    """Convert a raw response from ``output_format`` to ``return_format``.

    * ``output_format == 'json'``: ``results`` is decoded with
      ``json.loads`` and returned as XML (``'xml'``, via ``dicttoxml``),
      as an object (``'object'``, via ``self.json_to_object``) or as the
      decoded value (any other ``return_format``).
    * ``output_format == 'xml'``: ``'json'`` returns the xmltodict result
      round-tripped through JSON; ``'object'`` additionally passes it to
      ``self.json_to_object``; any other ``return_format`` returns
      ``results`` unchanged.
    * ``output_format == 'javascript'``: never converted; a notice is
      printed when a conversion was requested.
    * any other ``output_format``: ``results`` is returned unchanged and
      ``return_format`` is not inspected.

    ``return_format`` is compared case-insensitively.
    """
    if output_format == 'json':
        wanted = return_format.lower()
        decoded = json.loads(results)
        if wanted == 'xml':
            return dicttoxml(decoded)
        if wanted == 'object':
            return self.json_to_object(decoded, 'QueryObject', inherit_from)
        return decoded

    if output_format == 'xml':
        wanted = return_format.lower()
        if wanted in ('json', 'object'):
            # The JSON round trip turns xmltodict's mapping type into
            # plain JSON-compatible containers.
            plain = json.loads(json.dumps(xmltodict.parse(results)))
            if wanted == 'object':
                return self.json_to_object(plain, 'QueryObject', inherit_from)
            return plain
        return results

    if output_format == 'javascript':
        wanted = return_format.lower()
        if wanted in ('json', 'xml', 'object'):
            print("Cannot Convert 'JavaScript' response to '" +
                  wanted + "'...returning 'JavaScript'")

    return results
开发者ID:bfk,项目名称:bibs,代码行数:25,代码来源:bibs.py
示例5: nfo_xml_file_tv
def nfo_xml_file_tv(media_file_path):
    """
    Find and load the TV nfo file that sits next to ``media_file_path``.

    ``tvinfo.nfo`` is tried first, then ``tvshow.nfo``. The first one that
    exists is parsed with xmltodict; if it exists but cannot be parsed
    (malformed XML or undecodable bytes) it is ignored and no further
    candidate is tried.

    Returns a ``(nfo_data, xml_data)`` tuple. ``nfo_data`` is the parsed
    document or None. ``xml_data`` is always None because this function
    never assigns it.
    """
    nfo_data = None
    xml_data = None
    # check for NFO or XML as no need to do lookup if ID found in it
    # TODO should check for one dir back too I suppose
    media_dir = media_file_path.rsplit('/', 1)[0]
    candidates = (
        ('tvinfo.nfo', 'nfo tv file found'),
        ('tvshow.nfo', 'nfo tv file found2'),
    )
    for file_name, log_key in candidates:
        # The '/' separator was previously missing, which built paths such
        # as "/shows/Footvinfo.nfo" that never match a real file.
        nfo_file_check = media_dir + '/' + file_name
        if not os.path.isfile(nfo_file_check):
            continue
        common_global.es_inst.com_elastic_index('info', {log_key: nfo_file_check})
        try:
            nfo_data = xmltodict.parse(
                common_file.com_file_load_data(nfo_file_check, False))
        except (xml.parsers.expat.ExpatError, UnicodeDecodeError):
            # Best effort: an unreadable nfo is treated as "no nfo data".
            pass
        # Only the first existing candidate is ever considered.
        break
    return nfo_data, xml_data
开发者ID:MediaKraken,项目名称:MediaKraken_Deployment,代码行数:28,代码来源:metadata_nfo_xml.py
示例6: test_encoded_string
def test_encoded_string(self):
    """A text document and its UTF-8 encoding parse to equal results."""
    code_point = 39321
    try:
        # Python 2 spelling.
        char = unichr(code_point)
    except NameError:
        # unichr does not exist: fall back to chr (Python 3).
        char = chr(code_point)
    document = "<a>%s</a>" % char
    self.assertEqual(parse(document), parse(document.encode("utf-8")))
开发者ID:Mondego,项目名称:pyreco,代码行数:7,代码来源:allPythonContent.py
示例7: _epub_parser
def _epub_parser(epub):
    """
    Handle EPUB specific parsing
    Return dict of ebook metadata

    An EPUB must contain META-INF/container.xml, which contains the path to
    the EPUB metadata file.

    Every ``dc:*`` element of the package metadata is copied into the
    result under its name without the ``dc:`` prefix, except:

    * ``dc:creator`` is stored as ``author`` (text only; a list of texts
      when the element is repeated);
    * ``dc:identifier`` is stored as ``identifiers``, a list of
      ``{'identifier': <opf:scheme or None>, 'value': <text>}`` dicts.

    A final ``sha256`` identifier computed by ``file_hash`` is always
    appended, and ``format`` is always ``'epub'``.
    """
    sha256 = file_hash(epub)

    # Use the archive as a context manager so the file handle is closed
    # even if one of the XML documents is missing or malformed.
    with ZipFile(epub) as zf:
        xml = xmltodict.parse(zf.read('META-INF/container.xml'))
        metadata_path = xml['container']['rootfiles']['rootfile']['@full-path']  # TODO: validate this is true for all EPUBs
        raw_metadata = xmltodict.parse(zf.read(metadata_path))

    def text_of(node):
        # xmltodict yields a mapping with '#text' for elements that carry
        # attributes and a plain string for elements that do not.
        if isinstance(node, dict):
            return node.get('#text')
        return node

    metadata = {'format': 'epub'}
    for k, v in raw_metadata['package']['metadata'].items():
        if 'dc:' not in k:
            continue
        if 'creator' in k:  # Required element, needs additional parsing
            k = 'author'
            if isinstance(v, list):
                v = [text_of(creator) for creator in v]
            else:
                v = text_of(v)
        if 'identifier' in k:  # Required element, needs additional parsing
            k = 'identifiers'
            if not isinstance(v, list):
                v = [v]  # Just in case we get a single element
            identifiers = []
            for i in v:  # Support multiple identifiers
                if isinstance(i, dict):
                    scheme = i.get('@opf:scheme')
                else:
                    # Identifier without attributes: no scheme available.
                    scheme = None
                identifiers.append({'identifier': scheme, 'value': text_of(i)})
            v = identifiers
        metadata[k.split('dc:')[-1]] = v

    # setdefault: a package without any dc:identifier used to raise KeyError.
    metadata.setdefault('identifiers', []).append(
        {'identifier': 'sha256', 'value': sha256})
    return metadata
开发者ID:dreadpirate15,项目名称:coppermind,代码行数:30,代码来源:parser.py
示例8: parse_nessus_file
def parse_nessus_file(nessus_file, protocol):
    """Return the host names in a Nessus report that expose ``protocol``.

    The file is streamed through xmltodict at item depth 4. For every item
    whose path contains both a ``ReportHost`` and a ``ReportItem`` element,
    the host's ``name`` attribute is collected (once) when the item's
    ``port`` is in ``protocol_dict[protocol]['ports']`` or its ``svc_name``
    is in ``protocol_dict[protocol]['services']``.

    Returns the collected names as a list, in first-seen order.
    """
    targets = []

    def handle_nessus_file(path, item):
        # Must return True otherwise xmltodict will throw a ParsingInterrupted() exception
        # https://github.com/martinblech/xmltodict/blob/master/xmltodict.py#L219
        # ``path`` is a sequence of (element name, attributes) pairs.
        elements = dict(path)
        # The previous test, `'ReportHost' and 'ReportItem' in values`,
        # only ever checked for 'ReportItem'; require both explicitly.
        if 'ReportHost' not in elements or 'ReportItem' not in elements:
            return True

        ip = elements['ReportHost']['name']
        if ip in targets:
            return True

        report_item = elements['ReportItem']
        wanted = protocol_dict[protocol]
        # A single `or` avoids appending the same host twice when both the
        # port and the service name match.
        if (report_item['port'] in wanted['ports']
                or report_item['svc_name'] in wanted['services']):
            targets.append(ip)
        return True

    # Binary mode: expat's ParseFile needs read() to return bytes on Python 3.
    with open(nessus_file, 'rb') as file_handle:
        xmltodict.parse(file_handle, item_depth=4, item_callback=handle_nessus_file)

    return targets
开发者ID:0xe7,项目名称:CrackMapExec,代码行数:29,代码来源:nessus.py
示例9: __init__
def __init__(self, conn, switch_dict):
    """Query the switch and count access ports that are up.

    Sends 'show ethernet-switching interfaces detail | display xml' over
    ``conn``, parses the reply and stores in ``self.up_access_interfaces``
    the number of interfaces whose name matches ``ge-.*``, whose port mode
    is "Access" and whose state is "up".

    conn: object providing ``send_command(command)``.
    switch_dict: mapping with at least a ``'username'`` key.

    Raises ValueError when nothing is left of the reply after its first
    line has been removed. XML parse errors from the fallback parse
    propagate unchanged.
    """
    self.conn = conn
    self.switch_dict = switch_dict
    self.op_rpc = (
        'show ethernet-switching interfaces detail | display xml')
    self.xml_output = self.conn.send_command(self.op_rpc)

    # Everything after the first newline is kept. For non-root users the
    # text is stripped first, so leading blank lines do not count as the
    # "first line".
    # NOTE(review): presumably the first line is an echo or banner; confirm.
    is_root = self.switch_dict['username'] == "root"
    reply_text = str(self.xml_output)
    if not is_root:
        reply_text = reply_text.strip()
    self.clean_xml = reply_text.partition("\n")[2]

    if not self.clean_xml:
        print("I connected, but no valid response was received from the "
              "switch. Here's the raw output:<snip>\n{0}".format(self.xml_output))
        print("</snip>")
        raise ValueError("no valid response was received from the switch")

    try:
        self.dict_of_xml = xmltodict.parse(self.clean_xml)
    except ExpatError:
        # Trimming may have cut into the document; retry with the
        # untouched output and let a second failure propagate.
        self.dict_of_xml = xmltodict.parse(self.xml_output)

    interfaces = (self.dict_of_xml['rpc-reply']
                  ['switching-interface-information']['interface'])
    if not isinstance(interfaces, list):
        # xmltodict returns a single mapping, not a list, when the
        # element occurs exactly once.
        interfaces = [interfaces]

    # Compiled once instead of once per interface.
    self.gige_re = re.compile('ge-.*')
    self.up_access_interfaces = 0
    for interface in interfaces:
        if (self.gige_re.match(interface['interface-name']) and
                interface['interface-port-mode'] == "Access" and
                interface['interface-state'] == "up"):
            self.up_access_interfaces += 1
开发者ID:scott-abt,项目名称:junos-interface-utilization,代码行数:33,代码来源:get_interface_util.py
示例10: add_keywords
def add_keywords(self, domain_name, keywords):
    """Set the keyword and the related keywords of ``domain_name``.

    Two ``managedomains`` requests are sent: first ``type=keyword`` with
    ``keywords[0]``, then ``type=related`` with all keywords joined by
    ``'|'``. The second request is skipped when the first one reports
    ``success == '0'``.

    Returns an ``(ok, message)`` tuple: ``(True, 'Success')`` when the
    second request reports success, otherwise ``(False, <reason>)``.
    """
    if not keywords:
        # keywords[0] used to raise IndexError here.
        return (False, 'No keywords given')

    def manage(setting_type, setting):
        # Send one managedomains request and return its <result> node.
        payload = {
            'user_name': self.user_name,
            'api_key': self.api_key,
            'action': 'managedomains',
            'type': setting_type,
            'setting': setting,
            'domain': domain_name
        }
        response = requests.get('https://api.parkingcrew.com/manage_v3.php', params=payload)
        return xmltodict.parse(response.text)['response']['result']

    result = manage('keyword', keywords[0])
    if result['success'] == '0':
        return (False, result['error']['msg'])

    result = manage('related', '|'.join(keywords))
    if result['success'] == '1':
        return (True, 'Success')
    if result['success'] == '0':
        return (False, result['error']['msg'])
    # Any other status used to fall through and return None.
    return (False, 'Unexpected response: success=%r' % (result['success'],))
开发者ID:SkullTech,项目名称:domain-park,代码行数:32,代码来源:core.py
示例11: get_info
def get_info(self, request):
    """Return, as JSON, the EAD ``did`` element describing the requested path.

    The translated path is split at '/representations': the part before it
    locates the package (its ``metadata/descriptive/EAD.xml`` is read), the
    rest is expanded by ``self._get_href_variations`` into candidate hrefs.
    Three lookups are tried in order, each over all hrefs: a file inside a
    daoset, a file without a daoset, and a directory. The first match is
    converted with xmltodict and dumped to JSON; with no match a 404
    payload is returned via ``flask.jsonify``.

    NOTE(review): hrefs derived from the request are interpolated into the
    XPath expressions without escaping; verify that a quote in the path
    cannot break the query.
    """
    path = self.translate_path(request.form['path'])
    package_root, marker, remainder = path.partition('/representations')
    ip = package_root
    hrefs = self._get_href_variations(marker + remainder)
    namespace = '{http://ead3.archivists.org/schema/}'
    tree = ET.parse('%s/metadata/descriptive/EAD.xml' % ip)

    # (XPath template, function building its arguments for one href)
    lookups = (
        # regular file - daoset
        (".//%sdid/*/%sdao[@href='%s']/../..",
         lambda ref: (namespace, namespace, ref)),
        # regular file - no daoset
        (".//%sdid/%sdao[@href='%s']/..",
         lambda ref: (namespace, namespace, ref)),
        # directory
        (".//%sc[@base='%s']/%sdid",
         lambda ref: (namespace, ref, namespace)),
    )
    for template, arguments_for in lookups:
        for href in hrefs:
            did_list = tree.findall(template % arguments_for(href))
            if did_list:
                return json.dumps(xmltodict.parse(ET.tostring(did_list[0])))

    # fallback
    return flask.jsonify(
        error=404,
        error_text='Not Found',
        info='No metadata associated to this element'
    )
开发者ID:magenta-aps,项目名称:eark-python-bridge,代码行数:34,代码来源:handlers.py
示例12: test_gen_model
def test_gen_model():
    """The XML generated for a single-port RAM model equals the expected XML."""
    stream = StringIO()
    delegate = mock
    with XMLGenerator(stream, skip_stringify=True) as xg:
        input_ports = (
            ModelInputPort("we", clock="clk"),
            ModelInputPort("addr", clock="clk", combinational_sink_ports=["out"]),
            ModelInputPort("data", clock="clk", combinational_sink_ports=["out"]),
            ModelInputPort("clk", is_clock=True),
        )
        output_ports = (ModelOutputPort("out", clock="clk"), )
        delegate._gen_model(xg, Model('single_port_ram', input_ports, output_ports))

    # Both documents are parsed into plain dicts before being compared.
    actual = parse(stream.getvalue(), encoding="ascii", dict_constructor=dict)
    expected = parse("""
<model name="single_port_ram">
<input_ports>
<port name="we" clock="clk"/>
<port name="addr" clock="clk" combinational_sink_ports="out"/>
<port name="data" clock="clk" combinational_sink_ports="out"/>
<port name="clk" is_clock="1"/>
</input_ports>
<output_ports>
<port name="out" clock="clk"/>
</output_ports>
</model>
""", dict_constructor=dict)
    assert actual == expected
开发者ID:leon575777642,项目名称:vprgen,代码行数:25,代码来源:test_abstractbased_archgen_namedtuplebased_impl.py
示例13: geoSearch
def geoSearch(geo=None):
    """Store the description and location of photos found near a fixed point.

    Searches Flickr around lat 41.8830663 / lon -87.63293 (radius 20) and,
    for every hit, passes ``{'text': <description>, 'location': <location>}``
    taken from the photo's info record to ``insert``.

    NOTE(review): ``geo`` is never read; the coordinates are hard-coded.
    """
    search_reply = flickr.photos.search(lat=41.8830663, lon=-87.63293, radius=20)
    found = xmltodict.parse(search_reply)
    for photo in found['photos']['photo']:
        # The id is passed on in its JSON-encoded (double-quoted) form.
        photo_id = json.dumps(photo['@id'])

        info = xmltodict.parse(flickr.photos.getInfo(photo_id=photo_id))
        description = info['photo']['description']

        # NOTE(review): the geo.getLocation reply is fetched and parsed but
        # its content is not used; the stored location comes from the info
        # record below. The call is kept so behaviour is unchanged.
        xmltodict.parse(flickr.photos.geo.getLocation(photo_id=photo_id))
        location = info['photo']['location']

        insert({'text': description, 'location': location})
开发者ID:GaryZhangUIUC,项目名称:HIGTOM,代码行数:30,代码来源:flickr.py
示例14: retrieve_individual_firewall
def retrieve_individual_firewall():
    """Reload the 'fw', 'fw1' and 'fw2' collections from fresh configs.

    For each firewall name: drop its collection, fetch its configuration
    with ``retrieveConfig`` (which is expected to leave ``<name>.xml`` in
    the working directory), parse that file, delete it and insert the
    parsed document into the collection of the same name.
    """
    # One loop keeps the dropped and the filled collection in sync. The
    # copy-pasted version dropped 'fw' twice, dropped 'fw1' right after
    # filling it and never cleared 'fw2'.
    for name in ('fw', 'fw1', 'fw2'):
        db.drop_collection(name)
        retrieveConfig(name, os.getenv("USER"), password)
        xml_file = '%s.xml' % name
        with open(xml_file) as fd:
            mydict = xmltodict.parse(fd.read())
        os.remove(xml_file)
        # check_keys is disabled as in the original.
        # NOTE(review): presumably because parsed keys may contain
        # characters the database would otherwise reject; confirm.
        getattr(db, name).insert(mydict, check_keys=False)
开发者ID:shanecon,项目名称:juniper,代码行数:25,代码来源:juniper_config_extractor.py
示例15: get_dsn_raw
def get_dsn_raw():
    """ returns a current snapshot of the DSN xml feed converted to json, and updates a copy in redis.

    gets dsn xml feed, converts to json, saves json to redis, returns json

    The result maps each station's ``friendlyName`` to a dict holding
    ``friendlyName``, ``timeUTC``, ``timeZoneOffset`` and, when dishes
    follow the station in the feed, a ``dishes`` list with one
    xmltodict-converted entry per dish.
    """
    # pass the url a param 'r' = timestamp to avoid hitting their cloudfront cache
    timestamp = str(int(mktime(datetime.now().timetuple())))
    response = urlopen('https://eyes.nasa.gov/dsn/data/dsn.xml?r=' + timestamp)
    try:
        dom = parse(response)
    finally:
        # Release the connection whether or not the feed parsed.
        response.close()

    dsn_data = {}
    station = None  # friendlyName of the most recent <station> node
    # documentElement is the root element even if the document starts with
    # a comment or processing instruction (childNodes[0] would not be).
    for node in dom.documentElement.childNodes:
        if not hasattr(node, 'tagName'):  # useless nodes
            continue
        # dsn feed is strange: dishes should appear inside station nodes but don't
        # so converting entire xml doc to dict loses the station/probe relation
        # so have to parse node by node to grab station THEN convert dish node to dict
        if node.tagName == 'station':
            station = node.getAttribute('friendlyName')
            entry = dsn_data.setdefault(station, {})
            entry['friendlyName'] = station
            entry['timeUTC'] = node.getAttribute('timeUTC')
            entry['timeZoneOffset'] = node.getAttribute('timeZoneOffset')
        elif node.tagName == 'dish':
            if station is None:
                # A dish before any station has nothing to attach to;
                # this used to fail on the unbound ``station`` name.
                continue
            dish = xmltodict.parse(node.toxml())['dish']
            dsn_data[station].setdefault('dishes', []).append(dish)

    r_server.set('dsn_raw', dumps(dsn_data))
    return dsn_data
开发者ID:spacehackers,项目名称:api.spaceprob.es,代码行数:32,代码来源:dsn.py
示例16: bus_data
def bus_data():
    """Fetch all CTA bus vehicles and publish them as JSON.

    Downloads the route list, requests the vehicles for the routes in
    batches of ten, then writes the combined vehicle list as JSON to
    ``local_file`` and to S3 under ``s3_key``. Returns ``'Done'``.
    """
    # NOTE(review): the API key and the absolute local path are hard-coded;
    # they are kept byte-for-byte here but belong in configuration.
    cta_key = 'w8xeHXDhPGHtGCY7mngPuNcpD'
    routes_api_url = (
        'http://www.ctabustracker.com/bustime/api/v1/getroutes?key=%s')
    vehicle_api_url = (
        'http://www.ctabustracker.com/bustime/api/v1/getvehicles?key=%s&rt=%s')
    local_file = (
        '/Users/abrahamepton/Tribune/toytrains/static/json/bus_data.json')
    s3_key = 'toytrains/static/json/bus_data.json'
    batch_size = 10  # routes per getvehicles request, as in the original

    def as_list(value):
        # xmltodict gives a list for repeated elements, a single mapping
        # for one element and nothing at all for none.
        if value is None:
            return []
        if isinstance(value, list):
            return value
        return [value]

    response = urllib2.urlopen(routes_api_url % cta_key)
    routes = xmltodict.parse(response.read())
    route_ids = [r['rt'] for r in as_list(routes['bustime-response'].get('route'))]

    vehicles = []
    for start in range(0, len(route_ids), batch_size):
        rts = route_ids[start:start + batch_size]
        response = urllib2.urlopen(vehicle_api_url % (cta_key, ','.join(rts)))
        data = xmltodict.parse(response.read())
        # A batch without any <vehicle> element used to raise KeyError
        # and abort the whole run.
        vehicles.extend(as_list(data['bustime-response'].get('vehicle')))

    payload = json.dumps(vehicles)  # serialized once, written twice
    with open(local_file, 'w') as FH:
        FH.write(payload)
    _write_string_to_s3(s3_key, payload)
    return 'Done'
开发者ID:aepton,项目名称:toytrains,代码行数:30,代码来源:routes.py
示例17: get_fundir
def get_fundir(session, mps_in_nefndir):
    """Collect committee meetings and their attendees for one session.

    Downloads the meeting list for ``session`` from althingi.is and, for
    each meeting, its minutes. Returns a list of
    ``{'nefnd': <committee id>, 'mps': [<name>, ...], 'dagsetning': <date>}``
    dicts. A meeting that cannot be processed is reported on stdout and
    skipped (best effort).

    mps_in_nefndir: known member names; names listed as substitutes
    ("fyrir ...") are fuzzy-matched against it.
    """
    url = 'http://www.althingi.is/altext/xml/nefndarfundir/?lthing='+str(session)
    response = requests.get(url)
    data = xmltodict.parse(response.text)

    meetings = data[u'nefndarfundir'][u'nefndarfundur']
    if meetings is None:
        meetings = []
    elif not isinstance(meetings, list):
        # xmltodict returns a single mapping when there is one meeting.
        meetings = [meetings]

    fundir = []
    for fundur in meetings:
        try:
            dagsetning = fundur[u'hefst'][u'dagur']
            nefnd_id = fundur[u'nefnd'][u'@id']
            fundargerd = requests.get(fundur[u'nánar'][u'fundargerð'][u'xml'])
            fundargerd_data = xmltodict.parse(fundargerd.text)
            texti = fundargerd_data[u'nefndarfundur'][u'fundargerð'][u'texti']
            mps = _attendees(texti, mps_in_nefndir)
            fundir.append({'nefnd': nefnd_id, 'mps': mps, 'dagsetning': dagsetning})
        except Exception:
            # Deliberate best effort: report the meeting and move on. The
            # report itself must not raise, or one bad record would abort
            # the whole loop.
            try:
                label = fundur[u'nefnd']['#text'] + ' - ' + fundur[u'hefst'][u'texti']
            except (KeyError, TypeError):
                label = repr(fundur)
            print(label)
    return fundir


def _attendees(texti, mps_in_nefndir):
    """Extract attendee names from the HTML text of one set of minutes."""
    # The attendance list sits between the first '</h2>' and the first
    # blank line ('<BR><BR>'), one attendee per '<BR>'-separated entry.
    maeting = texti.split('</h2>')[1].split('<BR><BR>')[0]
    mps = []
    for a in maeting.split('<BR>'):
        if 'fyrir' in a:
            m = a.split('fyrir ')[1].split(' (')[0]
            # Fix the declension: the name after "fyrir" is fuzzy-matched
            # back to its listed form.
            mps.append(process.extractOne(m, mps_in_nefndir)[0])
        else:
            mps.append(a.split(' (')[0])
    return mps
开发者ID:bjornlevi,项目名称:5thpower,代码行数:26,代码来源:nefndir.py
示例18: create_get_workout_xml
def create_get_workout_xml(login_response_xml):
    """Return the workout-request XML built from the login reply.

    The request template is read from the file named by the module-level
    ``get_workout_xml``; ``set_ids_for_workout_xml`` receives the parsed
    login reply and the parsed template, and the template is then
    serialized back to XML.
    """
    login_reply = xmltodict.parse(login_response_xml)
    with open(get_workout_xml) as template_file:
        template_text = template_file.read()
    request = xmltodict.parse(template_text)
    set_ids_for_workout_xml(login_reply, request)
    return xmltodict.unparse(request)
开发者ID:josephvolpe,项目名称:Running2Win-CLI,代码行数:7,代码来源:running2win.py
示例19: generate_references
def generate_references(self):
    """Open the book archive and load its package (.opf) document.

    Sets ``self.zip_file``, ``self.file_list`` and ``self.opf_dict``.

    Raises KeyError when no package file can be located, the same
    exception type ``ZipFile.read`` raises for a missing member.
    """
    self.zip_file = zipfile.ZipFile(
        self.book_filename, mode='r', allowZip64=True)
    self.file_list = self.zip_file.namelist()

    # Book structure relies on parsing the .opf file
    # in the book. Now that might be the usual content.opf
    # or package.opf or it might be named after your favorite
    # eldritch abomination. The point is we have to check
    # the container.xml
    packagefile = None
    container = self.find_file('container.xml')
    if container:
        container_xml = self.zip_file.read(container)
        container_dict = xmltodict.parse(container_xml)
        rootfile = container_dict['container']['rootfiles']['rootfile']
        if isinstance(rootfile, list):
            # Several <rootfile> entries arrive as a list; use the first.
            rootfile = rootfile[0]
        packagefile = rootfile['@full-path']
    else:
        presumptive_names = ('content.opf', 'package.opf', 'volume.opf')
        for i in presumptive_names:
            packagefile = self.find_file(i)
            if packagefile:
                logger.info('Using presumptive package file: ' + self.book_filename)
                break

    if not packagefile:
        # Fail with a clear message instead of asking the archive for a
        # member named None.
        raise KeyError(
            'No package (.opf) file found in ' + str(self.book_filename))

    packagefile_data = self.zip_file.read(packagefile)
    self.opf_dict = xmltodict.parse(packagefile_data)
开发者ID:guoyunhe,项目名称:Lector,代码行数:25,代码来源:read_epub.py
示例20: test_init_with_safe_encrypt_mode
def test_init_with_safe_encrypt_mode(self):
    """ Test initialisation in safe (encrypted) mode """
    conf = WechatConf(
        token=self.token,
        appid=self.appid,
        appsecret=self.appsecret,
        encrypt_mode="safe",
        encoding_aes_key=self.encoding_aes_key,
    )
    # Every constructor argument must be exposed unchanged on the conf.
    for attribute in ('token', 'appid', 'appsecret', 'encoding_aes_key'):
        self.assertEqual(getattr(conf, attribute), getattr(self, attribute))
    self.assertIsNotNone(conf.crypto)

    # Decrypt a request message coming from the WeChat server
    crypto = conf.crypto
    req = crypto.decrypt_message(
        msg=self.safe_request_message,
        msg_signature=self.msg_signature,
        timestamp=self.timestamp,
        nonce=self.nonce,
    )
    self.assertEqual(xmltodict.parse(req), xmltodict.parse(self.normal_request_message))

    # Encrypt a response message, with the crypto primitive swapped for
    # the test double during the call and put back afterwards
    origin_crypto = conf.crypto._WechatBaseCrypto__pc
    conf.crypto._WechatBaseCrypto__pc = TestBaseCrypto(key=conf.crypto._WechatBaseCrypto__key)
    resp = conf.crypto.encrypt_message(
        msg=self.response_message, nonce=self.nonce, timestamp=self.timestamp)
    self.assertEqual(xmltodict.parse(resp), xmltodict.parse(self.response_encrypted_message))
    conf.crypto._WechatBaseCrypto__pc = origin_crypto
开发者ID:Rand01ph,项目名称:wechat-python-sdk,代码行数:27,代码来源:test_conf.py
注：本文中的xmltodict.parse函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论