本文整理汇总了Python中tests.utils.wait_task函数的典型用法代码示例。如果您正苦于以下问题：Python wait_task函数的具体用法？Python wait_task怎么用？Python wait_task使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了wait_task函数的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_tasks
def test_tasks(self):
    """Exercise the ginger tasks REST collection with three async tasks.

    Creates three AsyncTask objects, checks that the collection can be
    filtered by status and target_uri, then waits on the second and third
    task and checks the status/message each of them reports.
    """
    id1 = AsyncTask('/plugins/ginger/tasks/1', self._async_op).id
    id2 = AsyncTask('/plugins/ginger/tasks/2', self._except_op).id
    id3 = AsyncTask('/plugins/ginger/tasks/3', self._intermid_op).id

    # The filter value is fully percent-encoded (safe="") before being
    # embedded in the query string.
    target_uri = urllib2.quote('^/plugins/ginger/tasks/*', safe="")
    filter_data = 'status=running&target_uri=%s' % target_uri
    tasks = json.loads(
        self.request('/plugins/ginger/tasks?%s' % filter_data).read()
    )
    self.assertEqual(3, len(tasks))

    # Each task created above must show up in the unfiltered collection.
    tasks = json.loads(self.request('/plugins/ginger/tasks').read())
    tasks_ids = [t['id'] for t in tasks]
    self.assertEqual(set([id1, id2, id3]) - set(tasks_ids), set([]))

    # Second task: expected to end up 'failed'.
    wait_task(self._task_lookup, id2)
    foo2 = json.loads(
        self.request('/plugins/ginger/tasks/%s' % id2).read()
    )
    keys = ['id', 'status', 'message', 'target_uri']
    self.assertEqual(sorted(keys), sorted(foo2.keys()))
    self.assertEqual('failed', foo2['status'])

    # Third task: expected to still be 'running' with an 'in progress'
    # message after wait_task() returns.
    wait_task(self._task_lookup, id3)
    foo3 = json.loads(
        self.request('/plugins/ginger/tasks/%s' % id3).read()
    )
    self.assertEqual('in progress', foo3['message'])
    self.assertEqual('running', foo3['status'])
开发者ID:kimchi-project,项目名称:ginger,代码行数:28,代码来源:test_tasks.py
示例2: add_vm
def add_vm(name):
    """POST a VM called *name* based on the 'test' template and wait for its task."""
    payload = json.dumps({
        'name': name,
        'template': '/plugins/kimchi/templates/test',
    })
    response = request(host, ssl_port, '/plugins/kimchi/vms', payload, 'POST')
    creation_task = json.loads(response.read())
    # The POST body is parsed as a task; block until wait_task() returns.
    wait_task(model.task_lookup, creation_task['id'])
开发者ID:Finn10111,项目名称:kimchi,代码行数:7,代码来源:test_mockmodel.py
示例3: test_vm_info
def test_vm_info(self):
    """Create 'test-vm' from the 'test' template and verify vm_lookup().

    Checks the VM list, the exact key sets of the lookup result and of its
    'stats' entry, and several expected field values.
    """
    model.templates_create({'name': u'test',
                            'cdrom': fake_iso})
    task = model.vms_create({'name': u'test-vm',
                             'template': '/plugins/kimchi/templates/test'})
    wait_task(model.task_lookup, task['id'])

    vms = model.vms_get_list()
    # NOTE(review): two VMs are expected although only one is created
    # here; presumably the model starts with one existing VM -- confirm.
    self.assertEqual(2, len(vms))
    self.assertIn(u'test-vm', vms)

    keys = set(('name', 'state', 'stats', 'uuid', 'memory', 'cpu_info',
                'screenshot', 'icon', 'graphics', 'users', 'groups',
                'access', 'persistent'))
    stats_keys = set(('cpu_utilization', 'mem_utilization',
                      'net_throughput', 'net_throughput_peak',
                      'io_throughput', 'io_throughput_peak'))

    info = model.vm_lookup(u'test-vm')
    self.assertEqual(keys, set(info.keys()))
    self.assertEqual('shutoff', info['state'])
    self.assertEqual('test-vm', info['name'])
    self.assertEqual(get_template_default('old', 'memory'),
                     info['memory'])
    self.assertEqual(1, info['cpu_info']['vcpus'])
    self.assertEqual(1, info['cpu_info']['maxvcpus'])
    self.assertEqual('plugins/kimchi/images/icon-vm.png', info['icon'])
    self.assertEqual(stats_keys, set(info['stats'].keys()))
    self.assertEqual('vnc', info['graphics']['type'])
    self.assertEqual('127.0.0.1', info['graphics']['listen'])
