本文整理汇总了Python中simplejson.json_dumps函数的典型用法代码示例。如果您正苦于以下问题：Python json_dumps函数的具体用法？Python json_dumps怎么用？Python json_dumps使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了json_dumps函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _json_handler
def _json_handler(obj):
    """Serialize ``obj`` to a string, special-casing a few non-JSON types.

    * ``datetime`` becomes a JavaScript ``new Date("...")`` expression built
      from ``obj.ctime()`` (a JS expression, not a JSON value).
    * ``Model`` is converted with ``dict(obj)`` and passed to ``dumps``.
    * ``ObjectId`` is stringified and then JSON-encoded.
    * Anything else is passed straight to ``json_dumps``.
    """
    if isinstance(obj, datetime):
        return 'new Date("%s")' % obj.ctime()
    if isinstance(obj, Model):
        return dumps(dict(obj))
    # ObjectId values are stringified first; every other value is encoded
    # as-is.
    payload = str(obj) if isinstance(obj, ObjectId) else obj
    return json_dumps(payload)
开发者ID:jmoiron,项目名称:jmoiron.net,代码行数:8,代码来源:utils.py
示例2: json_request
def json_request(method, url, **kwargs):
    """Send a request with an optional JSON body and decode the JSON reply.

    :param method: HTTP method handed to ``conn.request``
    :param url: URL handed to ``http_connection``
    :param kwargs: forwarded to ``conn.request``; a ``body`` entry, when
                   present, is JSON-encoded and the ``Content-Type`` header
                   is set to ``application/json``
    :returns: tuple of (response object, decoded body)
    :raises ClientException: the reply body is empty, is not valid JSON,
                             decodes to a falsy value, or the status is
                             outside the 2xx range
    """
    headers = kwargs.setdefault("headers", {})
    if "body" in kwargs:
        headers["Content-Type"] = "application/json"
        kwargs["body"] = json_dumps(kwargs["body"])

    parsed, conn = http_connection(url)
    conn.request(method, parsed.path, **kwargs)
    resp = conn.getresponse()
    raw = resp.read()
    http_log((url, method), kwargs, resp, raw)

    # An empty or undecodable body is treated the same way: no usable payload.
    decoded = None
    if raw:
        try:
            decoded = json_loads(raw)
        except ValueError:
            decoded = None

    if not decoded or not (200 <= resp.status < 300):
        raise ClientException(
            "Auth GET failed",
            http_scheme=parsed.scheme,
            http_host=conn.host,
            http_port=conn.port,
            http_path=parsed.path,
            http_status=resp.status,
            http_reason=resp.reason,
        )
    return resp, decoded
开发者ID:dani4571,项目名称:python-swiftclient,代码行数:27,代码来源:client.py
示例3: elem2json
def elem2json(elem, strip=True, indent=0, convert_types=False):
    """Convert an ElementTree or Element into a JSON string.

    Objects exposing ``getroot`` are replaced by their root element before
    conversion. A positive ``indent`` yields indented output; otherwise the
    most compact separators are used. Keys are always sorted.
    """
    root = elem.getroot() if hasattr(elem, 'getroot') else elem
    internal = elem2internal(root, strip=strip, convert_types=convert_types)

    # Module 'simplejson' has no 'encoder' member
    # pylint: disable=E1101
    json_encoder.FLOAT_REPR = float_to_string
    # pylint: enable=E1101

    if indent > 0:
        return json_dumps(internal, sort_keys=True, indent=indent)
    return json_dumps(internal, sort_keys=True, separators=(',', ':'))
开发者ID:Chiur,项目名称:turbulenz_tools,代码行数:17,代码来源:xml_json.py
示例4: api_get_disk_info
def api_get_disk_info(disk='/'):
    """Return the info for ``disk`` as a JSON ``Response``.

    A leading ``/`` is prepended to ``disk`` when it is missing.
    """
    path = disk if disk.startswith('/') else '/' + disk
    payload = json_dumps(api.get_disk_info(disk=path))
    return Response(response=payload, mimetype="application/json")
