• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils.wait_task函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tests.utils.wait_task函数的典型用法代码示例。如果您正苦于以下问题：Python wait_task函数的具体用法？Python wait_task怎么用？Python wait_task使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了wait_task函数的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_tasks

    def test_tasks(self):
        """Check listing, filtering and lookup of tasks through the REST API.

        Three AsyncTask objects are created, the task collection is queried
        with and without a filter, and two of the tasks are fetched
        individually once wait_task() has returned for them.
        """
        id1 = AsyncTask('/plugins/ginger/tasks/1', self._async_op).id
        id2 = AsyncTask('/plugins/ginger/tasks/2', self._except_op).id
        id3 = AsyncTask('/plugins/ginger/tasks/3', self._intermid_op).id

        # safe="" percent-encodes every reserved character (including '/')
        # so the pattern can be embedded in the query string.
        target_uri = urllib2.quote('^/plugins/ginger/tasks/*', safe="")
        filter_data = 'status=running&target_uri=%s' % target_uri
        tasks = json.loads(
            self.request('/plugins/ginger/tasks?%s' % filter_data).read()
        )
        self.assertEqual(3, len(tasks))

        tasks = json.loads(self.request('/plugins/ginger/tasks').read())
        tasks_ids = [t['id'] for t in tasks]
        # Every task created above must show up in the unfiltered listing;
        # other tasks may be present as well.
        self.assertEqual(set([id1, id2, id3]) - set(tasks_ids), set([]))

        # Task 2 runs self._except_op and is expected to end up 'failed'.
        wait_task(self._task_lookup, id2)
        foo2 = json.loads(
            self.request('/plugins/ginger/tasks/%s' % id2).read()
        )
        keys = ['id', 'status', 'message', 'target_uri']
        self.assertEqual(sorted(keys), sorted(foo2.keys()))
        self.assertEqual('failed', foo2['status'])

        # Task 3 runs self._intermid_op and is expected to still be running
        # with an intermediate message.
        wait_task(self._task_lookup, id3)
        foo3 = json.loads(
            self.request('/plugins/ginger/tasks/%s' % id3).read()
        )
        self.assertEqual('in progress', foo3['message'])
        self.assertEqual('running', foo3['status'])
开发者ID:kimchi-project,项目名称:ginger,代码行数:28,代码来源:test_tasks.py


示例2: add_vm

 def add_vm(name):
     """POST a new VM called *name* based on the 'test' template.

     The task returned by the server is handed to wait_task() before
     returning.
     """
     payload = json.dumps({
         'name': name,
         'template': '/plugins/kimchi/templates/test',
     })
     response = request(host, ssl_port, '/plugins/kimchi/vms',
                        payload, 'POST')
     creation_task = json.loads(response.read())
     wait_task(model.task_lookup, creation_task['id'])
开发者ID:Finn10111,项目名称:kimchi,代码行数:7,代码来源:test_mockmodel.py


示例3: test_vm_info

    def test_vm_info(self):
        """Create a VM from a template and verify what vm_lookup() reports.

        Checks the set of top-level keys, the set of 'stats' keys and a
        handful of individual values of the returned info dict.
        """
        model.templates_create({'name': u'test',
                                'cdrom': fake_iso})
        task = model.vms_create({'name': u'test-vm',
                                 'template': '/plugins/kimchi/templates/test'})
        wait_task(model.task_lookup, task['id'])
        vms = model.vms_get_list()
        self.assertEqual(2, len(vms))
        self.assertIn(u'test-vm', vms)

        # Exact set of keys expected in the VM info dict.
        keys = set(('name', 'state', 'stats', 'uuid', 'memory', 'cpu_info',
                    'screenshot', 'icon', 'graphics', 'users', 'groups',
                    'access', 'persistent'))

        # Exact set of keys expected under info['stats'].
        stats_keys = set(('cpu_utilization', 'mem_utilization',
                          'net_throughput', 'net_throughput_peak',
                          'io_throughput', 'io_throughput_peak'))

        info = model.vm_lookup(u'test-vm')
        self.assertEqual(keys, set(info.keys()))
        self.assertEqual('shutoff', info['state'])
        self.assertEqual('test-vm', info['name'])
        self.assertEqual(get_template_default('old', 'memory'),
                         info['memory'])
        self.assertEqual(1, info['cpu_info']['vcpus'])
        self.assertEqual(1, info['cpu_info']['maxvcpus'])
        self.assertEqual('plugins/kimchi/images/icon-vm.png', info['icon'])
        self.assertEqual(stats_keys, set(info['stats'].keys()))
        self.assertEqual('vnc', info['graphics']['type'])
        self.assertEqual('127.0.0.1', info['graphics']['listen'])
开发者ID:Finn10111,项目名称:kimchi,代码行数:30,代码来源:test_mockmodel.py


