本文整理汇总了Python中simplejson.loads函数的典型用法代码示例。如果您正苦于以下问题:Python loads函数的具体用法?Python loads怎么用?Python loads使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了loads函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_session
def get_session(self):
    """Authenticate against the CitySDK Mobility API and return the session token.

    Builds the ``get_session`` URL from ``self.citysdk_url`` and the
    configured username/password, issues a GET and returns the first entry
    of the ``results`` list in the JSON reply.

    Raises:
        ImproperlyConfigured: if the request itself fails, or if the API
            answers with any status other than 200.
    """
    self.verbose('Authenticating to CitySDK')
    logger.info('== Authenticating to CitySDK ==')

    url_parts = (
        self.citysdk_url,
        self.config['citysdk_username'],
        self.config['citysdk_password'],
    )
    authentication_url = '%sget_session?e=%s&p=%s' % url_parts

    try:
        response = requests.get(
            authentication_url,
            verify=self.config.get('verify_SSL', True)
        )
    except Exception as e:
        message = 'API Authentication Error: "%s"' % e
        logger.error(message)
        raise ImproperlyConfigured(message)

    if response.status_code == 200:
        # store session token
        # will be valid for 1 minute after each request
        return json.loads(response.content)['results'][0]

    # Non-200: prefer the API's own error message, fall back to the raw body.
    try:
        message = 'API Authentication Error: "%s"' % json.loads(response.content)['message']
    except Exception:
        message = 'API Authentication Error: "%s"' % response.content
    logger.error(message)
    raise ImproperlyConfigured(message)
开发者ID:bucciarellialessandro,项目名称:nodeshot,代码行数:34,代码来源:CitySdkMobility.py
示例2: test_json_configs
def test_json_configs(self):
""" Currently only "is this well-formed json?"
"""
config_files = self.query_config_files()
filecount = [0, 0]
for config_file in config_files:
if config_file.endswith(".json"):
filecount[0] += 1
self.info("Testing %s." % config_file)
contents = self.read_from_file(config_file, verbose=False)
try:
json.loads(contents)
except ValueError:
self.add_summary("%s is invalid json." % config_file,
level="error")
self.error(pprint.pformat(sys.exc_info()[1]))
else:
self.info("Good.")
filecount[1] += 1
if filecount[0]:
self.add_summary("%d of %d json config files were good." %
(filecount[1], filecount[0]))
else:
self.add_summary("No json config files to test.")
开发者ID:catlee,项目名称:build-mozharness,代码行数:25,代码来源:configtest.py
示例3: _list
def _list(self, url, query_string=''):
    """Lists all existing remote resources.

    Resources in listings can be filtered using `query_string` formatted
    according to the syntax and fields labeled as filterable in the BigML
    documentation for each resource.

    Suffixes:
        __lt: less than
        __lte: less than or equal to
        __gt: greater than
        __gte: greater than or equal to

    For example:

        'size__gt=1024'

    Resources can also be sorted by including an `order_by` statement
    within the `query_string`. For example:

        'order_by=size'

    Returns a dict with the keys:
        'code': HTTP status of the call, or HTTP_INTERNAL_SERVER_ERROR when
            the response could not be obtained or interpreted.
        'meta': the 'meta' entry of the listing on success, else None.
        'objects': the 'objects' entry of the listing on success, else None.
        'error': None on success, the decoded error body for 400/401/404,
            otherwise a generic "couldn't be listed" error structure.
    """
    code = HTTP_INTERNAL_SERVER_ERROR
    meta = None
    resources = None
    error = {
        "status": {
            "code": code,
            "message": "The resource couldn't be listed"}}
    try:
        response = requests.get(url + self.auth + query_string,
                                headers=ACCEPT_JSON, verify=VERIFY)
        code = response.status_code
        if code == HTTP_OK:
            resource = json.loads(response.content, 'utf-8')
            meta = resource['meta']
            resources = resource['objects']
            error = None
        elif code in [HTTP_BAD_REQUEST, HTTP_UNAUTHORIZED, HTTP_NOT_FOUND]:
            error = json.loads(response.content, 'utf-8')
        else:
            LOGGER.error("Unexpected error (%s)" % code)
            code = HTTP_INTERNAL_SERVER_ERROR
    except ValueError:
        LOGGER.error("Malformed response")
        # The body could not be decoded, so the status already copied from
        # the response (possibly 200) must not be reported next to the
        # generic error payload.
        code = HTTP_INTERNAL_SERVER_ERROR
    except requests.ConnectionError:
        LOGGER.error("Connection error")
    except requests.Timeout:
        LOGGER.error("Request timed out")
    except requests.RequestException:
        LOGGER.error("Ambiguous exception occurred")
    return {
        'code': code,
        'meta': meta,
        'objects': resources,
        'error': error}
