本文整理汇总了Python中simplejson.json_loads函数的典型用法代码示例。如果您正苦于以下问题:Python json_loads函数的具体用法?Python json_loads怎么用?Python json_loads使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了json_loads函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _get_projects_for_upload
def _get_projects_for_upload(cls, hub_headers, username, rememberme=False):
    """Ask the Hub for the projects available for upload.

    POSTs to ``/dynamic/upload/projects`` through ``cls.hub_pool`` and, as a
    side effect, sets the status / ``Cache-Control`` header on ``response``.

    :param hub_headers: headers sent with the request; its ``Cookie`` entry is
        echoed back when ``rememberme`` is truthy.
    :param username: returned unchanged under the ``'user'`` key.
    :param rememberme: whether to include the Hub cookie in the result.
    :returns: a dict with ``'ok': True`` plus ``'cookie'``, ``'user'`` and
        ``'projects'`` on success, or ``'ok': False`` and a ``'msg'`` on failure.
    """
    try:
        hub_reply = cls.hub_pool.request('POST',
                                         '/dynamic/upload/projects',
                                         headers=hub_headers,
                                         redirect=False)
    except (HTTPError, SSLError) as e:
        LOG.error(e)
        response.status_int = 500
        return {'ok': False, 'msg': str(e)}

    status = hub_reply.status
    if status == 503:
        # The Hub may explain why it is unavailable in the reply body.
        response.status_int = 503
        # pylint: disable=E1103
        unavailable_msg = json_loads(hub_reply.data).get('msg', 'Service currently unavailable.')
        # pylint: enable=E1103
        return {'ok': False, 'msg': unavailable_msg}

    if status != 200:
        response.status_int = 500
        return {'ok': False, 'msg': 'Wrong Hub answer.'}

    response.headers['Cache-Control'] = 'no-store, no-cache, max-age=0'
    cookie = hub_headers.get('Cookie') if rememberme else None
    # pylint: disable=E1103
    projects = json_loads(hub_reply.data).get('projects', [])
    # pylint: enable=E1103
    return {
        'ok': True,
        'cookie': cookie,
        'user': username,
        'projects': projects,
    }
开发者ID:ajbetteridge,项目名称:turbulenz_local,代码行数:32,代码来源:deploy.py
示例2: write_to_file
def write_to_file(options, data, filename=None, output_path=None, force_overwrite=False):
    """Write ``data`` to a JSON file, honouring the overwrite and verbosity options.

    :param options: object providing ``outputdir``, ``overwrite``, ``silent``,
        ``indent`` and ``verbose``; ``project``, ``type`` and ``daterange`` are
        only read when ``filename`` is not given.
    :param data: a JSON string (decoded and re-serialised) or a JSON-serialisable object.
    :param filename: file name to use inside ``options.outputdir``.
    :param output_path: full destination path; computed from ``filename`` when omitted.
    :param force_overwrite: overwrite an existing file even if ``options.overwrite`` is unset.
    :returns: ``None``. An existing file is left untouched unless overwriting was requested.

    I/O errors are reported through ``error`` and terminate the process with ``exit(-1)``.
    """
    if not filename:
        filename = "%s-%s-%s.json" % (options.project, options.type, options.daterange.filename_str())

    try:
        if not output_path:
            output_path = normpath(path_join(options.outputdir, filename))

        if path_exists(output_path):
            if options.overwrite or force_overwrite:
                if not options.silent:
                    warning("Overwriting existing file: %s" % output_path)
            else:
                # ``silent`` only suppresses the message; the file must be
                # skipped either way (previously silent mode overwrote it).
                if not options.silent:
                    warning("Skipping existing file: %s" % output_path)
                return

        indentation = 4 if options.indent else None

        if isinstance(data, str):
            data = json_loads(data)

        # Text mode: the JSON serialiser emits str, which a binary-mode file
        # rejects on Python 3.
        with open(output_path, "w") as fout:
            if isinstance(data, str):
                fout.write(data)
            else:
                json_dump(data, fout, indent=indentation)

        if options.verbose:
            log("Finished writing to: %s" % output_path)

    except (IOError, OSError) as e:
        error(e)
        exit(-1)