示例4: test_screenshot_refresh

    def test_screenshot_refresh(self):
        """Fetch a running VM's screenshot twice and compare the responses.

        The second fetch goes through the screenshot URL advertised in the
        VM resource, five seconds after the first one, and must report the
        same content-type, content-length and last-modified headers.
        """
        # Create a VM
        req = json.dumps({'name': 'test', 'cdrom': fake_iso})
        request(host, ssl_port, '/plugins/kimchi/templates', req, 'POST')
        req = json.dumps({'name': 'test-vm',
                          'template': '/plugins/kimchi/templates/test'})
        resp = request(host, ssl_port, '/plugins/kimchi/vms', req, 'POST')
        task = json.loads(resp.read())
        wait_task(model.task_lookup, task['id'])

        # Test screenshot refresh for running vm
        request(host, ssl_port, '/plugins/kimchi/vms/test-vm/start', '{}',
                'POST')
        resp = request(host, ssl_port,
                       '/plugins/kimchi/vms/test-vm/screenshot')
        self.assertEqual(200, resp.status)
        self.assertEqual('image/png', resp.getheader('content-type'))

        # Look up the screenshot URL published in the VM resource itself.
        resp1 = request(host, ssl_port, '/plugins/kimchi/vms/test-vm')
        rspBody = resp1.read()
        testvm_Data = json.loads(rspBody)
        screenshotURL = '/' + testvm_Data['screenshot']
        # NOTE(review): the reason for the 5 second pause is not visible
        # here -- presumably it covers a screenshot refresh interval; confirm.
        time.sleep(5)
        resp2 = request(host, ssl_port, screenshotURL)
        self.assertEqual(200, resp2.status)
        self.assertEqual(resp2.getheader('content-type'),
                         resp.getheader('content-type'))
        self.assertEqual(resp2.getheader('content-length'),
                         resp.getheader('content-length'))
        self.assertEqual(resp2.getheader('last-modified'),
                         resp.getheader('last-modified'))
开发者ID:Finn10111,项目名称:kimchi,代码行数:30,代码来源:test_mockmodel.py


示例5: test_debugreport_download

 def test_debugreport_download(self):
     """Create a debug report and download it through the REST API.

     The report is requested with a POST (expected 202), looked up once
     wait_task() returns, and then fetched both via its '/content'
     sub-resource and via the 'uri' field of the report resource.
     """
     req = json.dumps({'name': 'test_rest_report1'})
     with RollbackContext() as rollback:
         resp = request('/plugins/gingerbase/debugreports', req, 'POST')
         self.assertEqual(202, resp.status)
         task = json.loads(resp.read())
         # The debug report only exists once the task is finished, so
         # wait for it (third argument 20 is passed through to wait_task)
         # before registering the cleanup and looking the report up.
         wait_task(self._task_lookup, task['id'], 20)
         rollback.prependDefer(self._report_delete, 'test_rest_report1')
         resp = request(
             '/plugins/gingerbase/debugreports/test_rest_report1'
         )
         debugreport = json.loads(resp.read())
         self.assertEqual("test_rest_report1", debugreport['name'])
         self.assertEqual(200, resp.status)
         resp = request(
             '/plugins/gingerbase/debugreports/test_rest_report1/content'
         )
         self.assertEqual(200, resp.status)
         # Download again, this time following the 'uri' field that the
         # report resource advertises.
         resp = request(
             '/plugins/gingerbase/debugreports/test_rest_report1'
         )
         debugre = json.loads(resp.read())
         resp = request('/' + debugre['uri'])
         self.assertEqual(200, resp.status)
开发者ID:kimchi-project,项目名称:gingerbase,代码行数:26,代码来源:test_rest.py


