本文整理汇总了Python中zstacklib.utils.jsonobject.loads函数的典型用法代码示例。如果您正苦于以下问题:Python loads函数的具体用法?Python loads怎么用?Python loads使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了loads函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: async_call_wait_for_complete
def async_call_wait_for_complete(self, apicmd, exception_on_error=True, interval=500, fail_soon=False):
self._check_not_none_field(apicmd)
timeout = apicmd.timeout
if not timeout:
timeout = 1800000
cmd = {apicmd.FULL_NAME: apicmd}
logger.debug("async call[url: %s, request: %s]" % (self.api_url, jsonobject.dumps(cmd)))
jstr = http.json_dump_post(self.api_url, cmd, fail_soon=fail_soon)
rsp = jsonobject.loads(jstr)
if rsp.state == 'Done':
logger.debug("async call[url: %s, response: %s]" % (self.api_url, rsp.result))
reply = jsonobject.loads(rsp.result)
(name, event) = (reply.__dict__.items()[0])
if exception_on_error and not event.success:
raise ApiError('API call[%s] failed because %s' % (name, self._error_code_to_string(event.error)))
return name, event
curr = 0
finterval = float(float(interval) / float(1000))
ret_uuid = rsp.uuid
while rsp.state != 'Done' and curr < timeout:
time.sleep(finterval)
rsp = self._get_response(ret_uuid)
curr += interval
if curr >= timeout:
raise ApiError('API call[%s] timeout after %dms' % (apicmd.FULL_NAME, curr))
logger.debug("async call[url: %s, response: %s] after %dms" % (self.api_url, rsp.result, curr))
reply = jsonobject.loads(rsp.result)
(name, event) = (reply.__dict__.items()[0])
if exception_on_error and not event.success:
raise ApiError('API call[%s] failed because %s' % (name, self._error_code_to_string(event.error)))
return name, event
开发者ID:zstackorg,项目名称:zstack-utility,代码行数:34,代码来源:api.py
示例2: get_images_metadata
def get_images_metadata(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
valid_images_info = ""
bs_sftp_info_file = cmd.backupStoragePath + '/' + self.SFTP_METADATA_FILE
image_uuid_list = []
with open(bs_sftp_info_file) as fd:
images_info = fd.read()
for image_info in images_info.split('\n'):
if image_info != '':
image_json = jsonobject.loads(image_info)
# todo support multiple bs
image_uuid = image_json['uuid']
image_install_path = image_json["backupStorageRefs"][0]["installPath"]
if image_uuid in image_uuid_list:
logger.debug("duplicate uuid %s, ignore" % image_json["uuid"])
continue
image_uuid_list.append(image_uuid)
ret = bash_r("ls %s" % image_install_path)
if ret == 0 :
logger.info("Check image %s install path %s successfully!" % (image_uuid, image_install_path))
valid_images_info = image_info + '\n' + valid_images_info
else:
logger.warn("Image %s install path %s is invalid!" % (image_uuid, image_install_path))
rsp = GetImageMetaDataResponse()
rsp.imagesMetaData = valid_images_info
return jsonobject.dumps(rsp)
开发者ID:ShaofeiWang,项目名称:zstack-utility,代码行数:27,代码来源:sftpbackupstorage.py
示例3: ping
def ping(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
rsp = PingRsp()
facts = bash_o('ceph -s -f json')
mon_facts = jsonobject.loads(facts)
found = False
for mon in mon_facts.monmap.mons:
if cmd.monAddr in mon.addr:
found = True
break
if not found:
rsp.success = False
rsp.failure = "MonAddrChanged"
rsp.error = 'The mon addr is changed on the mon server[uuid:%s], not %s anymore.' \
'Reconnect the ceph primary storage' \
' may solve this issue' % (cmd.monUuid, cmd.monAddr)
return jsonobject.dumps(rsp)
create_img = shell.ShellCmd('rbd create %s --image-format 2 --size 1' % cmd.testImagePath)
create_img(False)
if create_img.return_code != 0:
rsp.success = False
rsp.failure = 'UnableToCreateFile'
rsp.error = "%s %s" % (create_img.stderr, create_img.stdout)
else:
rm_img = shell.ShellCmd('rbd rm %s' % cmd.testImagePath)
rm_img(False)
return jsonobject.dumps(rsp)
开发者ID:ShaofeiWang,项目名称:zstack-utility,代码行数:32,代码来源:cephagent.py
示例4: get_images_metadata
def get_images_metadata(self, req):
logger.debug("meilei: get images metadata")
cmd = jsonobject.loads(req[http.REQUEST_BODY])
pool_name = cmd.poolName
bs_uuid = pool_name.split("-")[-1]
valid_images_info = ""
self.get_metadata_file(bs_uuid, self.CEPH_METADATA_FILE)
last_image_install_path = ""
bs_ceph_info_file = "/tmp/%s" % self.CEPH_METADATA_FILE
with open(bs_ceph_info_file) as fd:
images_info = fd.read()
for image_info in images_info.split('\n'):
if image_info != '':
image_json = jsonobject.loads(image_info)
# todo support multiple bs
image_uuid = image_json['uuid']
image_install_path = image_json["backupStorageRefs"][0]["installPath"]
ret = bash_r("rbd info %s" % image_install_path.split("//")[1])
if ret == 0 :
logger.info("Check image %s install path %s successfully!" % (image_uuid, image_install_path))
if image_install_path != last_image_install_path:
valid_images_info = image_info + '\n' + valid_images_info
last_image_install_path = image_install_path
else:
logger.warn("Image %s install path %s is invalid!" % (image_uuid, image_install_path))
self.put_metadata_file(bs_uuid, self.CEPH_METADATA_FILE)
rsp = GetImageMetaDataResponse()
rsp.imagesMetadata= valid_images_info
return jsonobject.dumps(rsp)
开发者ID:ShaofeiWang,项目名称:zstack-utility,代码行数:30,代码来源:cephagent.py
示例5: ping
def ping(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
rsp = PingRsp()
facts = bash_o('ceph -s -f json')
mon_facts = jsonobject.loads(facts)
found = False
for mon in mon_facts.monmap.mons:
if cmd.monAddr in mon.addr:
found = True
break
if not found:
rsp.success = False
rsp.failure = "MonAddrChanged"
rsp.error = 'The mon addr is changed on the mon server[uuid:%s], not %s anymore.' \
'Reconnect the ceph primary storage' \
' may solve this issue' % (cmd.monUuid, cmd.monAddr)
return jsonobject.dumps(rsp)
pool, objname = cmd.testImagePath.split('/')
create_img = shell.ShellCmd("echo zstack | rados -p '%s' put '%s' -" % (pool, objname))
create_img(False)
if create_img.return_code != 0:
rsp.success = False
rsp.failure = 'UnableToCreateFile'
rsp.error = "%s %s" % (create_img.stderr, create_img.stdout)
else:
shell.run("rados -p '%s' rm '%s'" % (pool, objname))
return jsonobject.dumps(rsp)
开发者ID:zstackorg,项目名称:zstack-utility,代码行数:32,代码来源:cephagent.py
示例6: init
def init(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
o = shell.call('ceph mon_status')
mon_status = jsonobject.loads(o)
fsid = mon_status.monmap.fsid_
existing_pools = shell.call('ceph osd lspools')
for pool in cmd.pools:
if pool.predefined and pool.name not in existing_pools:
raise Exception('cannot find pool[%s] in the ceph cluster, you must create it manually' % pool.name)
elif pool.name not in existing_pools:
shell.call('ceph osd pool create %s 128' % pool.name)
rsp = InitRsp()
if cmd.nocephx is False:
o = shell.call("ceph -f json auth get-or-create client.zstack mon 'allow r' osd 'allow *' 2>/dev/null").strip(
' \n\r\t')
o = jsonobject.loads(o)
rsp.userKey = o[0].key_
rsp.fsid = fsid
self._set_capacity_to_response(rsp)
return jsonobject.dumps(rsp)
开发者ID:zstackorg,项目名称:zstack-utility,代码行数:26,代码来源:cephagent.py
示例7: create_snapshot
def create_snapshot(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
spath = self._normalize_install_path(cmd.snapshotPath)
do_create = True
if cmd.skipOnExisting:
image_name, sp_name = spath.split('@')
o = shell.call('rbd --format json snap ls %s' % image_name)
o = jsonobject.loads(o)
for s in o:
if s.name_ == sp_name:
do_create = False
if do_create:
o = shell.ShellCmd('rbd snap create %s' % spath)
o(False)
if o.return_code != 0:
shell.run("rbd snap rm %s" % spath)
o.raise_error()
rsp = CreateSnapshotRsp()
rsp.size = self._get_file_size(spath)
self._set_capacity_to_response(rsp)
return jsonobject.dumps(rsp)
开发者ID:zstackorg,项目名称:zstack-utility,代码行数:25,代码来源:cephagent.py
示例8: get_volume_snapinfos
def get_volume_snapinfos(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
vpath = self._normalize_install_path(cmd.volumePath)
ret = shell.call('rbd --format=json snap ls %s' % vpath)
rsp = GetVolumeSnapInfosRsp()
rsp.snapInfos = jsonobject.loads(ret)
self._set_capacity_to_response(rsp)
return jsonobject.dumps(rsp)
开发者ID:zstackorg,项目名称:zstack-utility,代码行数:8,代码来源:cephagent.py
示例9: get_facts
def get_facts(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
o = shell.call('ceph mon_status')
mon_status = jsonobject.loads(o)
fsid = mon_status.monmap.fsid_
rsp = GetFactsRsp()
rsp.fsid = fsid
return jsonobject.dumps(rsp)
开发者ID:quarkonics,项目名称:zstack-utility,代码行数:9,代码来源:cephagent.py
示例10: ping
def ping(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
facts = bash_o('ceph -s -f json')
mon_facts = jsonobject.loads(facts)
found = False
for mon in mon_facts.monmap.mons:
if cmd.monAddr in mon.addr:
found = True
break
rsp = PingRsp()
if not found:
rsp.success = False
rsp.failure = "MonAddrChanged"
rsp.error = 'The mon addr is changed on the mon server[uuid:%s], not %s anymore.' \
'Reconnect the ceph primary storage' \
' may solve this issue' % (cmd.monUuid, cmd.monAddr)
return jsonobject.dumps(rsp)
def retry(times=3, sleep_time=3):
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
for i in range(0, times):
try:
return f(*args, **kwargs)
except Exception as e:
logger.error(e)
time.sleep(sleep_time)
rsp.error = ("Still failed after retry. Below is detail:\n %s" % e)
return inner
return wrap
@retry()
def doPing():
# try to delete test file, ignore the result
pool, objname = cmd.testImagePath.split('/')
bash_r("rados -p '%s' rm '%s'" % (pool, objname))
r, o, e = bash_roe("echo zstack | timeout 60 rados -p '%s' put '%s' -" % (pool, objname))
if r != 0:
rsp.success = False
rsp.failure = "UnableToCreateFile"
if r == 124:
# timeout happened
rsp.error = 'failed to create heartbeat object on ceph, timeout after 60s, %s %s' % (e, o)
raise Exception(rsp.error)
else:
rsp.error = "%s %s" % (e, o)
doPing()
self._set_capacity_to_response(rsp)
return jsonobject.dumps(rsp)
开发者ID:zstackorg,项目名称:zstack-utility,代码行数:57,代码来源:cephagent.py
示例11: download
def download(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
pool, image_name = self._parse_install_path(cmd.installPath)
tmp_image_name = 'tmp-%s' % image_name
if cmd.url.startswith('http://') or cmd.url.startswith('https://'):
shell.call('set -o pipefail; wget --no-check-certificate -q -O - %s | rbd import --image-format 2 - %s/%s'
% (cmd.url, pool, tmp_image_name))
actual_size = linux.get_file_size_by_http_head(cmd.url)
elif cmd.url.startswith('file://'):
src_path = cmd.url.lstrip('file:')
src_path = os.path.normpath(src_path)
if not os.path.isfile(src_path):
raise Exception('cannot find the file[%s]' % src_path)
shell.call("rbd import --image-format 2 %s %s/%s" % (src_path, pool, tmp_image_name))
actual_size = os.path.getsize(src_path)
else:
raise Exception('unknown url[%s]' % cmd.url)
@rollbackable
def _1():
shell.call('rbd rm %s/%s' % (pool, tmp_image_name))
_1()
file_format = shell.call("set -o pipefail; qemu-img info rbd:%s/%s | grep 'file format' | cut -d ':' -f 2" % (pool, tmp_image_name))
file_format = file_format.strip()
if file_format not in ['qcow2', 'raw']:
raise Exception('unknown image format: %s' % file_format)
if file_format == 'qcow2':
conf_path = None
try:
with open('/etc/ceph/ceph.conf', 'r') as fd:
conf = fd.read()
conf = '%s\n%s\n' % (conf, 'rbd default format = 2')
conf_path = linux.write_to_temp_file(conf)
shell.call('qemu-img convert -f qcow2 -O rbd rbd:%s/%s rbd:%s/%s:conf=%s' % (pool, tmp_image_name, pool, image_name, conf_path))
shell.call('rbd rm %s/%s' % (pool, tmp_image_name))
finally:
if conf_path:
os.remove(conf_path)
else:
shell.call('rbd mv %s/%s %s/%s' % (pool, tmp_image_name, pool, image_name))
o = shell.call('rbd --format json info %s/%s' % (pool, image_name))
image_stats = jsonobject.loads(o)
rsp = DownloadRsp()
rsp.size = long(image_stats.size_)
rsp.actualSize = actual_size
self._set_capacity_to_response(rsp)
return jsonobject.dumps(rsp)
开发者ID:wqlxx,项目名称:zstack-utility,代码行数:54,代码来源:cephagent.py
示例12: sync_call
def sync_call(self, apicmd, exception_on_error=True, fail_soon=False):
self._check_not_none_field(apicmd)
cmd = {apicmd.FULL_NAME: apicmd}
logger.debug("sync_call[url: %s, request: %s]" % (self.api_url, jsonobject.dumps(cmd)))
jstr = http.json_dump_post(self.api_url, cmd, fail_soon=fail_soon)
logger.debug("sync_call[url: %s, response: %s]" % (self.api_url, jstr))
rsp = jsonobject.loads(jstr)
reply = jsonobject.loads(rsp.result)
(name, r) = reply.__dict__.items()[0]
if exception_on_error:
if not r.success:
raise ApiError('API call[%s] failed because %s' % (name, self._error_code_to_string(r.error)))
return name, r
开发者ID:zstackorg,项目名称:zstack-utility,代码行数:13,代码来源:api.py
示例13: delete
def delete(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
path = self._normalize_install_path(cmd.installPath)
o = shell.call('rbd snap ls --format json %s' % path)
o = jsonobject.loads(o)
if len(o) > 0:
raise Exception('unable to delete %s; the volume still has snapshots' % cmd.installPath)
shell.call('rbd rm %s' % path)
rsp = AgentResponse()
self._set_capacity_to_response(rsp)
return jsonobject.dumps(rsp)
开发者ID:wqlxx,项目名称:zstack-utility,代码行数:14,代码来源:cephagent.py
示例14: remove_eip
def remove_eip(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
rsp = RemoveEipRsp()
self._remove_eip(cmd.eip)
return jsonobject.dumps(rsp)
开发者ID:QiRaining,项目名称:zstack-utility,代码行数:7,代码来源:eip.py
示例15: unprotect_snapshot
def unprotect_snapshot(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
spath = self._normalize_install_path(cmd.snapshotPath)
shell.call('rbd snap unprotect %s' % spath)
return jsonobject.dumps(AgentResponse())
开发者ID:wqlxx,项目名称:zstack-utility,代码行数:7,代码来源:cephagent.py
示例16: download
def download(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
pool, image_name = self._parse_install_path(cmd.installPath)
tmp_image_name = 'tmp-%s' % image_name
lichbd_file = os.path.join(pool, image_name)
tmp_lichbd_file = os.path.join(pool, tmp_image_name)
lichbd.lichbd_mkpool(os.path.dirname(lichbd_file))
shell.call('set -o pipefail; wget --no-check-certificate -q -O - %s | lichbd import - %s -p lichbd' % (cmd.url, tmp_lichbd_file))
@rollbackable
def _1():
if lichbd.lichbd_file_exist(tmp_lichbd_file):
lichbd.lichbd_rm(tmp_lichbd_file)
lichbd.lichbd_rm(lichbd_file)
_1()
qemu_img = lichbd.lichbd_get_qemu_img_path()
file_format = shell.call("set -o pipefail;%s info rbd:%s/%s 2>/dev/null | grep 'file format' | cut -d ':' -f 2" % (qemu_img, pool, tmp_image_name))
file_format = file_format.strip()
if file_format not in ['qcow2', 'raw']:
raise Exception('unknown image format: %s' % file_format)
lichbd.lichbd_mv(lichbd_file, tmp_lichbd_file)
size = lichbd.lichbd_file_size(lichbd_file)
rsp = DownloadRsp()
rsp.size = size
self._set_capacity_to_response(rsp)
return jsonobject.dumps(rsp)
开发者ID:rynetang,项目名称:zstack-utility,代码行数:30,代码来源:fusionstoragent.py
示例17: _set_capacity_to_response
def _set_capacity_to_response(self, rsp):
o = shell.call('ceph df -f json')
df = jsonobject.loads(o)
if df.stats.total_bytes__ is not None:
total = long(df.stats.total_bytes_)
elif df.stats.total_space__ is not None:
total = long(df.stats.total_space__) * 1024
else:
raise Exception('unknown ceph df output: %s' % o)
if df.stats.total_avail_bytes__ is not None:
avail = long(df.stats.total_avail_bytes_)
elif df.stats.total_avail__ is not None:
avail = long(df.stats.total_avail_) * 1024
else:
raise Exception('unknown ceph df output: %s' % o)
rsp.totalCapacity = total
rsp.availableCapacity = avail
rsp.xsky = isXsky()
if not df.pools:
return
pools = ceph.getCephPoolsCapacity()
if not pools:
return
rsp.poolCapacities = []
for pool in pools:
poolCapacity = CephPoolCapacity(pool.poolName, pool.availableCapacity, pool.replicatedSize, pool.usedCapacity, pool.poolTotalSize)
rsp.poolCapacities.append(poolCapacity)
开发者ID:zstackorg,项目名称:zstack-utility,代码行数:33,代码来源:cephagent.py
示例18: deletebits
def deletebits(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
rsp = AliyunNasResponse()
self.delNasBits(cmd.folder, cmd.path)
rsp.totalCapacity, rsp.availableCapacity = self._get_disk_capacity(cmd.uuid)
return jsonobject.dumps(rsp)
开发者ID:zstackorg,项目名称:zstack-utility,代码行数:7,代码来源:aliyun_nas_plugin.py
示例19: test_json
def test_json(self):
b = B()
jstr = jsonobject.dumps(b)
nb = jsonobject.loads(jstr)
self.assertEqual(2, len(nb.cs))
self.assertEqual(3, len(nb.lst))
self.assertEqual(1, nb.lst[0])
self.assertEqual('home', nb.address)
self.assertEqual(19, nb.c.age)
self.assertEqual('hello', nb.name)
self.assertFalse(nb.male)
jstr2 = jsonobject.dumps(nb)
self.assertEqual(jstr, jstr2)
jb = jsonobject.loads(jstr)
print jb.xxxxx
print jb.lst
开发者ID:QiRaining,项目名称:zstack-utility,代码行数:16,代码来源:test_json.py
示例20: remove_dhcp_entry
def remove_dhcp_entry(self, req):
cmd = jsonobject.loads(req[http.REQUEST_BODY])
rsp = RemoveDhcpEntryRsp()
try:
for e in cmd.dhcpEntries:
net_dev = shell.call("ifconfig|grep -i %s|awk '{print $1}'" % e.vrNicMac)
net_dev = net_dev.strip('\t\r\n ')
mac2 = e.mac.replace(':', '')
shell.call("sed -i '/%s/d' %s; \
sed -i '/^$/d' %s; \
sed -i '/%s/d' %s; \
sed -i '/^$/d' %s; \
sed -i '/%s/d' %s; \
sed -i '/^$/d' %s; \
dhcp_release %s %s %s"\
% (e.mac, self.HOST_DHCP_FILE, \
self.HOST_DHCP_FILE, \
mac2, self.HOST_OPTION_FILE, \
self.HOST_OPTION_FILE, \
e.ip, self.HOST_DNS_FILE, \
self.HOST_DNS_FILE, \
net_dev, e.ip, e.mac))
except virtualrouter.VirtualRouterError as e:
logger.warn(linux.get_exception_stacktrace())
rsp.error = str(e)
rsp.success = False
return jsonobject.dumps(rsp)
开发者ID:QiRaining,项目名称:zstack-utility,代码行数:29,代码来源:dnsmasq.py
注:本文中的zstacklib.utils.jsonobject.loads函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论