开发者ID:Chiur,项目名称:turbulenz_tools,代码行数:34,代码来源:exportevents.py
示例3: login
def login(connection, options):
    """Log in to the Hub and return the session cookie.

    :param connection: connection object whose ``request`` method is used to
        POST the credentials to ``/dynamic/login``.
    :param options: object providing ``user``, ``password`` and ``silent``.
    :returns: the value of the ``set-cookie`` response header.

    Any failure (connection error, non-200 status, missing/unexpected cookie or
    mismatching ``source`` in the reply) is reported through ``error`` and
    terminates the process with ``exit(-1)``.
    """
    username = options.user
    password = options.password

    if not options.silent:
        log('Login as "%s".' % username)

    credentials = {"login": username, "password": password, "source": "/tool"}

    try:
        r = connection.request("POST", "/dynamic/login", fields=credentials, retries=1, redirect=False)
    except (HTTPError, SSLError):
        error("Connection to Hub failed!")
        exit(-1)

    if r.status != 200:
        if r.status == 301:
            redirect_location = r.headers.get("location", "")
            # partition() keeps the whole location when the login path is not
            # part of it; slicing with find()'s -1 would drop the last character.
            redirect_base = redirect_location.partition("/dynamic/login")[0]
            error('Login is being redirected to "%s". Please verify the Hub URL.' % redirect_base)
        else:
            error("Wrong user login information!")
        exit(-1)

    cookie = r.headers.get("set-cookie", None)
    login_info = json_loads(r.data)

    # pylint: disable=E1103
    if not cookie or HUB_COOKIE_NAME not in cookie or login_info.get("source") != credentials["source"]:
        error("Hub login failed!")
        exit(-1)
    # pylint: enable=E1103

    return cookie
开发者ID:Chiur,项目名称:turbulenz_tools,代码行数:34,代码来源:exportevents.py
示例4: test_search_cell
def test_search_cell(self):
    """Search for a randomly chosen tower and compare the reported position.

    Builds a ``/v1/search`` query from the cells of one ``TOWER_DATA`` entry,
    expects HTTP 200 and, unless the service answers ``not_found``, checks that
    latitude and longitude match the expected values once both are scaled by
    1000 and truncated to integers.
    """
    (lat, lon), all_cells = random.choice(self.TOWER_DATA)
    expected_lat = int(lat * 1000)
    expected_lon = int(lon * 1000)

    query_data = {"radio": '', "cell": []}
    for cell_data in all_cells:
        radio_name = RADIO_MAP[cell_data['radio']]
        # The top-level radio type is taken from the first cell only.
        if query_data['radio'] == '':
            query_data['radio'] = radio_name
        query_data['cell'].append(dict(radio=radio_name,
                                       cid=cell_data['cid'],
                                       mcc=cell_data['mcc'],
                                       mnc=cell_data['mnc'],
                                       lac=cell_data['lac']))

    jdata = json.dumps(query_data)
    res = self.session.post(HOST + '/v1/search?key=test', jdata)
    self.assertEqual(res.status_code, 200)

    jdata = json_loads(res.content)
    if jdata['status'] != 'not_found':
        actual_lat = int(jdata['lat'] * 1000)
        actual_lon = int(jdata['lon'] * 1000)
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(actual_lat, expected_lat)
        self.assertEqual(actual_lon, expected_lon)