开发者ID:Finn10111,项目名称:kimchi,代码行数:30,代码来源:test_mockmodel.py
示例4: test_screenshot_refresh
def test_screenshot_refresh(self):
    """Check that a running VM's screenshot is served consistently.

    Creates and starts 'test-vm', fetches its screenshot, then fetches
    the screenshot URL reported by the VM resource five seconds later and
    compares the response headers of both downloads.
    """
    # Create a VM
    req = json.dumps({'name': 'test', 'cdrom': fake_iso})
    request(host, ssl_port, '/plugins/kimchi/templates', req, 'POST')
    req = json.dumps({'name': 'test-vm',
                      'template': '/plugins/kimchi/templates/test'})
    resp = request(host, ssl_port, '/plugins/kimchi/vms', req, 'POST')
    task = json.loads(resp.read())
    wait_task(model.task_lookup, task['id'])

    # Test screenshot refresh for running vm
    request(host, ssl_port, '/plugins/kimchi/vms/test-vm/start', '{}',
            'POST')
    resp = request(host, ssl_port,
                   '/plugins/kimchi/vms/test-vm/screenshot')
    self.assertEqual(200, resp.status)
    self.assertEqual('image/png', resp.getheader('content-type'))

    # The VM resource reports the screenshot location without a leading
    # slash, so one is prepended before requesting it.
    resp1 = request(host, ssl_port, '/plugins/kimchi/vms/test-vm')
    rspBody = resp1.read()
    testvm_Data = json.loads(rspBody)
    screenshotURL = '/' + testvm_Data['screenshot']
    time.sleep(5)
    resp2 = request(host, ssl_port, screenshotURL)
    self.assertEqual(200, resp2.status)

    # Both downloads must carry the same type, length and timestamp.
    self.assertEqual(resp2.getheader('content-type'),
                     resp.getheader('content-type'))
    self.assertEqual(resp2.getheader('content-length'),
                     resp.getheader('content-length'))
    self.assertEqual(resp2.getheader('last-modified'),
                     resp.getheader('last-modified'))
开发者ID:Finn10111,项目名称:kimchi,代码行数:30,代码来源:test_mockmodel.py
示例5: test_debugreport_download
def test_debugreport_download(self):
    """Create a debug report over REST and download it.

    Posts a report request, waits for the resulting task, then checks the
    report resource, its '/content' sub-resource and the 'uri' it
    advertises all answer with 200.
    """
    req = json.dumps({'name': 'test_rest_report1'})
    with RollbackContext() as rollback:
        resp = request('/plugins/gingerbase/debugreports', req, 'POST')
        self.assertEqual(202, resp.status)
        task = json.loads(resp.read())
        # make sure the debugreport doesn't exist until
        # the task is finished
        # NOTE(review): the third argument (20) is presumably a timeout
        # in seconds -- confirm against wait_task().
        wait_task(self._task_lookup, task['id'], 20)
        rollback.prependDefer(self._report_delete, 'test_rest_report1')

        resp = request(
            '/plugins/gingerbase/debugreports/test_rest_report1'
        )
        debugreport = json.loads(resp.read())
        self.assertEqual("test_rest_report1", debugreport['name'])
        self.assertEqual(200, resp.status)

        resp = request(
            '/plugins/gingerbase/debugreports/test_rest_report1/content'
        )
        self.assertEqual(200, resp.status)

        # The 'uri' field has no leading slash, so one is prepended.
        resp = request(
            '/plugins/gingerbase/debugreports/test_rest_report1'
        )
        debugre = json.loads(resp.read())
        resp = request('/' + debugre['uri'])
        self.assertEqual(200, resp.status)