开发者ID:MarcDufresne,项目名称:SysWatch,代码行数:8,代码来源:ServerMonitorClient.py
示例5: api_get_all_disks_info
def api_get_all_disks_info():
    """Return info for every disk in ``DISKS`` as a JSON ``Response``.

    Disks for which ``api.get_disk_info`` returns a falsy value are omitted.
    """
    # Lazily pair each disk with its info so the API is queried once per disk.
    lookups = ((disk, api.get_disk_info(disk)) for disk in DISKS)
    disks_data = dict((disk, info) for disk, info in lookups if info)
    return Response(response=json_dumps(disks_data),
                    mimetype="application/json")
开发者ID:MarcDufresne,项目名称:SysWatch,代码行数:8,代码来源:ServerMonitorClient.py
示例6: tweets_to_json
def tweets_to_json(tweets, tweetfile, append=False, pretty=False):
    """
    Exports a collection of tweets (any iterable of tweet objects) to given
    file in JSON, line-separated format.

    To append to given filename, pass append=True
    To print in pretty (line- and entity-separated) format, pass pretty=True

    The file is opened in a ``with`` block so the handle is closed even if
    serializing or writing one of the tweets raises part-way through.
    """
    mode = "a" if append else "w"
    # Pretty output only differs in the keyword arguments given to json_dumps.
    dump_kwargs = {"indent": 4, "separators": (',', ': ')} if pretty else {}
    with open(tweetfile, mode) as handle:
        for tweet in tweets:
            handle.write(json_dumps(tweet, **dump_kwargs) + "\n")
开发者ID:IWhisper,项目名称:smappPy,代码行数:18,代码来源:store_tweets.py
示例7: set_configuration
def set_configuration(self, user, configuration):
    """
    Store ``configuration`` on ``user`` as a JSON string and flush the DB.

    Should be replaced by intelligent proxy object.

    This remains a best-effort operation: ordinary failures (serialization
    or flush errors) do not propagate to the caller. They are now logged
    instead of being silently discarded, and ``KeyboardInterrupt`` /
    ``SystemExit`` are no longer swallowed.
    """
    import logging

    try:
        user.configuration = json_dumps(configuration)
        self.db.flush()
    except Exception:
        logging.getLogger(__name__).exception(
            "Failed to store user configuration")
开发者ID:pawelniewie,项目名称:5groszy.pl,代码行数:9,代码来源:ops.py
示例8: fake_auth_request_v2
def fake_auth_request_v2(*args, **kwargs):
    """Return a canned 200 ``Response`` carrying a fake v2 auth payload.

    All positional and keyword arguments are ignored.
    """
    endpoint = {
        "region": "test",
        "internalURL": "http://127.0.0.1:8080/v1.0/AUTH_fakeuser",
    }
    catalog_entry = {"type": "object-store", "endpoints": [endpoint]}
    access = {
        "token": {"id": "12" * 10},
        "serviceCatalog": [catalog_entry],
    }
    return Response(status=200, content=json_dumps({"access": access}))
开发者ID:coderedfox,项目名称:vfxpipe,代码行数:10,代码来源:test_swift.py
示例9: write_json
def write_json(self, value, seconds):
    """Write ``value`` to the response as JSON, or JSONP when requested.

    Non-string values are JSON-encoded with a 4-space indent; strings are
    written as given. ``seconds`` is used for the ``Cache-Control`` max-age.

    If the query string carries a ``callback`` parameter, the payload is
    wrapped as ``callback(payload)``. The callback is attacker-controlled and
    ends up in an executable response, so it is only honoured when it is a
    plain dotted JavaScript identifier; anything else is ignored and the
    bare payload is written instead.
    """
    import re

    if not isinstance(value, (basestring, )):
        value = json_dumps(value, indent=4)

    headers = self.response.headers
    headers['Content-Type'] = 'application/x-javascript'
    # NOTE(review): Expires is a fixed one hour of local time in ctime()
    # format, independent of ``seconds`` -- confirm this is intended.
    headers['Expires'] = (datetime.now() + timedelta(hours=1)).ctime()
    headers['Cache-Control'] = 'max-age=' + str(seconds)

    cb = parse_qs(self.request.query_string).get('callback', (None, ))[0]
    identifier = r'[A-Za-z_$][0-9A-Za-z_$]*'
    if cb and re.match(r'%s(?:\.%s)*\Z' % (identifier, identifier), cb):
        value = '%s(%s)' % (cb, value)
    self.response.out.write(value)