开发者ID:SmartDelivery,项目名称:ichnaea,代码行数:25,代码来源:loadtest.py
示例5: json_request
def json_request(method, url, **kwargs):
    """Send a request with a JSON-encoded body and return the decoded JSON reply.

    :param method: HTTP method passed to the connection.
    :param url: URL used to open the connection; only its path is requested.
    :param kwargs: forwarded to ``conn.request``; a ``body`` entry is serialised
        to JSON and a ``Content-Type: application/json`` header is set for it.
    :returns: tuple of (response object, decoded body).
    :raises ClientException: when the body is empty or not valid JSON, or the
        status is outside the 2xx range.
    """
    headers = kwargs.setdefault("headers", {})
    if "body" in kwargs:
        headers["Content-Type"] = "application/json"
        kwargs["body"] = json_dumps(kwargs["body"])

    parsed, conn = http_connection(url)
    conn.request(method, parsed.path, **kwargs)
    resp = conn.getresponse()
    body = resp.read()
    http_log((url, method), kwargs, resp, body)

    if body:
        try:
            body = json_loads(body)
        except ValueError:
            # An undecodable body is treated like a missing one.
            body = None

    if body and 200 <= resp.status < 300:
        return resp, body

    raise ClientException(
        "Auth GET failed",
        http_scheme=parsed.scheme,
        http_host=conn.host,
        http_port=conn.port,
        http_path=parsed.path,
        http_status=resp.status,
        http_reason=resp.reason,
    )
开发者ID:dani4571,项目名称:python-swiftclient,代码行数:27,代码来源:client.py
示例6: JSONLocationDictDecoder
def JSONLocationDictDecoder(dct):
    """Rebuild a tuple-keyed dict from its JSON-friendly encoding.

    Dicts carrying the ``'__JSONTupleKeyedDict__'`` marker have each key of
    their ``'dict'`` entry decoded from a JSON string into a tuple; any other
    dict is returned unchanged.
    """
    if '__JSONTupleKeyedDict__' not in dct:
        return dct
    return {tuple(json_loads(key)): value for key, value in dct['dict'].items()}
开发者ID:SmartDelivery,项目名称:ichnaea,代码行数:7,代码来源:loadtest.py
示例7: get_account
def get_account(url, token, marker=None, limit=None, prefix=None,
                http_conn=None, full_listing=False):
    """
    Get a listing of containers for the account.

    :param url: storage URL
    :param token: auth token
    :param marker: marker query
    :param limit: limit query
    :param prefix: prefix query
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :param full_listing: if True, return a full listing, else returns a max
                         of 10000 listings
    :returns: a tuple of (response headers, a list of containers) The response
              headers will be a dict and all header names will be lowercase.
    :raises ClientException: HTTP GET request failed
    """
    if not http_conn:
        http_conn = http_connection(url)

    if full_listing:
        # Keep requesting pages, using the last returned name as the next
        # marker, until an empty page comes back.
        rv = get_account(url, token, marker, limit, prefix, http_conn)
        listing = rv[1]
        while listing:
            marker = listing[-1]['name']
            listing = \
                get_account(url, token, marker, limit, prefix, http_conn)[1]
            if listing:
                rv[1].extend(listing)
        return rv

    parsed, conn = http_conn
    qs = 'format=json'
    if marker:
        qs += '&marker=%s' % quote(marker)
    if limit:
        qs += '&limit=%d' % limit
    if prefix:
        qs += '&prefix=%s' % quote(prefix)
    full_path = '%s?%s' % (parsed.path, qs)
    headers = {'X-Auth-Token': token}
    method = 'GET'
    conn.request(method, full_path, '', headers)
    resp = conn.getresponse()
    body = resp.read()
    http_log(("%s?%s" % (url, qs), method,), {'headers': headers}, resp, body)

    resp_headers = {}
    for header, value in resp.getheaders():
        resp_headers[header.lower()] = value

    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Account GET failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=parsed.path, http_query=qs,
                              http_status=resp.status, http_reason=resp.reason,
                              http_response_content=body)
    if resp.status == 204:
        # No Content: nothing to decode (the stray no-op ``body`` expression
        # that used to sit here has been removed).
        return resp_headers, []
    return resp_headers, json_loads(body)