示例6: test_vm_livemigrate_persistent_API

    def test_vm_livemigrate_persistent_API(self):
        """Live-migrate a running VM via REST and check it is persistent.

        A test server is started on free ports, a VM is created and started,
        a migration to KIMCHI_LIVE_MIGRATION_TEST is requested through the
        '/migrate' action, and the migrated domain is then looked up on the
        remote connection, where it must be persistent.
        """
        patch_auth()

        inst = model.Model(libvirt_uri='qemu:///system',
                           objstore_loc=self.tmp_store)

        host = '127.0.0.1'
        port = get_free_port('http')
        ssl_port = get_free_port('https')
        cherrypy_port = get_free_port('cherrypy_port')

        with RollbackContext() as rollback:
            # NOTE(review): the server is started with the local `inst`
            # while the calls below go through self.inst -- confirm both
            # refer to the intended model.
            test_server = run_server(host, port, ssl_port, test_mode=True,
                                     cherrypy_port=cherrypy_port, model=inst)
            rollback.prependDefer(test_server.stop)

            self.request = partial(request, host, ssl_port)

            self.create_vm_test()
            rollback.prependDefer(rollback_wrapper, self.inst.vm_delete,
                                  u'test_vm_migrate')

            # removing cdrom because it is not shared storage and will make
            # the migration fail
            dev_list = self.inst.vmstorages_get_list('test_vm_migrate')
            self.inst.vmstorage_delete('test_vm_migrate',  dev_list[0])

            # 'except ... as' is valid on Python 2.6+ and Python 3, and the
            # exception is formatted directly because the 'message'
            # attribute is deprecated / absent on newer interpreters.
            try:
                self.inst.vm_start('test_vm_migrate')
            except Exception as e:
                self.fail('Failed to start the vm, reason: %s' % e)

            migrate_url = "/plugins/kimchi/vms/%s/migrate" % 'test_vm_migrate'

            req = json.dumps({'remote_host': KIMCHI_LIVE_MIGRATION_TEST,
                             'user': 'root'})
            resp = self.request(migrate_url, req, 'POST')
            self.assertEqual(202, resp.status)
            task = json.loads(resp.read())
            wait_task(self._task_lookup, task['id'])
            task = json.loads(
                self.request(
                    '/plugins/kimchi/tasks/%s' % task['id'],
                    '{}'
                ).read()
            )
            self.assertEqual('finished', task['status'])

            # Verify the result on the remote side, then remove the
            # migrated domain there.
            try:
                remote_conn = self.get_remote_conn()
                rollback.prependDefer(remote_conn.close)
                remote_vm = remote_conn.lookupByName('test_vm_migrate')
                self.assertTrue(remote_vm.isPersistent())
                remote_vm.destroy()
                remote_vm.undefine()
            except Exception as e:
                self.fail('Migration test failed: %s' % e)
开发者ID:Finn10111,项目名称:kimchi,代码行数:57,代码来源:test_livemigration.py


示例7: test_packages_update

    def test_packages_update(self):
        """Exercise the packagesupdate collection and both upgrade paths.

        First a single package ('ginger') is upgraded through its 'upgrade'
        action, then the remaining packages are upgraded through
        'host/swupdate'. After each step the number of packages still
        pending update is checked against the model.
        """
        def _task_lookup(taskid):
            # Fetch a task resource over REST and decode its JSON body.
            return json.loads(self.request('/plugins/gingerbase/tasks/%s' %
                                           taskid).read())

        resp = self.request('/plugins/gingerbase/host/packagesupdate',
                            None, 'GET')
        pkgs = json.loads(resp.read())
        self.assertEqual(5, len(pkgs))

        # Each package resource must expose exactly these keys.
        pkg_keys = ['package_name', 'repository', 'arch', 'version', 'depends']
        for p in pkgs:
            name = p['package_name']
            resp = self.request('/plugins/gingerbase/host/packagesupdate/' +
                                name, None, 'GET')
            info = json.loads(resp.read())
            self.assertEqual(sorted(pkg_keys), sorted(info.keys()))

        # Test system update of specific package. Since package 'ginger' has
        # 'wok' as dependency, both packages must be selected to be updated
        # and, in the end of the process, we have only 3 packages to update.
        uri = '/plugins/gingerbase/host/packagesupdate/ginger/upgrade'
        resp = self.request(uri, '{}', 'POST')
        task = json.loads(resp.read())
        task_params = [u'id', u'message', u'status', u'target_uri']
        self.assertEqual(sorted(task_params), sorted(task.keys()))

        resp = self.request('/plugins/gingerbase/tasks/' + task[u'id'],
                            None, 'GET')
        task_info = json.loads(resp.read())
        self.assertEqual(task_info['status'], 'running')
        wait_task(_task_lookup, task_info['id'])
        resp = self.request('/plugins/gingerbase/tasks/' + task[u'id'],
                            None, 'GET')
        task_info = json.loads(resp.read())
        self.assertEqual(task_info['status'], 'finished')
        self.assertIn(u'All packages updated', task_info['message'])
        pkgs = model.packagesupdate_get_list()
        self.assertEqual(3, len(pkgs))

        # test system update of the rest of packages
        resp = self.request('/plugins/gingerbase/host/swupdate', '{}', 'POST')
        task = json.loads(resp.read())
        task_params = [u'id', u'message', u'status', u'target_uri']
        self.assertEqual(sorted(task_params), sorted(task.keys()))

        # NOTE(review): this half reads the task from '/tasks/<id>' while
        # _task_lookup and the first half use '/plugins/gingerbase/tasks/<id>'
        # -- confirm both URIs are served and refer to the same task.
        resp = self.request('/tasks/' + task[u'id'], None, 'GET')
        task_info = json.loads(resp.read())
        self.assertEqual(task_info['status'], 'running')
        wait_task(_task_lookup, task_info['id'])
        resp = self.request('/tasks/' + task[u'id'], None, 'GET')
        task_info = json.loads(resp.read())
        self.assertEqual(task_info['status'], 'finished')
        self.assertIn(u'All packages updated', task_info['message'])
        pkgs = model.packagesupdate_get_list()
        self.assertEqual(0, len(pkgs))