开发者ID:seamusabshere,项目名称:python,代码行数:60,代码来源:api.py
示例4: getOsdMapInfos
def getOsdMapInfos(self, pgid):
    """Query the Ceph REST API for a placement group and summarise its OSD mapping.

    Requests ``<cephRestApiUrl>tell/<pgid>/query.json`` and extracts the
    acting/up OSD sets, their primaries and the state from the ``output``
    section of the JSON reply.

    Args:
        pgid: placement group id; concatenated into the URL, so it must be
            a string.

    Returns:
        A dict with the keys ``acting``, ``acting_primary``, ``state``,
        ``up`` and ``up_primary``; or an empty list when the request does
        not return status 200 or returns an empty body.
    """
    Log.info("___getOsdMapInfos(pgid=" + str(pgid) + ")")
    cephRestApiUrl = self.cephRestApiUrl + 'tell/' + pgid + '/query.json'
    Log.debug("____cephRestApiUrl Request=" + cephRestApiUrl)
    osdmap = []
    data = requests.get(cephRestApiUrl)
    r = data.content
    if data.status_code != 200:
        # Single-argument call form prints the same text on Python 2 and 3.
        print('Error ' + str(data.status_code) + ' on the request getting pools')
        return osdmap
    if len(r) == 0:
        # Previously the empty body was still handed to json.loads(), which
        # always raises; bail out the same way as for a failed request.
        Log.err('The getOsdMapInfos() method returns empty data')
        return osdmap

    output = json.loads(r)["output"]
    stats = output["info"]["stats"]
    return {
        "acting": output["acting"],
        "acting_primary": stats["acting_primary"],
        "state": output["state"],
        "up": output["up"],
        "up_primary": stats["up_primary"],
    }
开发者ID:minshenglin,项目名称:inkscope,代码行数:30,代码来源:S3ObjectCtrl.py
示例5: test_aggregations_datasets_with_two_groups
def test_aggregations_datasets_with_two_groups(self):
    """Aggregate once without a group and once by ``food_type``.

    Verifies the ungrouped aggregation rows, that the aggregations listing
    has one entry per group, and that the linked dataset for the group
    contains the group column plus the aggregated column.
    """
    self.dataset_id = self._post_file()
    group = 'food_type'
    self._post_calculations(self.default_formulae + ['sum(amount)'])
    self._post_calculations(['sum(gps_alt)'], group)
    groups = ['', group]

    ungrouped_rows = self._test_aggregations(groups)
    for row in ungrouped_rows:
        self.assertEqual(row.keys(), ['sum_amount_'])
        self.assertTrue(isinstance(row.values()[0], float))

    # get second linked dataset
    aggregations = json.loads(self.controller.aggregations(self.dataset_id))
    self.assertEqual(len(aggregations.keys()), len(groups))
    self.assertEqual(aggregations.keys(), groups)
    linked_dataset_id = aggregations[group]
    self.assertTrue(isinstance(linked_dataset_id, basestring))

    # inspect linked dataset
    linked_rows = json.loads(self.controller.show(linked_dataset_id))
    expected_keys = [group, 'sum_gps_alt_']
    for row in linked_rows:
        self.assertEqual(row.keys(), expected_keys)