开发者ID:kimchi-project,项目名称:gingerbase,代码行数:26,代码来源:test_rest.py
示例6: test_vm_livemigrate_persistent_API
def test_vm_livemigrate_persistent_API(self):
    """Live-migrate 'test_vm_migrate' through the REST API.

    Starts a test server, creates and starts the VM, posts a migrate
    request to KIMCHI_LIVE_MIGRATION_TEST, waits for the task to finish
    and verifies that the VM is persistent on the remote connection.
    """
    patch_auth()
    inst = model.Model(libvirt_uri='qemu:///system',
                       objstore_loc=self.tmp_store)
    host = '127.0.0.1'
    port = get_free_port('http')
    ssl_port = get_free_port('https')
    cherrypy_port = get_free_port('cherrypy_port')

    with RollbackContext() as rollback:
        test_server = run_server(host, port, ssl_port, test_mode=True,
                                 cherrypy_port=cherrypy_port, model=inst)
        rollback.prependDefer(test_server.stop)
        self.request = partial(request, host, ssl_port)

        # NOTE(review): the calls below use self.inst, which is not
        # assigned in this method (the local `inst` is only passed to
        # run_server); presumably it is set up elsewhere -- confirm.
        self.create_vm_test()
        rollback.prependDefer(rollback_wrapper, self.inst.vm_delete,
                              u'test_vm_migrate')

        # removing cdrom because it is not shared storage and will make
        # the migration fail
        dev_list = self.inst.vmstorages_get_list('test_vm_migrate')
        self.inst.vmstorage_delete('test_vm_migrate', dev_list[0])

        try:
            self.inst.vm_start('test_vm_migrate')
        except Exception as e:
            # Format the exception itself: `e.message` is deprecated and
            # absent on Python 3 exceptions.
            self.fail('Failed to start the vm, reason: %s' % e)

        migrate_url = "/plugins/kimchi/vms/%s/migrate" % 'test_vm_migrate'
        req = json.dumps({'remote_host': KIMCHI_LIVE_MIGRATION_TEST,
                          'user': 'root'})
        resp = self.request(migrate_url, req, 'POST')
        self.assertEqual(202, resp.status)
        task = json.loads(resp.read())
        wait_task(self._task_lookup, task['id'])
        task = json.loads(
            self.request(
                '/plugins/kimchi/tasks/%s' % task['id'],
                '{}'
            ).read()
        )
        self.assertEqual('finished', task['status'])

        try:
            remote_conn = self.get_remote_conn()
            rollback.prependDefer(remote_conn.close)
            remote_vm = remote_conn.lookupByName('test_vm_migrate')
            self.assertTrue(remote_vm.isPersistent())
            # Clean the migrated VM off the remote host.
            remote_vm.destroy()
            remote_vm.undefine()
        except Exception as e:
            self.fail('Migration test failed: %s' % e)
开发者ID:Finn10111,项目名称:kimchi,代码行数:57,代码来源:test_livemigration.py
示例7: test_packages_update
def test_packages_update(self):
    """Upgrade one package, then all remaining packages, via REST.

    After each upgrade the task is expected to go from 'running' to
    'finished', and the number of pending package updates is checked.
    """
    def _task_lookup(taskid):
        # Fetch the task resource and return it as a parsed JSON object.
        return json.loads(self.request('/plugins/gingerbase/tasks/%s' %
                                       taskid).read())

    resp = self.request('/plugins/gingerbase/host/packagesupdate',
                        None, 'GET')
    pkgs = json.loads(resp.read())
    self.assertEqual(5, len(pkgs))

    # Every listed package must expose exactly these fields.
    pkg_keys = ['package_name', 'repository', 'arch', 'version', 'depends']
    for p in pkgs:
        name = p['package_name']
        resp = self.request('/plugins/gingerbase/host/packagesupdate/' +
                            name, None, 'GET')
        info = json.loads(resp.read())
        self.assertEqual(sorted(pkg_keys), sorted(info.keys()))

    # Test system update of specific package. Since package 'ginger' has
    # 'wok' as dependency, both packages must be selected to be updated
    # and, in the end of the process, we have only 3 packages to update.
    uri = '/plugins/gingerbase/host/packagesupdate/ginger/upgrade'
    resp = self.request(uri, '{}', 'POST')
    task = json.loads(resp.read())
    task_params = [u'id', u'message', u'status', u'target_uri']
    self.assertEqual(sorted(task_params), sorted(task.keys()))

    resp = self.request('/plugins/gingerbase/tasks/' + task[u'id'],
                        None, 'GET')
    task_info = json.loads(resp.read())
    self.assertEqual(task_info['status'], 'running')
    wait_task(_task_lookup, task_info['id'])
    resp = self.request('/plugins/gingerbase/tasks/' + task[u'id'],
                        None, 'GET')
    task_info = json.loads(resp.read())
    self.assertEqual(task_info['status'], 'finished')
    self.assertIn(u'All packages updated', task_info['message'])
    pkgs = model.packagesupdate_get_list()
    self.assertEqual(3, len(pkgs))

    # test system update of the rest of packages
    resp = self.request('/plugins/gingerbase/host/swupdate', '{}', 'POST')
    task = json.loads(resp.read())
    task_params = [u'id', u'message', u'status', u'target_uri']
    self.assertEqual(sorted(task_params), sorted(task.keys()))

    # NOTE(review): this section reads the task from '/tasks/<id>' while
    # the section above (and _task_lookup) use
    # '/plugins/gingerbase/tasks/<id>' -- confirm this is intended.
    resp = self.request('/tasks/' + task[u'id'], None, 'GET')
    task_info = json.loads(resp.read())
    self.assertEqual(task_info['status'], 'running')
    wait_task(_task_lookup, task_info['id'])
    resp = self.request('/tasks/' + task[u'id'], None, 'GET')
    task_info = json.loads(resp.read())
    self.assertEqual(task_info['status'], 'finished')
    self.assertIn(u'All packages updated', task_info['message'])
    pkgs = model.packagesupdate_get_list()
    self.assertEqual(0, len(pkgs))