开发者ID:natural,项目名称:tf2-api-proxy,代码行数:10,代码来源:lib.py
示例10: api_get_filtered_processes
def api_get_filtered_processes(process_filter=None):
    """Return the processes matching ``process_filter`` as a JSON ``Response``.

    ``process_filter`` is a comma-separated string. When it is falsy the
    module-level ``PROCESSES`` value is used, or an empty string if that is
    falsy as well.
    """
    raw_filter = process_filter or PROCESSES or ''
    names = raw_filter.split(',')
    payload = json_dumps(api.get_filtered_processes(names))
    return Response(response=payload, mimetype="application/json")
开发者ID:MarcDufresne,项目名称:SysWatch,代码行数:10,代码来源:ServerMonitorClient.py
示例11: do_request
def do_request(cls, endpoint, data):
    """POST ``data`` as JSON to ``cls.YGGDRASIL_BASE + endpoint``.

    Returns the decoded JSON reply on success. Any exception -- including
    the ``SessionException`` raised for a non-OK reply -- is printed and the
    function then returns ``None``.
    """
    try:
        response = requests.post(cls.YGGDRASIL_BASE + endpoint,
                                 data=json_dumps(data))
        if response.ok:
            return response.json()

        # Non-OK reply: use the server's error message when one is available.
        try:
            error = response.json()['errorMessage']
        except:
            error = "unknown error"
        raise SessionException("%d: %s" % (response.status_code, error))
    except Exception as e:
        print(str(e))
开发者ID:2regalex,项目名称:Gravy,代码行数:13,代码来源:gravylib.py
示例12: get_auth
def get_auth(url, user, key, region, snet=False):
    """
    Get authentication/authorization credentials.

    POSTs the user's API-key credentials to the auth URL and looks up the
    ``object-store`` endpoint for *region* in the returned service catalog.

    The snet parameter is used for Rackspace's ServiceNet internal network
    implementation. In this function, it selects the endpoint's
    ``internalURL`` instead of its ``publicURL`` as the returned storage URL.
    With Rackspace Cloud Files, use of this network path causes no bandwidth
    charges but requires the client to be running on Rackspace's ServiceNet
    network.

    :param url: authentication/authorization URL
    :param user: user to authenticate as
    :param key: key or password for authorization
    :param region: service region [dfw, ord, syd, iad, etc]
    :param snet: use SERVICENET internal network (see above), default is False
    :returns: tuple of (storage URL, auth token)
    :raises ClientException: HTTP POST request to auth URL failed, the
                             service type or region was not found, or the
                             service catalog is missing expected keys
    """
    swift_service = 'object-store'
    parsed, conn = http_connection(url)
    params = json_dumps({"auth": {"RAX-KSKEY:apiKeyCredentials":
                                  {"username": user, "apiKey": key}}})
    conn.request('POST', parsed.path, params,
                 {'Accept': 'application/json',
                  'Content-Type': 'application/json'})
    resp = conn.getresponse()
    body = resp.read()
    # Check the status before decoding: an error reply is not guaranteed to
    # be JSON, and decoding it first would raise ValueError instead of the
    # documented ClientException.
    if resp.status < 200 or resp.status >= 300:
        raise ClientException(
            'Auth POST failed', http_scheme=parsed.scheme,
            http_host=conn.host, http_port=conn.port,
            http_path=parsed.path, http_status=resp.status,
            http_reason=resp.reason)
    data = json_loads(body)
    url_key = 'internalURL' if snet else 'publicURL'
    try:
        token = data['access']['token']['id']
        for service in data['access']['serviceCatalog']:
            if service['type'] != swift_service:
                continue
            for points in service['endpoints']:
                if points['region'] == region:
                    return points[url_key], token
            raise ClientException('Region %s not found' % region)
        raise ClientException('Service Type %s not found' % swift_service)
    except KeyError:
        raise ClientException(
            'Inconsistent Service Catalog back from auth: %s' % data)