开发者ID:zbyufei,项目名称:bamboo,代码行数:28,代码来源:test_datasets.py
示例6: read_request_data
def read_request_data(self):
    """Return the payload of the current request as a Python object.

    Order of preference: a non-empty raw body (decoded as UTF-8 JSON), then
    a ``data`` form field (parsed as JSON), and finally the form itself
    converted to a plain dict.
    """
    raw_body = request.data
    if raw_body:
        return json.loads(raw_body.decode('utf-8'))

    form_payload = request.form.get('data')
    if form_payload:
        return json.loads(form_payload)

    return dict(request.form)
开发者ID:coleifer,项目名称:flask-peewee,代码行数:7,代码来源:rest.py
示例7: get_actions
def get_actions(self,coordinator):
    """Summarise the most recent actions of a coordinator, grouped by status.

    Two requests are made: the first (offset 0, length 0) only reads the
    ``total`` number of actions, the second fetches the last
    ``self.query_size`` actions.

    Args:
        coordinator: value substituted into the
            ``actions_from_coordinator`` URL template.

    Returns:
        A dict with ``'total'`` (number of actions fetched) plus one entry
        per action status holding ``'count'`` and the summed ``'runtime'``
        (``lastModifiedTime`` minus ``createdTime``, via ``time.mktime``).
        An empty dict is returned when a request is not ``ok`` or when any
        error occurs.
    """
    accumulator = {'total': 0}
    # Bound before the try so the error handler can always format it.
    url = None
    try:
        base_url = "http://" + self.host + ":" + str(self.port)
        template = self.api_url['actions_from_coordinator']

        url = base_url + template % (coordinator, 0, 0)
        response = requests.get(url, auth=self.html_auth)
        if not response.ok:
            return {}
        total_actions = json.loads(response.content)['total']

        url = base_url + template % (
            coordinator, total_actions - self.query_size, self.query_size)
        response = requests.get(url, auth=self.html_auth)
        if not response.ok:
            return {}

        for action in json.loads(response.content)['actions']:
            # NOTE(review): time_conversion presumably returns a struct_time
            # suitable for time.mktime - confirm against its definition.
            created = time.mktime(self.time_conversion(action['createdTime']))
            modified = time.mktime(self.time_conversion(action['lastModifiedTime']))
            per_status = accumulator.get(action['status'])
            if per_status is None:
                per_status = accumulator[action['status']] = defaultdict(int)
            per_status['count'] += 1
            per_status['runtime'] += modified - created
            accumulator['total'] += 1
    except Exception:
        # Best effort: any failure is reported as "no data", but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        logging.error('http request error: "%s"' % url)
        return {}
    return accumulator
开发者ID:helakelas,项目名称:nagios-hadoop,代码行数:30,代码来源:ooziestatus.py
示例8: test_including_replace_image
def test_including_replace_image(self):
    """Requesting with ``include_image_url`` prepends the replaced image.

    The image named by ``include_image_url`` shows up as an extra leading
    record when the batch starts at 0, and is left out (with indexes still
    aligned) when the batch starts at 1.
    """
    ## if we are replacing, the passed-in image url
    ## is added as a fake 1th element.
    import simplejson
    from karl.content.interfaces import IImage
    from zope.interface import directlyProvides

    context = testing.DummyModel()
    image = testing.DummyModel()
    context["boo.jpg"] = image
    image.title = "Boo"
    directlyProvides(image, IImage)

    base_params = {"include_image_url": "/boo.jpg", "source": "myrecent"}
    request = testing.DummyRequest(params=base_params)
    response = self._call_fut(context, request)
    self.assertEqual(response.status, "200 OK")
    expected_call = (
        context,
        request,
        {"community": None, "batch_start": 0, "batch_size": 12, "creator": None},
    )
    self.assertEqual(self.dummy_get_images_batch.called, expected_call)
    data = simplejson.loads(response.body)
    expected_info = {"totalRecords": 6, "start": 0, "records": ["Boo", "foo", "bar"]}
    self.assertEqual(data["images_info"], expected_info)

    # if we don't ask for the 0th index: it's not
    # added, but the sizes and indexes are aligned.
    shifted_params = {"include_image_url": "/boo.jpg", "source": "myrecent", "start": "1"}
    request = testing.DummyRequest(params=shifted_params)
    response = self._call_fut(context, request)
    self.assertEqual(response.status, "200 OK")
    expected_call = (
        context,
        request,
        {"community": None, "batch_start": 0, "batch_size": 12, "creator": None},
    )
    self.assertEqual(self.dummy_get_images_batch.called, expected_call)
    data = simplejson.loads(response.body)
    expected_info = {"totalRecords": 6, "start": 1, "records": ["foo", "bar"]}
    self.assertEqual(data["images_info"], expected_info)