开发者ID:pvital,项目名称:gingerbase,代码行数:56,代码来源:test_host.py
示例8: test_memory_window_changes
def test_memory_window_changes(self):
    """Attach PCI device 'pci_0000_1a_00_0' to 'test-vm' after an MMIO update."""
    template_params = {'name': u'test', 'source_media': fake_iso}
    model.templates_create(template_params)

    vm_params = {
        'name': u'test-vm',
        'template': '/plugins/kimchi/templates/test',
    }
    creation = model.vms_create(vm_params)
    wait_task(model.task_lookup, creation['id'])

    pci_info = model.device_lookup('pci_0000_1a_00_0')
    model.vmhostdevs_update_mmio_guest(u'test-vm', True)
    # Build the device XML only after the MMIO update, then attach it.
    pci_xml = model._get_pci_device_xml(pci_info, 0, False)
    model._attach_device(u'test-vm', pci_xml)
开发者ID:cclauss,项目名称:kimchi,代码行数:11,代码来源:test_mockmodel.py
示例9: test_vm_livemigrate_persistent_API
def test_vm_livemigrate_persistent_API(self):
    """Live-migrate 'test_vm_migrate' through the REST API.

    Starts a test server, creates and starts the VM, posts a migrate
    request to KIMCHI_LIVE_MIGRATION_TEST, waits for the task to finish
    and verifies that the VM is persistent on the remote connection.
    """
    patch_auth()
    with RollbackContext() as rollback:
        test_server = run_server()
        rollback.prependDefer(test_server.stop)
        self.request = partial(request)

        self.create_vm_test()
        rollback.prependDefer(
            rollback_wrapper, self.inst.vm_delete, 'test_vm_migrate'
        )

        # removing cdrom because it is not shared storage and will make
        # the migration fail
        dev_list = self.inst.vmstorages_get_list('test_vm_migrate')
        self.inst.vmstorage_delete('test_vm_migrate', dev_list[0])

        try:
            self.inst.vm_start('test_vm_migrate')
        except Exception as e:
            # Format the exception itself: Python 3 exceptions have no
            # `.message`, so reading it would raise AttributeError here
            # and hide the real failure.
            self.fail('Failed to start the vm, reason: %s' % e)

        migrate_url = '/plugins/kimchi/vms/%s/migrate' % 'test_vm_migrate'
        req = json.dumps(
            {'remote_host': KIMCHI_LIVE_MIGRATION_TEST, 'user': 'root'}
        )
        resp = self.request(migrate_url, req, 'POST')
        self.assertEqual(202, resp.status)
        task = json.loads(resp.read())
        wait_task(self._task_lookup, task['id'])
        task = json.loads(
            self.request('/plugins/kimchi/tasks/%s' %
                         task['id'], '{}').read()
        )
        self.assertEqual('finished', task['status'])

        try:
            remote_conn = self.get_remote_conn()
            rollback.prependDefer(remote_conn.close)
            remote_vm = remote_conn.lookupByName('test_vm_migrate')
            self.assertTrue(remote_vm.isPersistent())
            # Clean the migrated VM off the remote host.
            remote_vm.destroy()
            remote_vm.undefine()
        except Exception as e:
            self.fail('Migration test failed: %s' % e)