开发者ID:pvital,项目名称:gingerbase,代码行数:56,代码来源:test_host.py


示例8: test_memory_window_changes

    def test_memory_window_changes(self):
        """Attach a PCI host device to a freshly created guest.

        There are no explicit assertions: the test passes as long as none
        of the model calls raises.
        """
        template_params = {'name': u'test', 'source_media': fake_iso}
        model.templates_create(template_params)

        vm_params = {
            'name': u'test-vm',
            'template': '/plugins/kimchi/templates/test',
        }
        creation_task = model.vms_create(vm_params)
        wait_task(model.task_lookup, creation_task['id'])

        pci_info = model.device_lookup('pci_0000_1a_00_0')
        model.vmhostdevs_update_mmio_guest(u'test-vm', True)
        device_xml = model._get_pci_device_xml(pci_info, 0, False)
        model._attach_device(u'test-vm', device_xml)
开发者ID:cclauss,项目名称:kimchi,代码行数:11,代码来源:test_mockmodel.py


示例9: test_vm_livemigrate_persistent_API

    def test_vm_livemigrate_persistent_API(self):
        """Live-migrate a running VM via REST and check it is persistent.

        A test server is started, a VM is created and started, a migration
        to KIMCHI_LIVE_MIGRATION_TEST is requested through the '/migrate'
        action, and the migrated domain is then looked up on the remote
        connection, where it must be persistent.
        """
        patch_auth()
        with RollbackContext() as rollback:
            test_server = run_server()
            rollback.prependDefer(test_server.stop)

            self.request = partial(request)

            self.create_vm_test()
            rollback.prependDefer(
                rollback_wrapper, self.inst.vm_delete, 'test_vm_migrate'
            )

            # removing cdrom because it is not shared storage and will make
            # the migration fail
            dev_list = self.inst.vmstorages_get_list('test_vm_migrate')
            self.inst.vmstorage_delete('test_vm_migrate', dev_list[0])

            # Format the exception itself: built-in exceptions have no
            # 'message' attribute on Python 3, so reading it here would
            # raise AttributeError and hide the original failure.
            try:
                self.inst.vm_start('test_vm_migrate')
            except Exception as e:
                self.fail('Failed to start the vm, reason: %s' % e)

            migrate_url = '/plugins/kimchi/vms/%s/migrate' % 'test_vm_migrate'

            req = json.dumps(
                {'remote_host': KIMCHI_LIVE_MIGRATION_TEST, 'user': 'root'}
            )
            resp = self.request(migrate_url, req, 'POST')
            self.assertEqual(202, resp.status)
            task = json.loads(resp.read())
            wait_task(self._task_lookup, task['id'])
            task = json.loads(
                self.request('/plugins/kimchi/tasks/%s' %
                             task['id'], '{}').read()
            )
            self.assertEqual('finished', task['status'])

            # Verify the result on the remote side, then remove the
            # migrated domain there.
            try:
                remote_conn = self.get_remote_conn()
                rollback.prependDefer(remote_conn.close)
                remote_vm = remote_conn.lookupByName('test_vm_migrate')
                self.assertTrue(remote_vm.isPersistent())
                remote_vm.destroy()
                remote_vm.undefine()
            except Exception as e:
                self.fail('Migration test failed: %s' % e)
开发者ID:alinefm,项目名称:kimchi,代码行数:47,代码来源:test_livemigration.py


示例10: test_hotplug_3D_card

    def test_hotplug_3D_card(self):
        """Try to hot-plug every 3D controller into a running guest.

        For each device that the model classifies as a 3D controller an
        attach is attempted; if InvalidOperation is raised, its message
        must start with the KCHVMHDEV0006E error code.
        """
        template_params = {'name': u'test', 'source_media': fake_iso}
        model.templates_create(template_params)

        vm_params = {
            'name': u'test-vm',
            'template': '/plugins/kimchi/templates/test',
        }
        creation_task = model.vms_create(vm_params)
        wait_task(model.task_lookup, creation_task['id'])
        model.vm_start(u'test-vm')

        # attach the 3D cards found to a running guest
        for dev_name in model.devices_get_list():
            dev_info = model.device_lookup(dev_name)
            if not model.device_is_device_3D_controller(dev_info):
                continue

            try:
                model.vmhostdevs_create(u'test-vm', {'name': dev_name})
            except InvalidOperation as err:
                # expect the error: KCHVMHDEV0006E
                self.assertEqual(err.message[:14], u'KCHVMHDEV0006E')