开发者ID:meitham,项目名称:python-swiftclient,代码行数:59,代码来源:client.py
示例8: _postupload_progress
def _postupload_progress(deploy_info, connection, cookie, silent, verbose):
    """Poll the Hub until post-upload processing has finished.

    :param deploy_info: object providing ``hub_session`` and ``error``.
    :param connection: connection used to POST the progress requests.
    :param cookie: value sent in the ``Cookie`` header.
    :param silent: suppress log output (and poll once per second).
    :param verbose: log every progress change (and poll more often).
    :returns: ``0`` once progress reaches 100, ``-1`` on any failure.
    """
    if silent:
        sleep_step = 1.0
    elif verbose:
        log('Post processing:')
        sleep_step = 0.2
    else:
        log('Post processing files...')
        sleep_step = 0.4

    if not deploy_info.hub_session:
        error('No deploy session found.')
        return -1

    report_changes = verbose and not silent
    last_reported = 0

    while True:
        sleep(sleep_step)

        if deploy_info.error:
            error(deploy_info.error)
            return -1

        try:
            r = connection.request('POST',
                                   '/dynamic/upload/progress/%s' % deploy_info.hub_session,
                                   headers={'Cookie': cookie},
                                   redirect=False)
        except (HTTPError, SSLError) as e:
            error(e)
            error('Post-upload progress check failed.')
            return -1

        if r.status != 200:
            error('Wrong Hub answer.')
            return -1

        r_data = json_loads(r.data)
        # pylint: disable=E1103
        current_progress = int(r_data.get('progress', -1))
        error_msg = str(r_data.get('error', ''))
        # pylint: enable=E1103

        if error_msg:
            error('Post-upload processing failed: %s' % error_msg)
            return -1
        if current_progress == -1:
            error('Invalid post-upload progress.')
            return -1

        if report_changes and last_reported != current_progress:
            log('Progress: %u%%' % current_progress)
        last_reported = current_progress

        if current_progress >= 100:
            if not silent:
                log('Post processing completed.')
            return 0
开发者ID:chenbk85,项目名称:turbulenz_local,代码行数:59,代码来源:deploygame.py
示例9: get_account
def get_account(url, token, marker=None, limit=None, prefix=None, http_conn=None, full_listing=False):
    """
    Get a listing of containers for the account.

    :param url: storage URL
    :param token: auth token
    :param marker: marker query
    :param limit: limit query
    :param prefix: prefix query
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :param full_listing: if True, return a full listing, else returns a max
                         of 10000 listings
    :returns: a tuple of (response headers, a list of containers) The response
              headers will be a dict and all header names will be lowercase.
    :raises ClientException: HTTP GET request failed
    """
    if not http_conn:
        http_conn = http_connection(url)

    if full_listing:
        # Page through the account, using the last name seen as the marker.
        result = get_account(url, token, marker, limit, prefix, http_conn)
        page = result[1]
        while page:
            marker = page[-1]["name"]
            page = get_account(url, token, marker, limit, prefix, http_conn)[1]
            if page:
                result[1].extend(page)
        return result

    parsed, conn = http_conn

    query_parts = ["format=json"]
    if marker:
        query_parts.append("marker=%s" % quote(marker))
    if limit:
        query_parts.append("limit=%d" % limit)
    if prefix:
        query_parts.append("prefix=%s" % quote(prefix))
    qs = "&".join(query_parts)

    conn.request("GET", "%s?%s" % (parsed.path, qs), "", {"X-Auth-Token": token})
    resp = conn.getresponse()
    resp_headers = {header.lower(): value for header, value in resp.getheaders()}

    if not 200 <= resp.status < 300:
        resp.read()
        raise ClientException(
            "Account GET failed",
            http_scheme=parsed.scheme,
            http_host=conn.host,
            http_port=conn.port,
            http_path=parsed.path,
            http_query=qs,
            http_status=resp.status,
            http_reason=resp.reason,
        )

    # The body is always consumed, even when there is nothing to decode.
    payload = resp.read()
    if resp.status == 204:
        return resp_headers, []
    return resp_headers, json_loads(payload)
开发者ID:edwardt,项目名称:swift,代码行数:57,代码来源:client.py
示例10: _get_config_data
def _get_config_data(self, inConfigPath):
    """
    Read the JSON config file at ``inConfigPath`` and return the
    decoded python object.
    """
    with open(inConfigPath) as config_file:
        return json_loads(config_file.read())