开发者ID:reebalazs,项目名称:karl,代码行数:33,代码来源:test_imagedrawer.py
示例9: _get_more
def _get_more(self, last_key, value_dict):
    """Fetch the next page of a marker-paginated listing.

    Args:
        last_key: marker returned by the previous page; when falsy,
            ``value_dict['start_marker']`` (if any) is used instead.
        value_dict: request description. Reads ``'url'``, optional
            ``'params'`` and ``'start_marker'``, and either
            ``'list_item_mapper'`` (applied to each entry of the response's
            ``values``) or ``'object_mapper'`` (applied to the whole
            response).

    Returns:
        A ``(items, next_marker, exhausted)`` tuple. A 204 response yields
        ``([], None, False)``.

    Raises:
        LibcloudError: for any status other than 200 or 204.
    """
    key = last_key if last_key else value_dict.get('start_marker')

    # Work on a copy: writing the marker into the caller's dict would leak
    # it into later, unrelated calls that reuse the same value_dict.
    params = dict(value_dict.get('params') or {})
    if key:
        params['marker'] = key

    response = self.connection.request(value_dict['url'], params)

    # newdata, self._last_key, self._exhausted
    if response.status == httplib.NO_CONTENT:
        return [], None, False

    if response.status == httplib.OK:
        resp = json.loads(response.body)
        if 'list_item_mapper' in value_dict:
            func = value_dict['list_item_mapper']
            items = [func(x, value_dict) for x in resp['values']]
        else:
            items = value_dict['object_mapper'](resp, value_dict)
        next_marker = resp['metadata'].get('next_marker')
        return items, next_marker, (next_marker is None)

    # An error body that is not a JSON object must not mask the
    # LibcloudError below with a parsing exception.
    try:
        details = json.loads(response.body).get('details', '')
    except (ValueError, TypeError, AttributeError):
        details = ''
    raise LibcloudError('Unexpected status code: %s (url=%s, details=%s)' %
                        (response.status, value_dict['url'], details))
开发者ID:danngrant,项目名称:rackspace-monitoring,代码行数:35,代码来源:rackspace.py
示例10: call_prediction
def call_prediction(agency_id, route_id, direction_id, stop_id, phone_num):
    """Start a phone call that reads out the prediction list for a stop.

    Posts a call request through ``account.request`` whose callback URL is
    the ``prediction_list`` view rendered with ``format="twiml"``.

    Args:
        agency_id, route_id, direction_id, stop_id: passed to ``url_for``
            to build the callback URL.
        phone_num: number to call.

    Returns:
        A human-readable status string: the confirmation with the call id
        on success, ``"REMOTE ERROR: ..."`` when the remote error body has
        a ``Message`` entry, or the raw error text otherwise.
    """
    user = users.get_current_user()
    app.logger.info("%s (%s) called %s" % (user.nickname(), user.user_id(), phone_num))
    url = DOMAIN + url_for('prediction_list', agency_id=agency_id,
        route_id=route_id, direction_id=direction_id, stop_id=stop_id, format="twiml")
    call_info = {
        'From': PHONE_NUMBER,
        'To': phone_num,
        'Url': url,
        'Method': 'GET',
    }
    try:
        call_json = account.request(
            '/%s/Accounts/%s/Calls.json' % (API_VERSION, ACCOUNT_SID),
            'POST',
            call_info)
        app.logger.info(call_json)
        call = json.loads(call_json)
        return "Now calling %s with call ID %s" % (call['to'], call['sid'])
    except HTTPErrorAppEngine as e:
        app.logger.error(e)
        try:
            err = json.loads(e.msg)
            message = err['Message']
            return "REMOTE ERROR: %s" % (message,)
        except Exception:
            # Error body is not JSON or has no 'Message': show it verbatim.
            return "Couldn't parse error output:<br>\n%s" % e.msg