开发者ID:alinefm,项目名称:kimchi,代码行数:47,代码来源:test_livemigration.py
示例10: test_hotplug_3D_card
def test_hotplug_3D_card(self):
    """Try to hot-plug every 3D controller into a running 'test-vm'.

    An InvalidOperation raised by the attach is accepted as long as its
    message starts with the code KCHVMHDEV0006E.
    """
    model.templates_create({'name': u'test', 'source_media': fake_iso})
    creation = model.vms_create(
        {'name': u'test-vm', 'template': '/plugins/kimchi/templates/test'}
    )
    wait_task(model.task_lookup, creation['id'])
    model.vm_start(u'test-vm')

    # attach the 3D cards found to a running guest
    for dev_name in model.devices_get_list():
        dev_info = model.device_lookup(dev_name)
        if not model.device_is_device_3D_controller(dev_info):
            continue
        try:
            model.vmhostdevs_create(u'test-vm', {'name': dev_name})
        except InvalidOperation as e:
            # expect the error: KCHVMHDEV0006E
            self.assertEqual(e.message[:14], u'KCHVMHDEV0006E')
开发者ID:cclauss,项目名称:kimchi,代码行数:19,代码来源:test_mockmodel.py
示例11: test_create_debugreport
def test_create_debugreport(self):
    """Create debug report 'report1' over REST, then rename it via PUT.

    The PUT carrying the new name 'report2' is expected to answer 303.
    """
    req = json.dumps({'name': 'report1'})
    with RollbackContext() as rollback:
        resp = request(host, ssl_port, '/plugins/gingerbase/debugreports',
                       req, 'POST')
        self.assertEqual(202, resp.status)
        task = json.loads(resp.read())
        # make sure the debugreport doesn't exist until
        # the task is finished
        wait_task(self._task_lookup, task['id'])
        # The deferred cleanup targets 'report2', the name sent in the
        # PUT request below.
        rollback.prependDefer(self._report_delete, 'report2')

        resp = request(host, ssl_port,
                       '/plugins/gingerbase/debugreports/report1')
        debugreport = json.loads(resp.read())
        self.assertEqual("report1", debugreport['name'])
        self.assertEqual(200, resp.status)

        req = json.dumps({'name': 'report2'})
        resp = request(host, ssl_port,
                       '/plugins/gingerbase/debugreports/report1',
                       req, 'PUT')
        self.assertEqual(303, resp.status)
开发者ID:Clevero,项目名称:gingerbase,代码行数:21,代码来源:test_rest.py
示例12: test_host_actions
def test_host_actions(self):
    """Exercise the host shutdown, reboot and software-update REST actions.

    The software update task is expected to go from 'running' to
    'finished', leaving no packages pending.
    """
    def _task_lookup(taskid):
        # Fetch the task resource and return it as a parsed JSON object.
        return json.loads(self.request('/plugins/gingerbase/tasks/%s' %
                                       taskid).read())

    resp = self.request('/plugins/gingerbase/host/shutdown', '{}', 'POST')
    self.assertEqual(200, resp.status)
    resp = self.request('/plugins/gingerbase/host/reboot', '{}', 'POST')
    self.assertEqual(200, resp.status)

    # Test system update
    resp = self.request('/plugins/gingerbase/host/packagesupdate',
                        None, 'GET')
    pkgs = json.loads(resp.read())
    self.assertEqual(3, len(pkgs))

    # Every listed package must expose exactly these fields.
    pkg_keys = ['package_name', 'repository', 'arch', 'version']
    for p in pkgs:
        name = p['package_name']
        resp = self.request('/plugins/gingerbase/host/packagesupdate/' +
                            name, None, 'GET')
        info = json.loads(resp.read())
        self.assertEqual(sorted(pkg_keys), sorted(info.keys()))

    resp = self.request('/plugins/gingerbase/host/swupdate', '{}', 'POST')
    task = json.loads(resp.read())
    task_params = [u'id', u'message', u'status', u'target_uri']
    self.assertEqual(sorted(task_params), sorted(task.keys()))

    # NOTE(review): the task is read from '/tasks/<id>' here while
    # _task_lookup polls '/plugins/gingerbase/tasks/<id>' -- confirm
    # this is intended.
    resp = self.request('/tasks/' + task[u'id'], None, 'GET')
    task_info = json.loads(resp.read())
    self.assertEqual(task_info['status'], 'running')
    wait_task(_task_lookup, task_info['id'])
    resp = self.request('/tasks/' + task[u'id'], None, 'GET')
    task_info = json.loads(resp.read())
    self.assertEqual(task_info['status'], 'finished')
    self.assertIn(u'All packages updated', task_info['message'])
    pkgs = model.packagesupdate_get_list()
    self.assertEqual(0, len(pkgs))