开发者ID:mattmakesmaps,项目名称:DataPlunger,代码行数:9,代码来源:config_iter_example.py
示例11: __get_config_data
def __get_config_data(self, inConfigPath):
    """
    Open the file at ``inConfigPath`` and return the python object
    decoded from its JSON content.
    """
    with open(inConfigPath) as config_file:
        raw_config = config_file.read()
    return json_loads(raw_config)
开发者ID:mattmakesmaps,项目名称:DataPlunger,代码行数:9,代码来源:parse_config.py
示例12: loads
def loads(s, *args, **kwargs):
    """Helper to log actual value which failed to be parsed

    Forwards all arguments to ``json_loads`` and returns its result. When
    parsing raises, the offending input and arguments are logged and the
    original exception is re-raised unchanged.
    """
    try:
        return json_loads(s, *args, **kwargs)
    except Exception:
        # Arguments are passed separately so the message is only formatted
        # when the record is actually emitted.
        lgr.error(
            "Failed to load content from %r with args=%r kwargs=%r",
            s, args, kwargs
        )
        raise
开发者ID:datalad,项目名称:datalad,代码行数:10,代码来源:json_py.py
示例13: inline_array_events_s3
def inline_array_events_s3(options, today_log, array_files_list, enc_key, connection):
    """Download array-event files and merge their events into ``today_log``.

    Each file is fetched through ``connection``, decrypted with ``enc_key``,
    decompressed and decoded from JSON. Every event is appended to the
    ``'customEvents'`` list of its slug, and the touched lists are re-sorted
    by time.

    :param options: object providing ``verbose``.
    :param today_log: dict keyed by slug; mutated in place and returned.
    :param array_files_list: iterable of file URLs to download.
    :param enc_key: key passed to ``decrypt_data``.
    :param connection: object whose ``request`` method performs the GET.
    :returns: ``today_log``.

    A non-200 reply or a connection error is reported through ``error`` and
    terminates the process with ``exit(-1)``.
    """
    verbose = options.verbose
    to_sort = set()

    try:
        for index, filename in enumerate(array_files_list):
            # Format: 'https://bucket.s3.amazonaws.com/gamefolder/arrayevents/date(seconds)/objectid.bin?
            #          AWSAccessKeyId=keyid&Expires=timestamp&Signature=signature'
            # The objectid doesn't correspond to a database entry but it used for uniqueness and timestamp
            filename_cleaned = filename.split("?", 1)[0].rsplit("/", 1)[-1]
            event_objectid = filename_cleaned.split(".", 1)[0]
            timestamp = get_objectid_timestamp(event_objectid)
            formatted_timestamp = strftime("%Y-%m-%d %H:%M:%S", gmtime(timestamp))

            if verbose:
                log("Requesting events file " + str(index + 1) + " submitted at " + formatted_timestamp)

            r = connection.request("GET", filename, redirect=False)

            # pylint: disable=E1103
            if r.status != 200:
                error_msg = "Couldn't download event %d." % (index + 1)
                # The response body has to be decoded before a 'msg' entry can
                # be looked up; a body that is not a JSON object has no message.
                try:
                    reply_msg = json_loads(r.data).get("msg", None)
                except (ValueError, TypeError, AttributeError):
                    reply_msg = None
                if reply_msg:
                    error_msg += " %s" % reply_msg
                error(str(r.status) + " " + error_msg)
                exit(-1)
            # pylint: enable=E1103

            r_data = decrypt_data(r.data, enc_key)
            r_data = json_loads(zlib_decompress(r_data))

            if not isinstance(r_data, list):
                r_data = [r_data]

            for event in r_data:
                slug = event["slug"]
                del event["slug"]
                # Some older files had no timestamp in the file data itself in which case we use the
                # timestamp on the filename (same fallback as inline_array_events_local)
                if "time" in event:
                    event["time"] = strftime("%Y-%m-%d %H:%M:%S", gmtime(event["time"]))
                else:
                    event["time"] = formatted_timestamp

                if slug not in today_log:
                    today_log[slug] = {"playEvents": [], "customEvents": []}

                today_log[slug]["customEvents"].append(event)
                # Maintaining a list of slugs to sort the customEvents by date for so that added array events
                # appear in order but we do not unnecessarily sort large lists if an array event wasn't added to it
                to_sort.add(slug)

        for slug in to_sort:
            today_log[slug]["customEvents"].sort(key=lambda k: k["time"])

        return today_log

    except (HTTPError, SSLError) as e:
        error(e)
        exit(-1)