开发者ID:audip,项目名称:lunr,代码行数:50,代码来源:swift.py
示例13: do_request
def do_request(cls, endpoint, data):
    """POST ``data`` as JSON to ``cls.YGGDRASIL_BASE + endpoint``.

    Returns the decoded JSON reply. Raises ``SessionException`` when the
    reply is not OK (using the server's ``errorMessage`` when one can be
    read) or when ``requests`` reports a request failure.
    """
    try:
        log.debug("sending %s" % (data,))
        response = requests.post(cls.YGGDRASIL_BASE + endpoint,
                                 data=json_dumps(data))
        if response.ok:
            payload = response.json()
            log.debug("received %s" % (payload,))
            return payload

        # Non-OK reply: use the server's error message when one is available.
        try:
            error = response.json()['errorMessage']
        except:
            error = "unknown error"
        raise SessionException("%d: %s" % (response.status_code, error))
    except requests.exceptions.RequestException as err:
        raise SessionException(err.message)
开发者ID:TTGhost,项目名称:ApplePi,代码行数:15,代码来源:auth.py
示例14: fake_auth_request_v2
def fake_auth_request_v2(*args, **kwargs):
    """Return a canned 200 ``Response`` with a fake v2 auth body.

    The arguments are accepted only for signature compatibility and are
    not used.
    """
    s_url = 'http://127.0.0.1:8080/v1.0/AUTH_fakeuser'
    endpoints = [{'region': 'test', 'internalURL': s_url}]
    service_catalog = [{'type': 'object-store', 'endpoints': endpoints}]
    resp = {
        'access': {
            'token': {'id': '12' * 10},
            'serviceCatalog': service_catalog,
        },
    }
    return Response(status=200, content=json_dumps(resp))
开发者ID:EvanKrall,项目名称:dulwich,代码行数:16,代码来源:test_swift.py
示例15: pack_info_create
def pack_info_create(pack_data, pack_index):
    """Build a zlib-compressed JSON summary of the objects in a pack.

    The summary maps each object id to:

    * commit: ``(type_num, parents, tree)``
    * tree: ``(type_num, [(sha, name, is_not_dir), ...])``, skipping entries
      whose mode is a gitlink
    * blob: ``None``
    * tag: ``(type_num, obj.object[1])``

    Objects of any other type are left out.
    """
    pack = Pack.from_objects(pack_data, pack_index)
    info = {}
    for obj in pack.iterobjects():
        kind = obj.type_num
        if kind == Commit.type_num:
            entry = (kind, obj.parents, obj.tree)
        elif kind == Tree.type_num:
            entry = (kind, [(sha, name, not stat.S_ISDIR(mode))
                            for name, mode, sha in obj.iteritems()
                            if not S_ISGITLINK(mode)])
        elif kind == Blob.type_num:
            entry = None
        elif kind == Tag.type_num:
            entry = (kind, obj.object[1])
        else:
            # Unknown object types are not recorded.
            continue
        info[obj.id] = entry
    return zlib.compress(json_dumps(info))
开发者ID:PKRoma,项目名称:dulwich,代码行数:19,代码来源:swift.py
示例16: swift_auth_v2
def swift_auth_v2(self):
    """Authenticate with password credentials against a v2.0 auth endpoint.

    ``self.user`` is expected in ``tenant;user`` form and is split into
    ``self.tenant`` and ``self.user``. The request is POSTed to the auth
    URL's path, with ``tokens`` appended when the path does not already end
    with it.

    Returns a tuple of (endpoint URL for ``self.endpoint_type`` in
    ``self.region_name``, token id). Raises ``SwiftException`` when the
    reply status is outside the 2xx range.
    """
    self.tenant, self.user = self.user.split(';')
    auth_dict = {
        'auth': {
            'passwordCredentials': {
                'username': self.user,
                'password': self.password,
            },
            'tenantName': self.tenant,
        },
    }
    auth_json = json_dumps(auth_dict)
    headers = {'Content-Type': 'application/json'}
    auth_httpclient = HTTPClient.from_url(
        self.auth_url,
        connection_timeout=self.http_timeout,
        network_timeout=self.http_timeout,
    )

    path = urlparse(self.auth_url).path
    if not path.endswith('tokens'):
        path = posixpath.join(path, 'tokens')

    ret = auth_httpclient.request('POST', path,
                                  body=auth_json, headers=headers)
    if not (200 <= ret.status_code < 300):
        target = str(auth_httpclient.get_base_url()) + path
        raise SwiftException('AUTH v2.0 request failed on ' +
                             '%s with error code %s (%s)'
                             % (target, ret.status_code, str(ret.items())))

    access = json_loads(ret.read())['access']
    token = access['token']['id']
    # First object-store catalog entry, then its first endpoint in our region.
    object_stores = [entry for entry in access['serviceCatalog']
                     if entry['type'] == 'object-store']
    regional = [endp for endp in object_stores[0]['endpoints']
                if endp["region"] == self.region_name]
    return regional[0][self.endpoint_type], token