开发者ID:singingwolfboy,项目名称:buscalling,代码行数:27,代码来源:twilio.py
示例11: test_it
def test_it(self):
    """The view returns the image batch and forwards ``start`` as ``batch_start``."""
    import simplejson

    context = testing.DummyModel()

    first_request = testing.DummyRequest(params={"source": "myrecent"})
    response = self._call_fut(context, first_request)
    self.assertEqual(response.status, "200 OK")
    expected_call = (
        context,
        first_request,
        {"community": None, "batch_start": 0, "batch_size": 12, "creator": None},
    )
    self.assertEqual(self.dummy_get_images_batch.called, expected_call)
    data = simplejson.loads(response.body)
    self.assertEqual(data["images_info"], {"totalRecords": 5, "start": 0, "records": ["foo", "bar"]})

    # ask a batch from the 1st (or nth) image
    second_request = testing.DummyRequest(params={"source": "myrecent", "start": "1"})
    response = self._call_fut(context, second_request)
    self.assertEqual(response.status, "200 OK")
    expected_call = (
        context,
        second_request,
        {"community": None, "batch_start": 1, "batch_size": 12, "creator": None},
    )
    self.assertEqual(self.dummy_get_images_batch.called, expected_call)
    data = simplejson.loads(response.body)
    # no problem... just bad faking
    expected_info = {"totalRecords": 5, "start": 1, "records": ["foo", "bar"]}
    self.assertEqual(data["images_info"], expected_info)
开发者ID:reebalazs,项目名称:karl,代码行数:27,代码来源:test_imagedrawer.py
示例12: after_request
def after_request(response):
    """Post-process a response: extend CORS headers and emit annotation events.

    For ``OPTIONS`` requests, ``X-Client-ID`` is appended to the
    ``Access-Control-Allow-Headers`` response header. For 2xx responses
    whose endpoint matches ``store.<action>_annotation``, an
    ``AnnotationEvent`` is sent through the current request's registry; the
    annotation data comes from the request body for ``delete`` and from the
    response body for every other action.

    Args:
        response: the outgoing response object.

    Returns:
        The same response object.
    """
    if flask.request.method == 'OPTIONS':
        in_value = response.headers.get('Access-Control-Allow-Headers', '')
        # Drop empty entries: ''.split(',') yields [''], which used to
        # produce the malformed header value ", X-Client-ID".
        allowed = [h.strip() for h in in_value.split(',') if h.strip()]
        allowed.append('X-Client-ID')
        response.headers['Access-Control-Allow-Headers'] = ', '.join(allowed)
        return response

    if 200 <= response.status_code < 300:
        # `or ''` keeps re.match from raising if no endpoint was resolved.
        match = re.match(r'^store\.(\w+)_annotation$', flask.request.endpoint or '')
        if match:
            request = get_current_request()
            action = match.group(1)
            if action == 'delete':
                data = json.loads(flask.request.data)
            else:
                data = json.loads(response.data)
            annotation = wrap_annotation(data)
            event = events.AnnotationEvent(request, annotation, action)
            request.registry.notify(event)
    return response
开发者ID:mrienstra,项目名称:h,代码行数:25,代码来源:store.py
示例13: test_parse_ad
def test_parse_ad(self):
    """``Ad.parse`` maps both entries of a sample API response onto model fields."""
    response = """
{"response":[
{"id":"607256","campaign_id":"123","name":"Ad1","status":0,"approved":0,"all_limit":0,"cost_type":0,"cpm":118},
{"id":"664868","campaign_id":"123","name":"Ad2","status":1,"approved":1,"all_limit":100,"cost_type":1,"cpc":488}
]}
"""
    account = AccountFactory(remote_id=1)
    campaign = CampaignFactory(account=account, remote_id=1)

    # First record: a CPM ad with status/approved switched off.
    first_ad = Ad(campaign=campaign, fetched=timezone.now())
    first_ad.parse(json.loads(response)["response"][0])
    first_ad.save(commit_remote=False)
    self.assertTrue(isinstance(first_ad.campaign, Campaign))
    self.assertEqual(first_ad.campaign.remote_id, 1)
    self.assertEqual(first_ad.remote_id, 607256)
    self.assertEqual(first_ad.name, "Ad1")
    self.assertEqual(first_ad.status, False)
    self.assertEqual(first_ad.approved, False)
    self.assertEqual(first_ad.all_limit, 0)
    self.assertEqual(first_ad.cpm, 118)

    # Second record: a CPC ad with status/approved switched on.
    second_ad = Ad(campaign=campaign, fetched=timezone.now())
    second_ad.parse(json.loads(response)["response"][1])
    second_ad.save(commit_remote=False)
    self.assertEqual(second_ad.remote_id, 664868)
    self.assertEqual(second_ad.name, "Ad2")
    self.assertEqual(second_ad.status, True)
    self.assertEqual(second_ad.approved, True)
    self.assertEqual(second_ad.all_limit, 100)
    self.assertEqual(second_ad.cpc, 488)