开发者ID:cclauss,项目名称:kimchi,代码行数:19,代码来源:test_mockmodel.py


示例11: test_create_debugreport

 def test_create_debugreport(self):
     """Create a debug report over REST and then rename it with a PUT.

     The creation POST is expected to return 202, the finished report is
     looked up under its original name, and the rename request is expected
     to answer with 303.
     """
     req = json.dumps({'name': 'report1'})
     with RollbackContext() as rollback:
         resp = request(host, ssl_port, '/plugins/gingerbase/debugreports',
                        req, 'POST')
         self.assertEqual(202, resp.status)
         task = json.loads(resp.read())
         # The debug report only exists once the task is finished, so
         # wait for it before touching the report.
         wait_task(self._task_lookup, task['id'])
         # NOTE(review): cleanup targets 'report2', presumably because the
         # PUT at the end renames report1 to report2 -- if an assertion
         # fails before that PUT, report1 would be left behind; confirm.
         rollback.prependDefer(self._report_delete, 'report2')
         resp = request(host, ssl_port,
                        '/plugins/gingerbase/debugreports/report1')
         debugreport = json.loads(resp.read())
         self.assertEqual("report1", debugreport['name'])
         self.assertEqual(200, resp.status)
         req = json.dumps({'name': 'report2'})
         resp = request(host, ssl_port,
                        '/plugins/gingerbase/debugreports/report1',
                        req, 'PUT')
         self.assertEqual(303, resp.status)
开发者ID:Clevero,项目名称:gingerbase,代码行数:21,代码来源:test_rest.py


示例12: test_host_actions

    def test_host_actions(self):
        """Exercise the host shutdown, reboot and software-update actions.

        Shutdown and reboot must answer 200. The packagesupdate collection
        is then inspected, a full 'swupdate' is triggered, and once its task
        has finished no packages may remain pending in the model.
        """
        def _task_lookup(taskid):
            # Fetch a task resource over REST and decode its JSON body.
            return json.loads(self.request('/plugins/gingerbase/tasks/%s' %
                                           taskid).read())

        resp = self.request('/plugins/gingerbase/host/shutdown', '{}', 'POST')
        self.assertEqual(200, resp.status)
        resp = self.request('/plugins/gingerbase/host/reboot', '{}', 'POST')
        self.assertEqual(200, resp.status)

        # Test system update
        resp = self.request('/plugins/gingerbase/host/packagesupdate',
                            None, 'GET')
        pkgs = json.loads(resp.read())
        self.assertEqual(3, len(pkgs))

        # Each package resource must expose exactly these keys.
        pkg_keys = ['package_name', 'repository', 'arch', 'version']
        for p in pkgs:
            name = p['package_name']
            resp = self.request('/plugins/gingerbase/host/packagesupdate/' +
                                name, None, 'GET')
            info = json.loads(resp.read())
            self.assertEqual(sorted(pkg_keys), sorted(info.keys()))

        resp = self.request('/plugins/gingerbase/host/swupdate', '{}', 'POST')
        task = json.loads(resp.read())
        task_params = [u'id', u'message', u'status', u'target_uri']
        self.assertEqual(sorted(task_params), sorted(task.keys()))

        # NOTE(review): the task is read from '/tasks/<id>' here while
        # _task_lookup uses '/plugins/gingerbase/tasks/<id>' -- confirm both
        # URIs are served and refer to the same task.
        resp = self.request('/tasks/' + task[u'id'], None, 'GET')
        task_info = json.loads(resp.read())
        self.assertEqual(task_info['status'], 'running')
        wait_task(_task_lookup, task_info['id'])
        resp = self.request('/tasks/' + task[u'id'], None, 'GET')
        task_info = json.loads(resp.read())
        self.assertEqual(task_info['status'], 'finished')
        self.assertIn(u'All packages updated', task_info['message'])
        pkgs = model.packagesupdate_get_list()
        self.assertEqual(0, len(pkgs))
开发者ID:frediz,项目名称:gingerbase,代码行数:39,代码来源:test_host.py


示例13: _do_volume_test