开发者ID:PKRoma,项目名称:dulwich,代码行数:38,代码来源:swift.py
示例17: json_request
def json_request(method, url, **kwargs):
    """Issue a request whose body (if any) is JSON and decode the JSON reply.

    A ``body`` keyword argument is JSON-encoded and the ``Content-Type``
    header set accordingly before the request is sent; all keyword arguments
    are forwarded to ``conn.request``.

    Returns a tuple of (response object, decoded body). Raises
    ``ClientException`` when the reply body is empty, is not valid JSON,
    decodes to a falsy value, or the status is outside the 2xx range.
    """
    kwargs.setdefault('headers', {})
    try:
        payload = kwargs['body']
    except KeyError:
        pass
    else:
        kwargs['headers']['Content-Type'] = 'application/json'
        kwargs['body'] = json_dumps(payload)

    parsed, conn = http_connection(url)
    conn.request(method, parsed.path, **kwargs)
    resp = conn.getresponse()
    raw = resp.read()

    # An empty reply stays falsy; an undecodable one becomes None.
    try:
        body = json_loads(raw) if raw else raw
    except ValueError:
        body = None

    status_ok = 200 <= resp.status < 300
    if body and status_ok:
        return resp, body
    raise ClientException('Auth GET failed', http_scheme=parsed.scheme,
                          http_host=conn.host,
                          http_port=conn.port,
                          http_path=parsed.path,
                          http_status=resp.status,
                          http_reason=resp.reason)
开发者ID:AnyBucket,项目名称:OpenStack-Install-and-Understand-Guide,代码行数:23,代码来源:client.py
示例18: join_server
def join_server(session, server_hash):
    """POST a join request for ``server_hash`` to the Mojang session server.

    Returns ``True`` when the reply status is 200 or 204, ``False`` otherwise.
    """
    payload = json_dumps({
        'accessToken': session.access_token,
        'selectedProfile': session.uuid_hex,
        'serverId': server_hash,
    })
    headers = {
        # The '; charset=utf-8' suffix was commented out in the original.
        'Content-Type': 'application/json',
        'User-Agent': None,
    }
    reply = requests.post(
        'https://sessionserver.mojang.com/session/minecraft/join',
        data=payload, headers=headers)
    return reply.status_code in (200, 204)
开发者ID:2regalex,项目名称:Gravy,代码行数:10,代码来源:gravylib.py
示例19: api_get_uptime
def api_get_uptime():
    """Return the value of ``api.get_uptime()`` as a JSON ``Response``."""
    payload = json_dumps(api.get_uptime())
    return Response(response=payload, mimetype="application/json")
开发者ID:MarcDufresne,项目名称:SysWatch,代码行数:5,代码来源:ServerMonitorClient.py
示例20: test_get_container_objects
def test_get_container_objects(self):
    """get_container_objects() yields one entry per object in the listing."""
    listing = ({'name': 'a'}, {'name': 'b'})

    def fake_request(*args):
        # Stand-in for HTTPClient.request: always answers with the listing.
        return Response(content=json_dumps(listing))

    with patch('geventhttpclient.HTTPClient.request', fake_request):
        self.assertEqual(len(self.conn.get_container_objects()), 2)
开发者ID:EvanKrall,项目名称:dulwich,代码行数:5,代码来源:test_swift.py
注：本文中的simplejson.json_dumps函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论