开发者ID:ramusus,项目名称:django-vkontakte-ads,代码行数:34,代码来源:tests.py
示例14: test_parse_campaign
def test_parse_campaign(self):
    """``Campaign.parse`` maps both entries of a sample API response onto model fields.

    A ``"0"`` start/stop time is expected to become ``None``; non-zero
    timestamps are expected to become timezone-aware UTC datetimes.
    """
    response = """
{"response":[
{"id":"111","name":"Campaign1","status":0,"day_limit":2000,"all_limit":1000000,"start_time":"0","stop_time":"0"},
{"id":"222","name":"Campaign2","status":1,"day_limit":6000,"all_limit":9000000,"start_time":"1298365200","stop_time":"1298451600"}
]}
"""
    account = AccountFactory(remote_id=1)

    # First record: stopped campaign without start/stop times.
    first_campaign = Campaign(account=account, fetched=timezone.now())
    first_campaign.parse(json.loads(response)["response"][0])
    first_campaign.save(commit_remote=False)
    self.assertEqual(first_campaign.remote_id, 111)
    self.assertEqual(first_campaign.name, "Campaign1")
    self.assertEqual(first_campaign.status, False)
    self.assertEqual(first_campaign.day_limit, 2000)
    self.assertEqual(first_campaign.all_limit, 1000000)
    self.assertEqual(first_campaign.start_time, None)
    self.assertEqual(first_campaign.stop_time, None)

    # Second record: running campaign with explicit start/stop times.
    second_campaign = Campaign(account=account, fetched=timezone.now())
    second_campaign.parse(json.loads(response)["response"][1])
    second_campaign.save(commit_remote=False)
    self.assertTrue(isinstance(second_campaign.account, Account))
    self.assertEqual(second_campaign.account.remote_id, 1)
    self.assertEqual(second_campaign.remote_id, 222)
    self.assertEqual(second_campaign.name, "Campaign2")
    self.assertEqual(second_campaign.status, True)
    self.assertEqual(second_campaign.day_limit, 6000)
    self.assertEqual(second_campaign.all_limit, 9000000)
    expected_start = datetime(2011, 2, 22, 9, 0, 0, tzinfo=timezone.utc)
    expected_stop = datetime(2011, 2, 23, 9, 0, 0, tzinfo=timezone.utc)
    self.assertEqual(second_campaign.start_time, expected_start)
    self.assertEqual(second_campaign.stop_time, expected_stop)
开发者ID:ramusus,项目名称:django-vkontakte-ads,代码行数:35,代码来源:tests.py
示例15: getJSON
def getJSON(self, url, language=None):
    """Fetch `url` and return its body decoded from JSON.

    Args:
        url: address to request.
        language: value sent as the ``language`` query parameter; falls
            back to ``config["language"]`` when falsy.

    Returns:
        The decoded JSON document.

    Raises:
        ValueError: if the body is not valid JSON even after being decoded
            as UTF-8.
    """
    language = language or config["language"]
    page = requests.get(url, params={"language": language}).content
    try:
        return simplejson.loads(page)
    except (TypeError, ValueError):
        # Retry with the raw bytes decoded to text first. Only parsing
        # failures are retried; KeyboardInterrupt and the like propagate.
        return simplejson.loads(page.decode("utf-8"))