def _do_volume_test(self, model, host, ssl_port, pool_name):
    def _task_lookup(taskid):
        return json.loads(
            self.request('/plugins/kimchi/tasks/%s' % taskid).read()
        )

    uri = '/plugins/kimchi/storagepools/%s/storagevolumes' \
          % pool_name.encode('utf-8')
    resp = self.request(uri)
    self.assertEquals(200, resp.status)

    resp = self.request('/plugins/kimchi/storagepools/%s' %
                        pool_name.encode('utf-8'))
    pool_info = json.loads(resp.read())
    with RollbackContext() as rollback:
        # Create storage volume with 'capacity'
        vol = 'test-volume'
        vol_uri = uri + '/' + vol
        req = json.dumps({'name': vol, 'format': 'raw',
                          'capacity': 1073741824})  # 1 GiB
        resp = self.request(uri, req, 'POST')
        if pool_info['type'] in READONLY_POOL_TYPE:
            self.assertEquals(400, resp.status)
        else:
            rollback.prependDefer(rollback_wrapper, model.storagevolume_delete,
                                  pool_name, vol)
            self.assertEquals(202, resp.status)
            task_id = json.loads(resp.read())['id']
            wait_task(_task_lookup, task_id)
            status = json.loads(
                self.request('/plugins/kimchi/tasks/%s' % task_id).read()
            )
            self.assertEquals('finished', status['status'])
            vol_info = json.loads(self.request(vol_uri).read())
            vol_info['name'] = vol
            vol_info['format'] = 'raw'
            vol_info['capacity'] = 1073741824

            # Resize the storage volume: increase its capacity to 2 GiB
            req = json.dumps({'size': 2147483648})  # 2 GiB
            resp = self.request(vol_uri + '/resize', req, 'POST')
            self.assertEquals(200, resp.status)
            storagevolume = json.loads(self.request(vol_uri).read())
            self.assertEquals(2147483648, storagevolume['capacity'])

            # Resize the storage volume: decrease its capacity to 512 MiB
            # This test case may fail if libvirt does not include the fix for
            # https://bugzilla.redhat.com/show_bug.cgi?id=1021802
            req = json.dumps({'size': 536870912})  # 512 MiB
            resp = self.request(vol_uri + '/resize', req, 'POST')
            self.assertEquals(200, resp.status)
            storagevolume = json.loads(self.request(vol_uri).read())
            self.assertEquals(536870912, storagevolume['capacity'])

            # Wipe the storage volume
            resp = self.request(vol_uri + '/wipe', '{}', 'POST')
            self.assertEquals(200, resp.status)
            storagevolume = json.loads(self.request(vol_uri).read())
            self.assertEquals(0, storagevolume['allocation'])

            # Clone the storage volume
            vol_info = json.loads(self.request(vol_uri).read())
            resp = self.request(vol_uri + '/clone', '{}', 'POST')
            self.assertEquals(202, resp.status)
            task = json.loads(resp.read())
            cloned_vol_name = task['target_uri'].split('/')[-2]
            rollback.prependDefer(model.storagevolume_delete, pool_name,
                                  cloned_vol_name)
            wait_task(_task_lookup, task['id'])
            task = json.loads(
                self.request('/plugins/kimchi/tasks/%s' % task['id']).read()
            )
            self.assertEquals('finished', task['status'])
            resp = self.request(uri + '/' + cloned_vol_name.encode('utf-8'))

            self.assertEquals(200, resp.status)
            cloned_vol = json.loads(resp.read())

            self.assertNotEquals(vol_info['name'], cloned_vol['name'])
            self.assertNotEquals(vol_info['path'], cloned_vol['path'])
            for key in ['name', 'path', 'allocation']:
                del vol_info[key]
                del cloned_vol[key]

            self.assertEquals(vol_info, cloned_vol)

            # Delete the storage volume
            resp = self.request(vol_uri, '{}', 'DELETE')
            self.assertEquals(204, resp.status)
            resp = self.request(vol_uri)
            self.assertEquals(404, resp.status)

        # Storage volume upload
        # It is done through a sequence of POST and several PUT requests
        filename = 'COPYING.LGPL'
        filepath = os.path.join(paths.get_prefix(), filename)
        filesize = os.stat(filepath).st_size

        # Create storage volume for upload
        req = json.dumps({'name': filename, 'format': 'raw',
#.........这里部分代码省略.........
开发者ID:Pojen-Huang,项目名称:kimchi,代码行数:101,代码来源:test_model_storagevolume.py


示例14: test_nonroot_access

    def test_nonroot_access(self, validate_users):
        """Check which REST operations are allowed or denied (403).

        Covers networks, storage pools, templates and VMs. VMs are created
        directly through the model and assigned to users/groups so that the
        REST listing and per-VM actions can be checked for authorization.

        :param validate_users: object whose ``return_value`` is forced to
            True -- presumably a mock injected by a patch decorator that is
            outside this view; confirm.
        """
        validate_users.return_value = True

        # Non-root users can not create or delete network (only get)
        resp = self.request('/plugins/kimchi/networks', '{}', 'GET')
        self.assertEqual(200, resp.status)
        resp = self.request('/plugins/kimchi/networks', '{}', 'POST')
        self.assertEqual(403, resp.status)
        resp = self.request(
            '/plugins/kimchi/networks/default/activate', '{}', 'POST')
        self.assertEqual(403, resp.status)
        resp = self.request('/plugins/kimchi/networks/default', '{}', 'DELETE')
        self.assertEqual(403, resp.status)

        # Non-root users can not create or delete storage pool (only get)
        resp = self.request('/plugins/kimchi/storagepools', '{}', 'GET')
        self.assertEqual(200, resp.status)
        resp = self.request('/plugins/kimchi/storagepools', '{}', 'POST')
        self.assertEqual(403, resp.status)
        resp = self.request(
            '/plugins/kimchi/storagepools/default/activate', '{}', 'POST'
        )
        self.assertEqual(403, resp.status)
        resp = self.request(
            '/plugins/kimchi/storagepools/default', '{}', 'DELETE')
        self.assertEqual(403, resp.status)

        # Non-root users are denied every template operation exercised
        # here: GET, POST, PUT and DELETE are all expected to return 403.
        # NOTE(review): an earlier comment claimed GET and create were
        # allowed, which contradicts these assertions -- confirm intent.
        resp = self.request('/plugins/kimchi/templates', '{}', 'GET')
        self.assertEqual(403, resp.status)
        req = json.dumps({'name': 'test', 'source_media': fake_iso})
        resp = self.request('/plugins/kimchi/templates', req, 'POST')
        self.assertEqual(403, resp.status)
        resp = self.request('/plugins/kimchi/templates/test', '{}', 'PUT')
        self.assertEqual(403, resp.status)
        resp = self.request('/plugins/kimchi/templates/test', '{}', 'DELETE')
        self.assertEqual(403, resp.status)

        # Non-root users can only get vms authorized to them
        # The template and VMs are created through the model directly,
        # since the REST POST above is denied.
        model.templates_create(
            {'name': u'test', 'source_media': {'type': 'disk', 'path': fake_iso}}
        )

        task_info = model.vms_create(
            {'name': u'test-me', 'template': '/plugins/kimchi/templates/test'}
        )
        wait_task(model.task_lookup, task_info['id'])

        model.vm_update(u'test-me', {'users': ['user'], 'groups': []})

        task_info = model.vms_create(
            {'name': u'test-usera', 'template': '/plugins/kimchi/templates/test'}
        )
        wait_task(model.task_lookup, task_info['id'])

        # Pick any user other than 'admin' as the owner of test-usera.
        non_root = list(set(model.users_get_list()) - set(['admin']))[0]
        model.vm_update(u'test-usera', {'users': [non_root], 'groups': []})

        task_info = model.vms_create(
            {'name': u'test-groupa', 'template': '/plugins/kimchi/templates/test'}
        )
        wait_task(model.task_lookup, task_info['id'])
        a_group = model.groups_get_list()[0]
        model.vm_update(u'test-groupa', {'groups': [a_group]})

        # Only test-groupa and test-me are expected in the listing;
        # test-usera must not be visible.
        resp = self.request('/plugins/kimchi/vms', '{}', 'GET')
        self.assertEqual(200, resp.status)
        vms_data = json.loads(resp.read())
        self.assertEqual(
            [u'test-groupa', u'test-me'], sorted([v['name'] for v in vms_data])
        )
        # `req` still holds the template payload built above; the POST is
        # expected to be rejected with 403 regardless.
        resp = self.request('/plugins/kimchi/vms', req, 'POST')
        self.assertEqual(403, resp.status)

        # Create a vm using mockmodel directly to test Resource access
        task_info = model.vms_create(
            {'name': 'kimchi-test', 'template': '/plugins/kimchi/templates/test'}
        )
        wait_task(model.task_lookup, task_info['id'])
        resp = self.request('/plugins/kimchi/vms/kimchi-test', '{}', 'PUT')
        self.assertEqual(403, resp.status)
        resp = self.request('/plugins/kimchi/vms/kimchi-test', '{}', 'DELETE')
        self.assertEqual(403, resp.status)

        # Non-root users can only update VMs authorized by them
        resp = self.request('/plugins/kimchi/vms/test-me/start', '{}', 'POST')
        self.assertEqual(200, resp.status)
        resp = self.request(
            '/plugins/kimchi/vms/test-usera/start', '{}', 'POST')
        self.assertEqual(403, resp.status)

        # Only the template and the test-me VM are removed here.
        model.template_delete('test')
        model.vm_delete('test-me')
开发者ID:alinefm,项目名称:kimchi,代码行数:94,代码来源:test_authorization.py


示例15: test_nonroot_access

    def test_nonroot_access(self):
        """Check which REST operations are allowed or denied (403).

        Covers networks, storage pools, templates and VMs. VMs are created
        directly through the model and assigned to users/groups so that the
        REST listing and per-VM actions can be checked for authorization.
        """
        # Non-root users can not create or delete network (only get)
        resp = self.request("/plugins/kimchi/networks", "{}", "GET")
        self.assertEqual(200, resp.status)
        resp = self.request("/plugins/kimchi/networks", "{}", "POST")
        self.assertEqual(403, resp.status)
        resp = self.request("/plugins/kimchi/networks/default/activate", "{}", "POST")
        self.assertEqual(403, resp.status)
        resp = self.request("/plugins/kimchi/networks/default", "{}", "DELETE")
        self.assertEqual(403, resp.status)

        # Non-root users can not create or delete storage pool (only get)
        resp = self.request("/plugins/kimchi/storagepools", "{}", "GET")
        self.assertEqual(200, resp.status)
        resp = self.request("/plugins/kimchi/storagepools", "{}", "POST")
        self.assertEqual(403, resp.status)
        resp = self.request("/plugins/kimchi/storagepools/default/activate", "{}", "POST")
        self.assertEqual(403, resp.status)
        resp = self.request("/plugins/kimchi/storagepools/default", "{}", "DELETE")
        self.assertEqual(403, resp.status)

        # Non-root users are denied every template operation exercised
        # here: GET, POST, PUT and DELETE are all expected to return 403.
        resp = self.request("/plugins/kimchi/templates", "{}", "GET")
        self.assertEqual(403, resp.status)
        req = json.dumps({"name": "test", "source_media": fake_iso})
        resp = self.request("/plugins/kimchi/templates", req, "POST")
        self.assertEqual(403, resp.status)
        resp = self.request("/plugins/kimchi/templates/test", "{}", "PUT")
        self.assertEqual(403, resp.status)
        resp = self.request("/plugins/kimchi/templates/test", "{}", "DELETE")
        self.assertEqual(403, resp.status)

        # Non-root users can only get vms authorized to them
        # The template and VMs are created through the model directly,
        # since the REST POST above is denied.
        model.templates_create({"name": u"test", "source_media": {"type": "disk", "path": fake_iso}})

        task_info = model.vms_create({"name": u"test-me", "template": "/plugins/kimchi/templates/test"})
        wait_task(model.task_lookup, task_info["id"])

        fake_user = get_fake_user()

        # list(...) keeps this working on Python 3, where keys() returns a
        # view that cannot be indexed.
        model.vm_update(u"test-me", {"users": [list(fake_user.keys())[0]], "groups": []})

        task_info = model.vms_create({"name": u"test-usera", "template": "/plugins/kimchi/templates/test"})
        wait_task(model.task_lookup, task_info["id"])

        # Pick any user other than 'root' as the owner of test-usera.
        non_root = list(set(model.users_get_list()) - set(["root"]))[0]
        model.vm_update(u"test-usera", {"users": [non_root], "groups": []})

        task_info = model.vms_create({"name": u"test-groupa", "template": "/plugins/kimchi/templates/test"})
        wait_task(model.task_lookup, task_info["id"])
        a_group = model.groups_get_list()[0]
        model.vm_update(u"test-groupa", {"groups": [a_group]})

        # Only test-groupa and test-me are expected in the listing;
        # test-usera must not be visible.
        resp = self.request("/plugins/kimchi/vms", "{}", "GET")
        self.assertEqual(200, resp.status)
        vms_data = json.loads(resp.read())
        self.assertEqual([u"test-groupa", u"test-me"], sorted([v["name"] for v in vms_data]))
        # `req` still holds the template payload built above; the POST is
        # expected to be rejected with 403 regardless.
        resp = self.request("/plugins/kimchi/vms", req, "POST")
        self.assertEqual(403, resp.status)

        # Create a vm using mockmodel directly to test Resource access
        task_info = model.vms_create({"name": "kimchi-test", "template": "/plugins/kimchi/templates/test"})
        wait_task(model.task_lookup, task_info["id"])
        resp = self.request("/plugins/kimchi/vms/kimchi-test", "{}", "PUT")
        self.assertEqual(403, resp.status)
        resp = self.request("/plugins/kimchi/vms/kimchi-test", "{}", "DELETE")
        self.assertEqual(403, resp.status)

        # Non-root users can only update VMs authorized by them
        resp = self.request("/plugins/kimchi/vms/test-me/start", "{}", "POST")
        self.assertEqual(200, resp.status)
        resp = self.request("/plugins/kimchi/vms/test-usera/start", "{}", "POST")
        self.assertEqual(403, resp.status)

        # Only the template and the test-me VM are removed here.
        model.template_delete("test")
        model.vm_delete("test-me")
开发者ID:kimchi-project,项目名称:kimchi,代码行数:77,代码来源:test_authorization.py



注:本文中的tests.utils.wait_task函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.wrap函数代码示例发布时间:2022-05-27
下一篇:
Python utils.test_create函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap