本文整理汇总了Python中wok.model.tasks.TaskModel类的典型用法代码示例。如果您正苦于以下问题:Python TaskModel类的具体用法?Python TaskModel怎么用?Python TaskModel使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了TaskModel类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: VolumeGroupsTests
class VolumeGroupsTests(unittest.TestCase):
def setUp(self):
objstore_loc = config.get_object_store() + "_ginger"
self._objstore = ObjectStore(objstore_loc)
self.task_model = TaskModel(objstore=self._objstore)
def test_get_vg_list(self):
vgs = vol_group.VolumeGroupsModel(objstore=self._objstore)
vgs_list = vgs.get_list()
self.assertGreaterEqual(len(vgs_list), 0)
def test_create_vg_missing_name(self):
vgs = vol_group.VolumeGroupsModel(objstore=self._objstore)
pvpaths = ["/dev/sdb1"]
params = {"pv_paths": pvpaths}
self.assertRaises(MissingParameter, vgs.create, params)
def test_create_vg_missing_pvpaths(self):
vgs = vol_group.VolumeGroupsModel(objstore=self._objstore)
vgname = "testvg"
params = {"vg_name": vgname}
self.assertRaises(MissingParameter, vgs.create, params)
@mock.patch("wok.plugins.ginger.models.utils._create_vg", autospec=True)
def test_create_vg(self, mock_create_vg):
vgs = vol_group.VolumeGroupsModel(objstore=self._objstore)
vgname = "testvg"
pvpaths = ["/dev/sdb1"]
params = {"vg_name": vgname, "pv_paths": pvpaths}
task_obj = vgs.create(params)
self.task_model.wait(task_obj.get("id"))
mock_create_vg.assert_called_with(vgname, pvpaths)
@mock.patch("wok.plugins.ginger.models.utils._extend_vg", autospec=True)
def test_extend_vg(self, mock_extend_vg):
vg = vol_group.VolumeGroupModel(objstore=self._objstore)
vgname = "testvg"
pvpaths = ["/dev/sdb2"]
vg.extend(vgname, pvpaths)
mock_extend_vg.assert_called_with(vgname, pvpaths)
@mock.patch("wok.plugins.ginger.models.utils._reduce_vg", autospec=True)
def test_reduce_vg(self, mock_reduce_vg):
vg = vol_group.VolumeGroupModel(objstore=self._objstore)
vgname = "testvg"
pvpaths = ["/dev/sdb2"]
vg.reduce(vgname, pvpaths)
mock_reduce_vg.assert_called_with(vgname, pvpaths)
@mock.patch("wok.plugins.ginger.models.utils._remove_vg", autospec=True)
def test_delete_vg(self, mock_delete_vg):
vg = vol_group.VolumeGroupModel(objstore=self._objstore)
vgname = "testvg"
vg.delete(vgname)
mock_delete_vg.assert_called_with(vgname)
开发者ID:atreyeemukhopadhyay,项目名称:ginger,代码行数:55,代码来源:test_vol_group.py
示例2: FirmwareProgressTests
class FirmwareProgressTests(unittest.TestCase):
def setUp(self):
objstore_loc = config.get_object_store() + '_ginger'
self._objstore = ObjectStore(objstore_loc)
self.task = TaskModel(objstore=self._objstore)
def test_fwprogress_without_update_flash(self):
fwprogress = FirmwareProgressModel(objstore=self._objstore)
task_info = fwprogress.lookup()
self.task.wait(task_info['id'])
task_info = self.task.lookup(task_info['id'])
self.assertEquals('finished', task_info['status'])
self.assertIn('Error', task_info['message'])
self.assertEquals('/plugins/ginger/fwprogress',
task_info['target_uri'])
开发者ID:MalleshKoti,项目名称:ginger,代码行数:16,代码来源:test_firmwareprogress.py
示例3: __init__
def __init__(self, **kargs):
self.task = TaskModel(**kargs)
self.objstore = kargs['objstore']
self.pkgs2update = []
try:
self.host_swupdate = SoftwareUpdate()
except:
self.host_swupdate = None
开发者ID:kimchi-project,项目名称:gingerbase,代码行数:8,代码来源:packagesupdate.py
示例4: PartitionModel
class PartitionModel(object):
def __init__(self, **kargs):
self.objstore = kargs['objstore']
self.task = TaskModel(**kargs)
def lookup(self, name, dev=None):
try:
part_details = get_partition_details(name)
part_path = part_details['path']
vg_name = _get_vgname(part_path)
if vg_name:
part_details['vgname'] = vg_name
else:
part_details['vgname'] = "N/A"
return part_details
except NotFoundError:
raise NotFoundError("GINPART00014E", {'name': name})
except OperationFailed as e:
raise OperationFailed("GINPART00003E",
{'name': name, 'err': e.message})
def format(self, name, fstype):
if utils._is_mntd(name):
raise OperationFailed('GINPART00004E')
task_params = {'name': name, 'fstype': fstype}
taskid = AsyncTask(u'/partitions/%s/fstype%s' % (name, fstype),
self._format_task, task_params).id
return self.task.lookup(taskid)
def _format_task(self, cb, params):
name = params['name']
fstype = params['fstype']
try:
utils._makefs(fstype, name)
except (OperationFailed):
raise OperationFailed('GINPART00005E')
cb('OK', True)
def change_type(self, name, type):
try:
utils.change_part_type(name, type)
except OperationFailed as e:
raise OperationFailed("GINPART00006E",
{'err': e.message})
return name
def delete(self, name):
try:
utils.delete_part(name)
except OperationFailed as e:
raise OperationFailed("GINPART00007E",
{'err': e.message})
开发者ID:lcorreia,项目名称:ginger,代码行数:58,代码来源:diskparts.py
示例5: DASDdevModel
class DASDdevModel(object):
"""
Model for viewing and formatting a DASD device
"""
def __init__(self, **kargs):
self.objstore = kargs['objstore']
self.task = TaskModel(**kargs)
self.dev_details = {}
def lookup(self, bus_id):
dasd_utils.validate_bus_id(bus_id)
try:
dasddevices = dasd_utils._get_dasd_dev_details(bus_id)
self.dev_details = dasddevices[0]
except IndexError as e:
wok_log.error("DASD device %s not found." % bus_id)
raise NotFoundError("GINDASD0006E", {'err': e})
return self.dev_details
def format(self, bus_id, blk_size):
dasd_utils.validate_bus_id(bus_id)
woklock = threading.Lock()
name = self.dev_details['name']
dasd_name_list = dasd_utils._get_dasd_names()
if name not in dasd_name_list:
raise NotFoundError('GINDASD0007E')
task_params = {'blk_size': blk_size, 'name': name}
try:
woklock.acquire()
taskid = add_task(u'/dasddevs/%s/blksize/%s' % (name, blk_size),
self._format_task, self.objstore, task_params)
except OperationFailed:
woklock.release()
wok_log.error("Formatting of DASD device %s failed" % bus_id)
raise OperationFailed("GINDASD0008E", {'name': name})
finally:
woklock.release()
return self.task.lookup(taskid)
def _format_task(self, cb, params):
if 'name' not in params:
raise MissingParameter("GINDASD0009E")
name = params['name']
if 'blk_size' not in params:
raise MissingParameter("GINDASD0010E")
blk_size = params['blk_size']
try:
dasd_utils._format_dasd(blk_size, name)
except OperationFailed:
wok_log.error("Formatting of DASD device %s failed" % name)
raise OperationFailed('GINDASD0008E', {'name': name})
cb('OK', True)
开发者ID:MalleshKoti,项目名称:ginger,代码行数:58,代码来源:dasddevs.py
示例6: ArchiveModel
class ArchiveModel(object):
def __init__(self, **kargs):
self._objstore = kargs['objstore']
self.task = TaskModel(**kargs)
def lookup(self, archive_id):
with self._objstore as session:
info = session.get(ArchivesModel._objstore_type, archive_id)
return info
def _session_delete_archive(self, session, archive_id):
# Assume session is already locked.
try:
ar_params = session.get(ArchivesModel._objstore_type, archive_id)
except NotFoundError:
return
if ar_params['file'] != '':
try:
os.unlink(ar_params['file'])
except OSError as e:
# It's OK if the user already removed the file manually
if e.errno not in (errno.EACCES, errno.ENOENT):
raise OperationFailed(
'GINHBK0002E', {'name': ar_params['file']})
session.delete(ArchivesModel._objstore_type, archive_id)
def delete(self, archive_id):
with self._objstore as session:
self._session_delete_archive(session, archive_id)
def _restore_tar(self, archive_id):
backup_dir = os.path.join(PluginPaths('ginger').state_dir,
'ginger_backups')
backup_file = os.path.join(backup_dir, archive_id + '.tar.gz')
cmd = ['tar', '-xzf', backup_file, '-C', '/']
out, err, rc = run_command(cmd)
if rc != 0:
raise OperationFailed('GINHBK0001E', {'name': backup_file,
'cmd': ' '.join(cmd)})
def _restore_task(self, rb, backup_id):
rb('entering task to restore config backup')
try:
self._restore_tar(backup_id)
rb('OK', True)
except (InvalidOperation) as e:
rb(e.message, False)
except (OperationFailed) as e:
rb(e.message, False)
raise OperationFailed('GINHBK0013E', {'err': e.message})
def restore(self, archive_id):
taskid = AsyncTask(u'/backup/restore/%s' % (archive_id),
self._restore_task, archive_id).id
return self.task.lookup(taskid)
开发者ID:kimchi-project,项目名称:ginger,代码行数:58,代码来源:backup.py
示例7: __init__
def __init__(self, **kargs):
self.conn = kargs['conn']
self.objstore = kargs['objstore']
self.task = TaskModel(**kargs)
self.storagevolumes = StorageVolumesModel(**kargs)
self.storagepool = StoragePoolModel(**kargs)
if self.conn.get() is not None:
self.libvirt_user = UserTests().probe_user()
else:
self.libvirt_user = None
开发者ID:alinefm,项目名称:kimchi,代码行数:10,代码来源:storagevolumes.py
示例8: setUp
def setUp(self):
self.temp_file = tempfile.NamedTemporaryFile(delete=False)
objstore_loc = self.temp_file.name
self._objstore = ObjectStore(objstore_loc)
self.task = TaskModel(objstore=self._objstore)
ArchivesModel._archive_dir = '/tmp'
ArchivesModel._default_include = []
ArchivesModel._default_exclude = []
开发者ID:LiftedKilt,项目名称:ginger,代码行数:10,代码来源:test_backup.py
示例9: __init__
def __init__(self, **kargs):
self.conn = kargs['conn']
self.objstore = kargs['objstore']
self.events = kargs['eventsloop']
self.task = TaskModel(**kargs)
self.devs_model = DevicesModel(**kargs)
self.dev_model = DeviceModel(**kargs)
self._cb = None
self.events.registerDetachDevicesEvent(
self.conn, self._event_devices, self)
开发者ID:alinefm,项目名称:kimchi,代码行数:10,代码来源:vmhostdevs.py
示例10: PhysicalVolumesModel
class PhysicalVolumesModel(object):
"""
Model class for listing and creating a PV
"""
def __init__(self, **kargs):
self.objstore = kargs['objstore']
self.task = TaskModel(**kargs)
def create(self, params):
if 'pv_name' not in params:
raise MissingParameter("GINPV00001E")
pvname = params['pv_name']
taskid = AsyncTask(u'/pvs/pv_name/%s' % (pvname),
self._create_task, params).id
return self.task.lookup(taskid)
def _create_task(self, cb, params):
pvname = params['pv_name']
cb('entering task to create pv')
try:
cb('create pv')
part = PartitionModel(objstore=self.objstore)
part_name = pvname.split('/')[-1]
dev_type = part.lookup(part_name)
if dev_type['type'] == 'part':
if 'dasd' in dev_type['name']:
type = '4'
change_dasdpart_type(part_name, type)
else:
type = '8e' # hex value for type Linux LVM
part.change_type(part_name, type)
utils._create_pv(pvname)
except OperationFailed:
raise OperationFailed("GINPV00002E",
{'name': pvname})
cb('OK', True)
def get_list(self):
try:
pv_names = utils._get_pv_devices()
except OperationFailed as e:
raise NotFoundError("GINPV00003E",
{'err': e.message})
return pv_names
开发者ID:kimchi-project,项目名称:ginger,代码行数:55,代码来源:physical_vol.py
示例11: LogicalVolumesTests
class LogicalVolumesTests(unittest.TestCase):
def setUp(self):
objstore_loc = config.get_object_store() + '_ginger'
self._objstore = ObjectStore(objstore_loc)
self.task_model = TaskModel(objstore=self._objstore)
def test_get_lv_list(self):
lvs = log_volume.LogicalVolumesModel(objstore=self._objstore)
lvs_list = lvs.get_list()
self.assertGreaterEqual(len(lvs_list), 0)
def test_create_lv_missing_vgname(self):
lvs = log_volume.LogicalVolumesModel(objstore=self._objstore)
size = ['10M']
params = {'size': size}
self.assertRaises(MissingParameter, lvs.create, params)
def test_create_lv_missing_size(self):
lvs = log_volume.LogicalVolumesModel(objstore=self._objstore)
vgname = 'testvg'
params = {'vg_name': vgname}
self.assertRaises(MissingParameter, lvs.create, params)
@mock.patch('wok.plugins.ginger.model.utils._create_lv', autospec=True)
def test_create_lv(self, mock_create_lv):
lvs = log_volume.LogicalVolumesModel(objstore=self._objstore)
vgname = 'testvg'
size = '10M'
params = {'vg_name': vgname, 'size': size}
task_obj = lvs.create(params)
self.task_model.wait(task_obj.get('id'))
mock_create_lv.assert_called_with(vgname, size)
@mock.patch('wok.plugins.ginger.model.utils._remove_lv',
autospec=True)
def test_delete_lv(self, mock_delete_lv):
lv = log_volume.LogicalVolumeModel(objstore=self._objstore)
lvname = '/dev/testvg/lvol0'
lv.delete(lvname)
mock_delete_lv.assert_called_with(lvname)
开发者ID:LiftedKilt,项目名称:ginger,代码行数:40,代码来源:test_log_volume.py
示例12: PackageUpdateModel
class PackageUpdateModel(object):
def __init__(self, **kargs):
self.task = TaskModel(**kargs)
self.objstore = kargs['objstore']
self.pkgs2update = []
try:
self.host_swupdate = SoftwareUpdate()
except:
self.host_swupdate = None
def lookup(self, name):
if self.host_swupdate is None:
raise OperationFailed('GGBPKGUPD0004E')
return self.host_swupdate.getUpdate(name)
def _resolve_dependencies(self, package=None, dep_list=None):
"""
Resolve the dependencies for a given package from the dictionary of
eligible packages to be upgraded.
"""
if dep_list is None:
dep_list = []
if package is None:
return []
dep_list.append(package)
deps = self.host_swupdate.getUpdate(package)['depends']
for pkg in set(deps).intersection(self.pkgs2update):
if pkg in dep_list:
break
self._resolve_dependencies(pkg, dep_list)
return dep_list
def upgrade(self, name):
"""
Execute the update of a specific package (and its dependencies, if
necessary) in the system.
@param: Name
@return: task
"""
if self.host_swupdate is None:
raise OperationFailed('GGBPKGUPD0004E')
self.pkgs2update = self.host_swupdate.getUpdates()
pkgs_list = self._resolve_dependencies(name)
msg = 'The following packages will be updated: ' + ', '.join(pkgs_list)
wok_log.debug(msg)
taskid = add_task('/plugins/gingerbase/host/packagesupdate/%s/upgrade'
% name, self.host_swupdate.doUpdate,
self.objstore, pkgs_list)
return self.task.lookup(taskid)
开发者ID:yashodhank,项目名称:gingerbase,代码行数:52,代码来源:packagesupdate.py
示例13: PhysicalVolumesModel
class PhysicalVolumesModel(object):
"""
Model class for listing and creating a PV
"""
def __init__(self, **kargs):
self.objstore = kargs['objstore']
self.task = TaskModel(**kargs)
def create(self, params):
if 'pv_name' not in params:
raise MissingParameter("GINPV00001E")
pvname = params['pv_name']
taskid = add_task(u'/pvs/pv_name/%s' % (pvname),
self._create_task, self.objstore, params)
return self.task.lookup(taskid)
def _create_task(self, cb, params):
pvname = params['pv_name']
cb('entering task to create pv')
try:
cb('create pv')
part = PartitionModel(objstore=self.objstore)
part_name = pvname.split('/')[2]
type = '8e' # hex value for type Linux LVM
part.change_type(part_name, type)
utils._create_pv(pvname)
except OperationFailed:
wok_log.error("PV create failed")
raise OperationFailed("GINPV00002E",
{'pvname': pvname})
cb('OK', True)
def get_list(self):
try:
pv_names = utils._get_pv_devices()
except OperationFailed as e:
wok_log.error("Unable to fetch list of PVs")
raise NotFoundError("GINPV00003E",
{'err': e.message})
return pv_names
开发者ID:atreyeemukhopadhyay,项目名称:ginger,代码行数:51,代码来源:physical_vol.py
示例14: SoftwareUpdateProgressModel
class SoftwareUpdateProgressModel(object):
def __init__(self, **kargs):
self.task = TaskModel(**kargs)
self.objstore = kargs['objstore']
def lookup(self, *name):
try:
swupdate = SoftwareUpdate()
except:
raise OperationFailed('GGBPKGUPD0004E')
taskid = add_task('/plugins/gingerbase/host/swupdateprogress',
swupdate.tailUpdateLogs, self.objstore, None)
return self.task.lookup(taskid)
开发者ID:popbjc,项目名称:kimchi,代码行数:14,代码来源:host.py
示例15: PartitionModel
class PartitionModel(object):
def __init__(self, **kargs):
self.objstore = kargs['objstore']
self.task = TaskModel(**kargs)
def lookup(self, name, dev=None):
try:
return get_partition_details(name)
except OperationFailed as e:
wok_log.error("lookup method of partition failed")
raise OperationFailed("GINPART00003E",
{'err': e})
def format(self, name, fstype):
if utils._is_mntd(name):
raise OperationFailed('GINPART00004E')
task_params = {'name': name, 'fstype': fstype}
taskid = add_task(u'/partitions/%s/fstype%s' % (name, fstype),
self._format_task, self.objstore, task_params)
return self.task.lookup(taskid)
def _format_task(self, cb, params):
name = '/dev/' + params['name']
fstype = params['fstype']
try:
utils._makefs(fstype, name)
except (OperationFailed):
raise OperationFailed('GINPART00005E')
cb('OK', True)
def change_type(self, name, type):
try:
utils.change_part_type(name, type)
except OperationFailed as e:
wok_log.error("change type for partition failed")
raise OperationFailed("GINPART00006E",
{'err': e})
return name
def delete(self, name):
try:
utils.delete_part(name)
except OperationFailed as e:
wok_log.error("delete partition failed")
raise OperationFailed("GINPART00007E",
{'err': e})
开发者ID:atreyeemukhopadhyay,项目名称:ginger,代码行数:50,代码来源:diskparts.py
示例16: CIOIgnoreModel
class CIOIgnoreModel(object):
"""
model class for ignore list
"""
def __init__(self, **kargs):
self.objstore = kargs.get('objstore')
self.task = TaskModel(**kargs)
def lookup(self, name):
"""
method to retrieve device IDs in ignore list
:return: returns dictionary with key as 'ignored_devices
and value as list of device ids(single device id
or range of device ids)
"""
devices = {}
command = [CIO_IGNORE, '-l']
out, err, rc = run_command(command)
if rc:
wok_log.error('failed to retrieve ignore list '
'using \'cio_ignore -l\'. Error: %s' % err.strip())
raise OperationFailed('GS390XIOIG001E', {'error': err.strip()})
devices[IGNORED_DEVICES] = _parse_ignore_output(out)
wok_log.info('Successfully retrieved devices from ignore list')
return devices
def remove(self, name, devices):
"""
Remove one or more device IDs from blacklist.
:param devices: List of devices
:return: task json
"""
# Check the type of devices.
if not (isinstance(devices, list)):
wok_log.error('Input is not of type list. Input: %s' % devices)
raise InvalidParameter('GS390XINVINPUT', {'reason': 'input must '
'be of type'
' list'})
wok_log.info('Create task for removing devices \"% s\" from ignore'
'list' % devices)
taskid = AsyncTask('/plugins/gingers390x/cioignore/remove',
_remove_devices, devices).id
return self.task.lookup(taskid)
开发者ID:kimchi-project,项目名称:gingers390x,代码行数:44,代码来源:cioignore.py
示例17: VolumeGroupsModel
class VolumeGroupsModel(object):
"""
Model class for listing and creating a VG
"""
def __init__(self, **kargs):
self.objstore = kargs['objstore']
self.task = TaskModel(**kargs)
def create(self, params):
if 'vg_name' not in params:
raise MissingParameter("GINVG00013E")
vgname = params['vg_name']
if "pv_paths" not in params:
raise MissingParameter("GINVG00014E")
taskid = add_task(u'/vgs/vg_name/%s' % (vgname),
self._create_task, self.objstore, params)
return self.task.lookup(taskid)
def _create_task(self, cb, params):
vgname = params['vg_name']
pv_paths = params['pv_paths']
cb('entering task to create vg')
try:
cb('create vg')
utils._create_vg(vgname, pv_paths)
except (OperationFailed), e:
wok_log.error('failed to create vg')
raise OperationFailed('GINVG00001E',
{'vgname': vgname,
'err': e.message})
cb('OK', True)
开发者ID:atreyeemukhopadhyay,项目名称:ginger,代码行数:42,代码来源:vol_group.py
示例18: LUNScanModel
class LUNScanModel(object):
"""
model class for ignore list
"""
def __init__(self, **kargs):
self.objstore = kargs.get('objstore')
self.task = TaskModel(**kargs)
def lookup(self, name):
"""
Get the status of LUN scanning
:return: returns dictionary with key as 'lunscan'
and value as boolean
"""
return utils.is_lun_scan_enabled()
def enable(self, name):
"""
Enable LUN scanning
"""
utils.enable_lun_scan("1")
return utils.is_lun_scan_enabled()
def disable(self, name):
"""
Disable LUN scanning
"""
utils.enable_lun_scan("0")
return utils.is_lun_scan_enabled()
def trigger(self, name):
"""
Trigger LUN scanning
"""
taskid = add_task('/plugins/gingers390/lunscan/trigger',
utils.trigger_lun_scan, self.objstore, {})
return self.task.lookup(taskid)
开发者ID:pawankg,项目名称:gingers390x,代码行数:39,代码来源:fc_luns.py
示例19: LogicalVolumesModel
class LogicalVolumesModel(object):
"""
Model class for listing and creating a LV
"""
def __init__(self, **kargs):
self.objstore = kargs['objstore']
self.task = TaskModel(**kargs)
def create(self, params):
if 'vg_name' not in params:
raise MissingParameter('GINLV00001E')
vgname = params['vg_name']
if 'size' not in params:
raise MissingParameter('GINLV00002E')
taskid = AsyncTask(u'/lvs/vg_name/%s' % (vgname),
self._create_linear_task, params).id
return self.task.lookup(taskid)
def _create_linear_task(self, cb, params):
vgname = params['vg_name']
size = params['size']
cb('entering task to create lv')
try:
cb('create lv')
utils._create_lv(vgname, size)
except (OperationFailed), e:
raise OperationFailed('GINLV00003E',
{'err': e.message})
cb('OK', True)
开发者ID:kimchi-project,项目名称:ginger,代码行数:38,代码来源:log_volume.py
示例20: StorageVolumesModel
class StorageVolumesModel(object):
def __init__(self, **kargs):
self.conn = kargs['conn']
self.objstore = kargs['objstore']
self.task = TaskModel(**kargs)
def create(self, pool_name, params):
vol_source = ['url', 'capacity']
name = params.get('name')
index_list = list(i for i in range(len(vol_source))
if vol_source[i] in params)
if len(index_list) != 1:
raise InvalidParameter("KCHVOL0018E",
{'param': ",".join(vol_source)})
create_param = vol_source[index_list[0]]
# Verify if the URL is valid
if create_param == 'url':
url = params['url']
try:
urllib2.urlopen(url).close()
except:
raise InvalidParameter('KCHVOL0022E', {'url': url})
all_vol_names = self.get_list(pool_name)
if name is None:
# the methods listed in 'REQUIRE_NAME_PARAMS' cannot have
# 'name' == None
if create_param in REQUIRE_NAME_PARAMS:
raise InvalidParameter('KCHVOL0016E')
# if 'name' is omitted - except for the methods listed in
# 'REQUIRE_NAME_PARAMS' - the default volume name will be the
# file/URL basename.
if create_param == 'url':
name = os.path.basename(params['url'])
else:
name = 'upload-%s' % int(time.time())
name = get_unique_file_name(all_vol_names, name)
params['name'] = name
try:
create_func = getattr(self, '_create_volume_with_%s' %
create_param)
except AttributeError:
raise InvalidParameter("KCHVOL0019E", {'param': create_param})
pool_info = StoragePoolModel(conn=self.conn,
objstore=self.objstore).lookup(pool_name)
if pool_info['type'] in READONLY_POOL_TYPE:
raise InvalidParameter("KCHVOL0012E", {'type': pool_info['type']})
if pool_info['state'] == 'inactive':
raise InvalidParameter('KCHVOL0003E', {'pool': pool_name,
'volume': name})
if name in all_vol_names:
raise InvalidParameter('KCHVOL0001E', {'name': name})
params['pool'] = pool_name
targeturi = '/plugins/kimchi/storagepools/%s/storagevolumes/%s' \
% (pool_name, name)
taskid = add_task(targeturi, create_func, self.objstore, params)
return self.task.lookup(taskid)
def _create_volume_with_capacity(self, cb, params):
pool_name = params.pop('pool')
vol_xml = """
<volume>
<name>%(name)s</name>
<allocation unit='bytes'>%(allocation)s</allocation>
<capacity unit='bytes'>%(capacity)s</capacity>
<source>
</source>
<target>
<format type='%(format)s'/>
</target>
</volume>
"""
params.setdefault('allocation', 0)
params.setdefault('format', 'qcow2')
name = params['name']
try:
pool = StoragePoolModel.get_storagepool(pool_name, self.conn)
xml = vol_xml % params
except KeyError, item:
raise MissingParameter("KCHVOL0004E", {'item': str(item),
'volume': name})
try:
pool.createXML(xml, 0)
except libvirt.libvirtError as e:
raise OperationFailed("KCHVOL0007E",
{'name': name, 'pool': pool,
'err': e.get_error_message()})
#.........这里部分代码省略.........
开发者ID:andreteodoro,项目名称:kimchi,代码行数:101,代码来源:storagevolumes.py
注:本文中的wok.model.tasks.TaskModel类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论