开发者ID:chenmengyang,项目名称:themoviedb,代码行数:7,代码来源:tmdb.py
示例16: from_request
def from_request(cls, request):
    """Create new TransientShardState from webapp request.

    Reads ``mapreduce_spec``, ``input_reader_state``, ``output_writer_state``
    (defaulting to ``"{}"``), ``shard_id`` and ``slice_id`` from the request,
    and derives the base path by cutting ``request.path`` at its last ``/``.
    """
    mapreduce_spec = MapreduceSpec.from_json_str(request.get("mapreduce_spec"))
    mapper_spec = mapreduce_spec.mapper

    reader_state = simplejson.loads(request.get("input_reader_state"))
    input_reader = mapper_spec.input_reader_class().from_json(reader_state)

    output_writer = None
    if mapper_spec.output_writer_class():
        writer_state = simplejson.loads(request.get("output_writer_state", "{}"))
        output_writer = mapper_spec.output_writer_class().from_json(writer_state)
        assert isinstance(output_writer, mapper_spec.output_writer_class()), (
            "%s.from_json returned an instance of wrong class: %s" % (
                mapper_spec.output_writer_class(),
                output_writer.__class__))

    request_path = request.path
    last_slash = request_path.rfind("/")
    base_path = request_path[:last_slash]

    shard_id = str(request.get("shard_id"))
    slice_id = int(request.get("slice_id"))
    return cls(base_path,
               mapreduce_spec,
               shard_id,
               slice_id,
               input_reader,
               output_writer=output_writer)
开发者ID:GeomancerProject,项目名称:Software,代码行数:26,代码来源:model.py
示例17: image
def image(title,id):
    """Return the URL of the first image listed for a German Wikipedia page.

    Args:
        title: page title, inserted as-is into the API query string.
        id: key of the page inside the API's ``query.pages`` mapping.

    Returns:
        The image URL, or None when the page is not in the result, lists no
        images, or the image lookup returns no ``imageinfo``.
    """
    api_prefix = "http://de.wikipedia.org/w/api.php?action=query&titles="

    titleUrl = api_prefix + title + "&prop=images&format=json"
    imgJson = simplejson.loads(urllib.urlopen(titleUrl).read())
    pages = imgJson['query']['pages']
    if id not in pages:
        return None
    images = pages[id].get('images')
    if not images or not images[0]:
        return None

    imageTitle = images[0]['title'].encode('utf-8')
    imageTitleUrl = api_prefix + imageTitle + "&prop=imageinfo&iiprop=url&format=json"
    imgTitleJson = simplejson.loads(urllib.urlopen(imageTitleUrl).read())

    # The result is keyed by the image's own page id ("-1" when the page
    # does not exist locally). The old fallback looked up the literal key
    # 'id' and so never matched; accept whichever entry has 'imageinfo'.
    for page in imgTitleJson['query']['pages'].values():
        if 'imageinfo' in page:
            return page['imageinfo'][0]['url']
    return None
开发者ID:carriercomm,项目名称:scraperwiki-scraper-vault,代码行数:27,代码来源:baudenkmal_hannover_1.py
示例18: test_load
def test_load(self):
    """``api.load`` returns an empty node first and the stored node once it is set."""
    # NOTE(review): the "[email protected]" fragments in the URIs below look
    # like an e-mail-obfuscation artifact of the page this snippet was
    # copied from - confirm the real URIs against the upstream test file.
    empty_response = self.get('api.load', 'i18n://[email protected]/title')
    self.assertEqual(empty_response.status_code, 200)
    empty_node = json.loads(empty_response.content)
    self.assertEqual(empty_node['uri'], 'i18n://[email protected]/title.txt')
    self.assertIsNone(empty_node['data'])
    self.assertEqual(len(empty_node['meta'].keys()), 0)

    # TODO: Should get 404
    # response = self.get('api.load', 'i18n://[email protected]/title#1')
    # self.assertEqual(response.status_code, 404)

    cio.set('i18n://[email protected]/title.md', u'# Djedi')

    stored_response = self.get('api.load', '[email protected]/title')
    self.assertEqual(stored_response.status_code, 200)
    node = json_node(stored_response, simple=False)
    meta = node.pop('meta', {})
    expected_node = {
        'uri': 'i18n://[email protected]/title.md#1',
        'data': u'# Djedi',
        'content': u'<h1>Djedi</h1>',
    }
    self.assertDictEqual(node, expected_node)
    self.assertKeys(meta, 'modified_at', 'published_at', 'is_published')

    versioned_response = self.get('api.load', 'i18n://[email protected]/title#1')
    versioned_node = json.loads(versioned_response.content)
    self.assertEqual(versioned_node['uri'], 'i18n://[email protected]/title.md#1')
    self.assertEqual(len(cio.revisions('i18n://[email protected]/title')), 1)