开发者ID:Chiur,项目名称:turbulenz_tools,代码行数:55,代码来源:exportevents.py
示例14: load_pack_info
def load_pack_info(filename, scon=None, file=None):
    """Load and decode a zlib-compressed JSON pack-info object.

    :param filename: name passed to ``scon.get_object`` when no ``file`` is given.
    :param scon: object providing ``get_object``; only used without ``file``.
    :param file: already opened file-like object to read from instead.
    :returns: the decoded JSON content, or ``None`` when no object is available.

    The file-like object is always closed after reading.
    """
    f = file if file else scon.get_object(filename)
    if not f:
        return None
    try:
        compressed = f.read()
        return json_loads(zlib.decompress(compressed))
    finally:
        f.close()
开发者ID:PKRoma,项目名称:dulwich,代码行数:11,代码来源:swift.py
示例15: get_configuration
def get_configuration(self, user):
    """
    Return the user's configuration as a dict.

    ``user.configuration`` is decoded from JSON with floats parsed as
    ``Decimal``. Anything that cannot be decoded, or that does not decode to
    a dict, yields an empty dict.

    Should be replaced by intelligent proxy object.
    """
    try:
        configuration = json_loads(user.configuration, parse_float=Decimal)
    except Exception:
        # Best effort: a missing or malformed configuration falls back to the
        # empty default, but KeyboardInterrupt/SystemExit still propagate.
        configuration = {}
    if not isinstance(configuration, dict):
        configuration = {}
    return configuration
开发者ID:pawelniewie,项目名称:5groszy.pl,代码行数:11,代码来源:ops.py
示例16: batch_check_files
def batch_check_files(self, files, checked_queue_put):
    """Ask the Hub which files need uploading and queue every file accordingly.

    Each entry of ``files`` is passed to ``checked_queue_put`` either whole
    (the file must be uploaded) or as the 5-tuple of its items 1..5 (only the
    metadata cache needs updating).

    While ``self._batch_checks`` is set, all files are checked with a single
    GET whose 200 reply lists the ``'missing'`` upload tokens. Any other reply
    is interpreted for the first file only, batching is switched off and the
    remaining files are checked one request at a time.

    :param files: list of file records; mutated (first entry popped) when the
        batch request is not answered with 200.
    :param checked_queue_put: callable receiving each checked record.
    :raises Exception: when the batch reply status is not 200, 304 or 404.
    """
    urlopen = self.hub_pool.urlopen
    base_url = self._base_check_url
    url_format = self._check_url_format
    get_upload_token = _get_upload_file_token
    timeout = self.hub_timeout

    def check(query):
        return urlopen('GET',
                       base_url + query,
                       redirect=False,
                       assert_same_host=False,
                       timeout=timeout)

    def cache_only(f):
        return (f[1], f[2], f[3], f[4], f[5])

    if self._batch_checks:
        query = '&'.join((url_format % (get_upload_token(i, f[1]), f[3], f[2])) for i, f in enumerate(files))
        r = check(query)

        if r.status == 200:
            # pylint: disable=E1103
            missing_files = set(json_loads(r.data).get('missing', []))
            # pylint: enable=E1103
            for i, f in enumerate(files):
                if get_upload_token(i, f[1]) in missing_files:
                    # Update meta data cache and upload
                    checked_queue_put(f)
                else:
                    # Only needs to update meta data cache
                    checked_queue_put(cache_only(f))
            return

        first = files.pop(0)
        if r.status == 304:
            # First one only needs to update meta data cache
            checked_queue_put(cache_only(first))
        elif r.status == 404:
            # First one needs to update meta data cache and to upload
            checked_queue_put(first)
        else:
            raise Exception(r.reason)

        # NOTE(review): this returns when exactly one file is left after the
        # pop, which leaves that file unchecked - confirm this is intended.
        if len(files) == 1:
            return

        # Legacy format, check one by one...
        self._batch_checks = False
        r = None

    for f in files:
        query = url_format % (basename(f[1]), f[3], f[2])
        if check(query).status == 304:
            # Only needs to update meta data cache
            checked_queue_put(cache_only(f))
        else:
            # Update meta data cache and upload
            checked_queue_put(f)