开发者ID:frediz,项目名称:gingerbase,代码行数:39,代码来源:test_host.py
示例13: _do_volume_test
def _do_volume_test(self, model, host, ssl_port, pool_name):
def _task_lookup(taskid):
return json.loads(
self.request('/plugins/kimchi/tasks/%s' % taskid).read()
)
uri = '/plugins/kimchi/storagepools/%s/storagevolumes' \
% pool_name.encode('utf-8')
resp = self.request(uri)
self.assertEquals(200, resp.status)
resp = self.request('/plugins/kimchi/storagepools/%s' %
pool_name.encode('utf-8'))
pool_info = json.loads(resp.read())
with RollbackContext() as rollback:
# Create storage volume with 'capacity'
vol = 'test-volume'
vol_uri = uri + '/' + vol
req = json.dumps({'name': vol, 'format': 'raw',
'capacity': 1073741824}) # 1 GiB
resp = self.request(uri, req, 'POST')
if pool_info['type'] in READONLY_POOL_TYPE:
self.assertEquals(400, resp.status)
else:
rollback.prependDefer(rollback_wrapper, model.storagevolume_delete,
pool_name, vol)
self.assertEquals(202, resp.status)
task_id = json.loads(resp.read())['id']
wait_task(_task_lookup, task_id)
status = json.loads(
self.request('/plugins/kimchi/tasks/%s' % task_id).read()
)
self.assertEquals('finished', status['status'])
vol_info = json.loads(self.request(vol_uri).read())
vol_info['name'] = vol
vol_info['format'] = 'raw'
vol_info['capacity'] = 1073741824
# Resize the storage volume: increase its capacity to 2 GiB
req = json.dumps({'size': 2147483648}) # 2 GiB
resp = self.request(vol_uri + '/resize', req, 'POST')
self.assertEquals(200, resp.status)
storagevolume = json.loads(self.request(vol_uri).read())
self.assertEquals(2147483648, storagevolume['capacity'])
# Resize the storage volume: decrease its capacity to 512 MiB
# This test case may fail if libvirt does not include the fix for
# https://bugzilla.redhat.com/show_bug.cgi?id=1021802
req = json.dumps({'size': 536870912}) # 512 MiB
resp = self.request(vol_uri + '/resize', req, 'POST')
self.assertEquals(200, resp.status)
storagevolume = json.loads(self.request(vol_uri).read())
self.assertEquals(536870912, storagevolume['capacity'])
# Wipe the storage volume
resp = self.request(vol_uri + '/wipe', '{}', 'POST')
self.assertEquals(200, resp.status)
storagevolume = json.loads(self.request(vol_uri).read())
self.assertEquals(0, storagevolume['allocation'])
# Clone the storage volume
vol_info = json.loads(self.request(vol_uri).read())
resp = self.request(vol_uri + '/clone', '{}', 'POST')
self.assertEquals(202, resp.status)
task = json.loads(resp.read())
cloned_vol_name = task['target_uri'].split('/')[-2]
rollback.prependDefer(model.storagevolume_delete, pool_name,
cloned_vol_name)
wait_task(_task_lookup, task['id'])
task = json.loads(
self.request('/plugins/kimchi/tasks/%s' % task['id']).read()
)
self.assertEquals('finished', task['status'])
resp = self.request(uri + '/' + cloned_vol_name.encode('utf-8'))
self.assertEquals(200, resp.status)
cloned_vol = json.loads(resp.read())
self.assertNotEquals(vol_info['name'], cloned_vol['name'])
self.assertNotEquals(vol_info['path'], cloned_vol['path'])
for key in ['name', 'path', 'allocation']:
del vol_info[key]
del cloned_vol[key]
self.assertEquals(vol_info, cloned_vol)
# Delete the storage volume
resp = self.request(vol_uri, '{}', 'DELETE')
self.assertEquals(204, resp.status)
resp = self.request(vol_uri)
self.assertEquals(404, resp.status)
# Storage volume upload
# It is done through a sequence of POST and several PUT requests
filename = 'COPYING.LGPL'
filepath = os.path.join(paths.get_prefix(), filename)
filesize = os.stat(filepath).st_size
# Create storage volume for upload
req = json.dumps({'name': filename, 'format': 'raw',
#.........这里部分代码省略.........
开发者ID:Pojen-Huang,项目名称:kimchi,代码行数:101,代码来源:test_model_storagevolume.py
示例14: test_nonroot_access
def test_nonroot_access(self, validate_users):
    """Check the HTTP status codes returned for non-root REST requests.

    Networks and storage pools may only be listed; every template request
    made here is expected to be rejected; VM access depends on the
    users/groups assigned to each VM through the model.
    """
    # NOTE(review): validate_users is presumably a mock injected by a
    # patch decorator outside this block -- confirm.
    validate_users.return_value = True

    def expect_status(expected, uri, method, body='{}'):
        # Send one request and assert on the status code it returns.
        reply = self.request(uri, body, method)
        self.assertEqual(expected, reply.status)
        return reply

    # Non-root users can not create or delete network (only get)
    expect_status(200, '/plugins/kimchi/networks', 'GET')
    expect_status(403, '/plugins/kimchi/networks', 'POST')
    expect_status(403, '/plugins/kimchi/networks/default/activate', 'POST')
    expect_status(403, '/plugins/kimchi/networks/default', 'DELETE')

    # Non-root users can not create or delete storage pool (only get)
    expect_status(200, '/plugins/kimchi/storagepools', 'GET')
    expect_status(403, '/plugins/kimchi/storagepools', 'POST')
    expect_status(
        403, '/plugins/kimchi/storagepools/default/activate', 'POST'
    )
    expect_status(403, '/plugins/kimchi/storagepools/default', 'DELETE')

    # Every template request made here (GET, POST, PUT, DELETE) is
    # expected to be rejected with 403.
    expect_status(403, '/plugins/kimchi/templates', 'GET')
    template_req = json.dumps({'name': 'test', 'source_media': fake_iso})
    expect_status(403, '/plugins/kimchi/templates', 'POST', template_req)
    expect_status(403, '/plugins/kimchi/templates/test', 'PUT')
    expect_status(403, '/plugins/kimchi/templates/test', 'DELETE')

    # Non-root users can only get vms authorized to them
    model.templates_create(
        {'name': u'test', 'source_media': {'type': 'disk', 'path': fake_iso}}
    )
    template_uri = '/plugins/kimchi/templates/test'

    creation = model.vms_create({'name': u'test-me', 'template': template_uri})
    wait_task(model.task_lookup, creation['id'])
    model.vm_update(u'test-me', {'users': ['user'], 'groups': []})

    creation = model.vms_create(
        {'name': u'test-usera', 'template': template_uri}
    )
    wait_task(model.task_lookup, creation['id'])
    other_users = set(model.users_get_list()) - {'admin'}
    non_root = list(other_users)[0]
    model.vm_update(u'test-usera', {'users': [non_root], 'groups': []})

    creation = model.vms_create(
        {'name': u'test-groupa', 'template': template_uri}
    )
    wait_task(model.task_lookup, creation['id'])
    a_group = model.groups_get_list()[0]
    model.vm_update(u'test-groupa', {'groups': [a_group]})

    # Only 'test-groupa' and 'test-me' are expected in the listing.
    listing = expect_status(200, '/plugins/kimchi/vms', 'GET')
    visible_names = sorted([vm['name'] for vm in json.loads(listing.read())])
    self.assertEqual([u'test-groupa', u'test-me'], visible_names)

    # The POST body is the template request built above.
    expect_status(403, '/plugins/kimchi/vms', 'POST', template_req)

    # Create a vm using mockmodel directly to test Resource access
    creation = model.vms_create(
        {'name': 'kimchi-test', 'template': template_uri}
    )
    wait_task(model.task_lookup, creation['id'])
    expect_status(403, '/plugins/kimchi/vms/kimchi-test', 'PUT')
    expect_status(403, '/plugins/kimchi/vms/kimchi-test', 'DELETE')

    # Non-root users can only update VMs authorized by them
    expect_status(200, '/plugins/kimchi/vms/test-me/start', 'POST')
    expect_status(403, '/plugins/kimchi/vms/test-usera/start', 'POST')

    model.template_delete('test')
    model.vm_delete('test-me')
开发者ID:alinefm,项目名称:kimchi,代码行数:94,代码来源:test_authorization.py
示例15: test_nonroot_access
def test_nonroot_access(self):
    """Check the HTTP status codes returned for non-root REST requests.

    Networks and storage pools may only be listed; every template request
    made here is expected to be rejected; VM access depends on the
    users/groups assigned to each VM through the model.
    """
    def _check(expected, uri, method, body="{}"):
        # Send one request and assert on the status code it returns.
        resp = self.request(uri, body, method)
        self.assertEqual(expected, resp.status)
        return resp

    # Non-root users can not create or delete network (only get)
    _check(200, "/plugins/kimchi/networks", "GET")
    _check(403, "/plugins/kimchi/networks", "POST")
    _check(403, "/plugins/kimchi/networks/default/activate", "POST")
    _check(403, "/plugins/kimchi/networks/default", "DELETE")

    # Non-root users can not create or delete storage pool (only get)
    _check(200, "/plugins/kimchi/storagepools", "GET")
    _check(403, "/plugins/kimchi/storagepools", "POST")
    _check(403, "/plugins/kimchi/storagepools/default/activate", "POST")
    _check(403, "/plugins/kimchi/storagepools/default", "DELETE")

    # Every template request made here (GET, POST, PUT, DELETE) is
    # expected to be rejected with 403.
    _check(403, "/plugins/kimchi/templates", "GET")
    req = json.dumps({"name": "test", "source_media": fake_iso})
    _check(403, "/plugins/kimchi/templates", "POST", req)
    _check(403, "/plugins/kimchi/templates/test", "PUT")
    _check(403, "/plugins/kimchi/templates/test", "DELETE")

    # Non-root users can only get vms authorized to them
    model.templates_create(
        {"name": u"test", "source_media": {"type": "disk", "path": fake_iso}}
    )
    task_info = model.vms_create(
        {"name": u"test-me", "template": "/plugins/kimchi/templates/test"}
    )
    wait_task(model.task_lookup, task_info["id"])
    fake_user = get_fake_user()
    # list(d)[0] picks the first key on both Python 2 and Python 3,
    # whereas d.keys()[0] only works on Python 2.
    model.vm_update(
        u"test-me", {"users": [list(fake_user)[0]], "groups": []}
    )

    task_info = model.vms_create(
        {"name": u"test-usera", "template": "/plugins/kimchi/templates/test"}
    )
    wait_task(model.task_lookup, task_info["id"])
    non_root = list(set(model.users_get_list()) - set(["root"]))[0]
    model.vm_update(u"test-usera", {"users": [non_root], "groups": []})

    task_info = model.vms_create(
        {"name": u"test-groupa", "template": "/plugins/kimchi/templates/test"}
    )
    wait_task(model.task_lookup, task_info["id"])
    a_group = model.groups_get_list()[0]
    model.vm_update(u"test-groupa", {"groups": [a_group]})

    # Only 'test-groupa' and 'test-me' are expected in the listing.
    resp = _check(200, "/plugins/kimchi/vms", "GET")
    vms_data = json.loads(resp.read())
    self.assertEqual(
        [u"test-groupa", u"test-me"], sorted([v["name"] for v in vms_data])
    )

    # The POST body is the template request built above.
    _check(403, "/plugins/kimchi/vms", "POST", req)

    # Create a vm using mockmodel directly to test Resource access
    task_info = model.vms_create(
        {"name": "kimchi-test", "template": "/plugins/kimchi/templates/test"}
    )
    wait_task(model.task_lookup, task_info["id"])
    _check(403, "/plugins/kimchi/vms/kimchi-test", "PUT")
    _check(403, "/plugins/kimchi/vms/kimchi-test", "DELETE")

    # Non-root users can only update VMs authorized by them
    _check(200, "/plugins/kimchi/vms/test-me/start", "POST")
    _check(403, "/plugins/kimchi/vms/test-usera/start", "POST")

    model.template_delete("test")
    model.vm_delete("test-me")
开发者ID:kimchi-project,项目名称:kimchi,代码行数:77,代码来源:test_authorization.py
注：本文中的tests.utils.wait_task函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论