开发者ID:airtonix,项目名称:djedi-cms,代码行数:26,代码来源:test_rest.py
示例19: linkedin_connections
def linkedin_connections():
    """Dump first-degree connections and their related connections as CSV pairs.

    Writes one ``name,name`` line per edge to the file named by ``OUTPUT``
    (UTF-8): first an edge from the account owner to each first-degree
    connection, then an edge from that connection to each of its related
    connections. Commas inside names are replaced by spaces.
    """
    # Use your credentials to build the oauth client
    consumer = oauth.Consumer(key=CONSUMER_KEY, secret=CONSUMER_SECRET)
    token = oauth.Token(key=OAUTH_TOKEN, secret=OAUTH_TOKEN_SECRET)
    client = oauth.Client(consumer, token)

    # Fetch first degree connections
    resp, content = client.request('http://api.linkedin.com/v1/people/~/connections?format=json')
    results = simplejson.loads(content)

    # File that will store the results; the context manager guarantees it
    # is flushed and closed even if a request fails midway.
    with codecs.open(OUTPUT, 'w', 'utf-8') as output:
        # Loop thru the 1st degree connection and see how they connect to each other
        for result in results["values"]:
            con = "%s %s" % (result["firstName"].replace(",", " "), result["lastName"].replace(",", " "))
            # type your own name in below
            output.write("%s,%s\n" % ("Your name goes here", con))

            # This is the trick, use the search API to get related connections
            u = "https://api.linkedin.com/v1/people/%s:(relation-to-viewer:(related-connections))?format=json" % result["id"]
            resp, content = client.request(u)
            rels = simplejson.loads(content)
            try:
                for rel in rels['relationToViewer']['relatedConnections']['values']:
                    sec = "%s %s" % (rel["firstName"].replace(",", " "), rel["lastName"].replace(",", " "))
                    output.write("%s,%s\n" % (con, sec))
            except (KeyError, TypeError, AttributeError):
                # Best effort: connections without (complete) related
                # connection data are simply skipped.
                continue
开发者ID:spsanderson,项目名称:linkedin-graph-grab,代码行数:25,代码来源:linkedin-2-query.py
示例20: get_summary
def get_summary(days=30):
    """Total up recent permits and post a one-line summary.

    Pages through ``get_data`` (advancing ``offset`` by the size of each
    page) until an empty page is returned, summing ``_estimated_cost`` over
    all permits, then prints the summary text and passes it to
    ``post_status``.

    Args:
        days: look-back window forwarded to ``get_data``.
    """
    link = 'http://chicagocityscape.com/dashboard.php'
    cost = 0
    offset = 0

    # Retrieve json of permits, parse
    permits = json.loads(get_data(days=days))

    # Loop through permits and get more data until we run out of permits
    while len(permits) > 0:
        for permit in permits:
            cost += float(permit['_estimated_cost'])
        offset += len(permits)
        print(offset)
        # Get more data, incrementing call by offset = number of records already parsed
        permits = json.loads(get_data(days=days,offset=offset))

    text = "Over the past %s days there have been %s permits issued in Chicago, totaling $%s %s" % (
        str(days),
        "{:,.0f}".format(offset),
        "{:,.0f}".format(cost),
        link,
    )
    print(text)
    post_status(text)
开发者ID:chagan,项目名称:permitbot,代码行数:27,代码来源:permitbot.py
注:本文中的simplejson.loads函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论