开发者ID:calimonk,项目名称:turbulenz_local,代码行数:54,代码来源:deploy.py
示例17: _get_config_data
def _get_config_data(self, path):
    """
    Open the JSON config file at the given path and return its
    content decoded into a python object.

    :param str path: full pathway to a JSON-formatted config file.
    """
    with open(path) as config_file:
        raw_config = config_file.read()
    return json_loads(raw_config)
开发者ID:mattmakesmaps,项目名称:DataPlunger,代码行数:11,代码来源:core.py
示例18: inline_array_events_local
def inline_array_events_local(options, today_log, array_files_list, enc_key):
    """Read local array-event files and merge their events into ``today_log``.

    Each file is read, decrypted with ``enc_key``, decompressed and decoded
    from JSON. Every event is appended to the ``'customEvents'`` list of its
    slug, and the touched lists are re-sorted by time.

    :param options: object providing ``verbose``.
    :param today_log: dict keyed by slug; mutated in place and returned.
    :param array_files_list: iterable of file paths to read.
    :param enc_key: key passed to ``decrypt_data``.
    :returns: ``today_log``.

    I/O errors are reported through ``error`` and terminate the process with
    ``exit(-1)``.
    """
    verbose = options.verbose
    touched_slugs = set()

    try:
        for position, filename in enumerate(array_files_list, 1):
            # Format: 'eventlogspath/gamefolder/arrayevents/date(seconds)/objectid.bin'
            # The objectid doesn't correspond to a database entry but is used for uniqueness and timestamp
            filename = filename.replace('\\', '/')
            event_objectid = filename.rsplit('/', 1)[-1].split('.', 1)[0]
            timestamp = get_objectid_timestamp(event_objectid)
            formatted_timestamp = strftime('%Y-%m-%d %H:%M:%S', gmtime(timestamp))

            if verbose:
                log('Retrieving array events file ' + str(position) + ' submitted at ' + formatted_timestamp)

            with open(filename, 'rb') as fin:
                encrypted = fin.read()
            decrypted = decrypt_data(encrypted, enc_key)
            events = json_loads(zlib_decompress(decrypted))

            if not isinstance(events, list):
                events = [events]

            for event in events:
                slug = event.pop('slug')
                # Some older files had no timestamp in the file data itself in which case we use the timestamp
                # on the filename
                if 'time' in event:
                    event['time'] = strftime('%Y-%m-%d %H:%M:%S', gmtime(event['time']))
                else:
                    event['time'] = formatted_timestamp

                slug_log = today_log.setdefault(slug, {'playEvents': [], 'customEvents': []})
                slug_log['customEvents'].append(event)
                # Maintaining a list of slugs to sort the customEvents by date for so that added array events appear
                # in order but we do not unnecessarily sort large lists if an array event wasn't added to it
                touched_slugs.add(slug)

        for slug in touched_slugs:
            today_log[slug]['customEvents'].sort(key=lambda k: k['time'])

        return today_log

    except (IOError, OSError) as e:
        error(e)
        exit(-1)
开发者ID:ajbetteridge,项目名称:turbulenz_tools,代码行数:52,代码来源:exportevents.py
示例19: login
def login(cls):
    """
    Start deploying the game.

    Opens a connection pool to ``cls.base_url``, logs in to the Hub with the
    ``login``/``password`` request parameters and, on success, returns the
    result of ``cls._get_projects_for_upload``. Every failure sets
    ``response.status_int`` and returns ``{'ok': False, 'msg': ...}``.
    """
    response.headers['Cache-Control'] = 'no-store, no-cache, max-age=0'

    hub_pool = connection_from_url(cls.base_url, maxsize=8, timeout=8.0)
    if not (hub_pool and cls.cookie_name):
        response.status_int = 500
        return {'ok': False, 'msg': 'Wrong deployment configuration.'}
    cls.hub_pool = hub_pool

    form = request.params
    try:
        login_name = form['login']
        password = form['password']
    except KeyError:
        response.status_int = 400
        return {'ok': False, 'msg': 'Missing user login information.'}
    credentials = {
        'login': login_name,
        'password': password,
        'source': '/local'
    }

    try:
        r = hub_pool.request('POST',
                             '/dynamic/login',
                             fields=credentials,
                             retries=1,
                             redirect=False)
    except (HTTPError, SSLError) as e:
        LOG.error(e)
        response.status_int = 500
        return {'ok': False, 'msg': str(e)}

    if r.status != 200:
        response.status_int = 400
        return {'ok': False, 'msg': 'Wrong user login information.'}

    cookie = r.headers.get('set-cookie', None)
    login_info = json_loads(r.data)

    # The reply must carry the expected cookie and echo back our source.
    # pylint: disable=E1103
    if not cookie or cls.cookie_name not in cookie or login_info.get('source') != credentials['source']:
        response.status_int = 400
        return {'ok': False, 'msg': 'Wrong user login information.'}
    # pylint: enable=E1103

    return cls._get_projects_for_upload({'Cookie': cookie}, login_name, form.get('rememberme'))
开发者ID:ajbetteridge,项目名称:turbulenz_local,代码行数:52,代码来源:deploy.py
示例20: _check_project
def _check_project(connection, options, cookie):
    """Check that the target project exists and work out the version title.

    :param connection: connection used to POST to ``/dynamic/upload/projects``.
    :param options: object providing ``project``, ``projectversion``,
        ``projectversiontitle`` and ``silent``.
    :param cookie: value sent in the ``Cookie`` header.
    :returns: tuple of (project, project version, project version title). The
        title is the supplied one, else the existing version's title, else the
        version itself.

    A failed request, a non-200 reply or a project missing from the reply is
    reported through ``error`` and terminates the process with ``exit(-1)``.
    """
    project = options.project
    projectversion = options.projectversion
    projectversion_title = options.projectversiontitle

    try:
        r = connection.request('POST',
                               '/dynamic/upload/projects',
                               headers={'Cookie': cookie},
                               redirect=False)
    except (HTTPError, SSLError) as e:
        error(e)
        exit(-1)

    if r.status != 200:
        error('Wrong Hub answer!')
        exit(-1)

    # pylint: disable=E1103
    projects = json_loads(r.data).get('projects', [])
    # pylint: enable=E1103

    upload_access = False
    new_version = True
    for project_info in projects:
        if project_info['slug'] != project:
            continue
        upload_access = True
        for version_info in project_info['versions']:
            if version_info['version'] == projectversion:
                new_version = False
                # Use the supplied project version title or the existing one as a fallback
                existingversion_title = version_info['title']
                projectversion_title = projectversion_title or existingversion_title
                break

    # If projectversion_title is still unset this is a new version with no supplied title, default to the version
    projectversion_title = projectversion_title or projectversion

    if not upload_access:
        error('Project "%s" does not exist or you are not authorized to upload new versions!' % project)
        exit(-1)

    if not options.silent:
        if new_version:
            log('Uploading to new version "%s" on project "%s".' % (projectversion, project))
        else:
            log('Uploading to existing version "%s" on project "%s".' % (projectversion, project))
            if projectversion_title != existingversion_title:
                log('Changing project version title from "%s" to "%s".' % (existingversion_title,
                                                                          projectversion_title))

    return (project, projectversion, projectversion_title)
开发者ID:DaneTheory,项目名称:turbulenz_local,代码行数:52,代码来源:deploygame.py
注:本文中的simplejson.json_loads函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论