• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python test.call_until_true函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tempest.test.call_until_true函数的典型用法代码示例。如果您正苦于以下问题:Python call_until_true函数的具体用法?Python call_until_true怎么用?Python call_until_true使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了call_until_true函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _hotplug_server

    def _hotplug_server(self):
        old_floating_ip, server = self.floating_ip_tuple
        ip_address = old_floating_ip.floating_ip_address
        private_key = self.servers[server].private_key
        ssh_client = self.get_remote_client(ip_address, private_key=private_key)
        old_nic_list = self._get_server_nics(ssh_client)
        # get a port from a list of one item
        port_list = self._list_ports(device_id=server.id)
        self.assertEqual(1, len(port_list))
        old_port = port_list[0]
        self.compute_client.servers.interface_attach(server=server, net_id=self.new_net.id, port_id=None, fixed_ip=None)
        # move server to the head of the cleanup list
        self.addCleanup(self.cleanup_wrapper, server)

        def check_ports():
            port_list = [port for port in self._list_ports(device_id=server.id) if port != old_port]
            return len(port_list) == 1

        test.call_until_true(check_ports, 60, 1)
        new_port_list = [p for p in self._list_ports(device_id=server.id) if p != old_port]
        self.assertEqual(1, len(new_port_list))
        new_port = new_port_list[0]
        new_port = net_common.DeletablePort(client=self.network_client, **new_port)
        new_nic_list = self._get_server_nics(ssh_client)
        diff_list = [n for n in new_nic_list if n not in old_nic_list]
        self.assertEqual(1, len(diff_list))
        num, new_nic = diff_list[0]
        ssh_client.assign_static_ip(nic=new_nic, addr=new_port.fixed_ips[0]["ip_address"])
        ssh_client.turn_nic_on(nic=new_nic)
开发者ID:nttdata-osscloud,项目名称:tempest,代码行数:29,代码来源:test_network_basic_ops.py


示例2: test_execute_dummy_action_plan

    def test_execute_dummy_action_plan(self):
        _, audit_template = self.create_audit_template()
        _, audit = self.create_audit(audit_template['uuid'])

        self.assertTrue(test.call_until_true(
            func=functools.partial(self.has_audit_succeeded, audit['uuid']),
            duration=30,
            sleep_for=.5
        ))
        _, action_plans = self.client.list_action_plans(
            audit_uuid=audit['uuid'])
        action_plan = action_plans['action_plans'][0]

        _, action_plan = self.client.show_action_plan(action_plan['uuid'])

        # Execute the action by changing its state to PENDING
        _, updated_ap = self.client.update_action_plan(
            action_plan['uuid'],
            patch=[{'path': '/state', 'op': 'replace', 'value': 'PENDING'}]
        )

        self.assertTrue(test.call_until_true(
            func=functools.partial(
                self.has_action_plan_finished, action_plan['uuid']),
            duration=30,
            sleep_for=.5
        ))
        _, finished_ap = self.client.show_action_plan(action_plan['uuid'])

        self.assertIn(updated_ap['state'], ('PENDING', 'ONGOING'))
        self.assertEqual('SUCCEEDED', finished_ap['state'])
开发者ID:Jean-Emile,项目名称:watcher,代码行数:31,代码来源:test_action_plan.py


示例3: check_flip_status

    def check_flip_status(self, floating_ip, status):
        """Verifies floatingip reaches the given status

        :param dict floating_ip: floating IP dict to check status
        :param status: target status
        :raises: AssertionError if status doesn't match
        """

        # TODO(ptoohill): Find a way to utilze the proper client method

        floatingip_id = floating_ip['id']

        def refresh():
            result = (self.floating_ips_client_admin.
                      show_floatingip(floatingip_id)['floatingip'])
            return status == result['status']

        test.call_until_true(refresh, 100, 1)

        floating_ip = self.floating_ips_client_admin.show_floatingip(
            floatingip_id)['floatingip']
        self.assertEqual(status, floating_ip['status'],
                         message="FloatingIP: {fp} is at status: {cst}. "
                                 "failed  to reach status: {st}"
                         .format(fp=floating_ip, cst=floating_ip['status'],
                                 st=status))
        LOG.info("FloatingIP: {fp} is at status: {st}"
                 .format(fp=floating_ip, st=status))
开发者ID:sebrandon1,项目名称:octavia,代码行数:28,代码来源:base.py


示例4: assertScale

 def assertScale(from_servers, to_servers):
     call_until_true(lambda: server_count() == to_servers,
                     timeout, interval)
     self.assertEqual(to_servers, self.server_count,
                      'Failed scaling from %d to %d servers. '
                      'Current server count: %s' % (
                          from_servers, to_servers,
                          self.server_count))
开发者ID:LIS,项目名称:LIS-Tempest,代码行数:8,代码来源:test_autoscaling.py


示例5: _prepare_and_test

    def _prepare_and_test(self, address6_mode, n_subnets6=1, dualnet=False):
        net_list = self.prepare_network(address6_mode=address6_mode,
                                        n_subnets6=n_subnets6,
                                        dualnet=dualnet)

        sshv4_1, ips_from_api_1, sid1 = self.prepare_server(networks=net_list)
        sshv4_2, ips_from_api_2, sid2 = self.prepare_server(networks=net_list)

        def guest_has_address(ssh, addr):
            return addr in ssh.get_ip_list()

        # Turn on 2nd NIC for Cirros when dualnet
        if dualnet:
            self.turn_nic6_on(sshv4_1, sid1)
            self.turn_nic6_on(sshv4_2, sid2)

        # get addresses assigned to vNIC as reported by 'ip address' utility
        ips_from_ip_1 = sshv4_1.get_ip_list()
        ips_from_ip_2 = sshv4_2.get_ip_list()
        self.assertIn(ips_from_api_1['4'], ips_from_ip_1)
        self.assertIn(ips_from_api_2['4'], ips_from_ip_2)
        for i in range(n_subnets6):
            # v6 should be configured since the image supports it
            # It can take time for ipv6 automatic address to get assigned
            srv1_v6_addr_assigned = functools.partial(
                guest_has_address, sshv4_1, ips_from_api_1['6'][i])

            srv2_v6_addr_assigned = functools.partial(
                guest_has_address, sshv4_2, ips_from_api_2['6'][i])

            self.assertTrue(test.call_until_true(srv1_v6_addr_assigned,
                                                 CONF.compute.ping_timeout, 1))

            self.assertTrue(test.call_until_true(srv2_v6_addr_assigned,
                                                 CONF.compute.ping_timeout, 1))

        self._check_connectivity(sshv4_1, ips_from_api_2['4'])
        self._check_connectivity(sshv4_2, ips_from_api_1['4'])

        # Some VM (like cirros) may not have ping6 utility
        result = sshv4_1.exec_command('whereis ping6')
        is_ping6 = False if result == 'ping6:\n' else True
        if is_ping6:
            for i in range(n_subnets6):
                self._check_connectivity(sshv4_1,
                                         ips_from_api_2['6'][i])
                self._check_connectivity(sshv4_1,
                                         self.subnets_v6[i].gateway_ip)
                self._check_connectivity(sshv4_2,
                                         ips_from_api_1['6'][i])
                self._check_connectivity(sshv4_2,
                                         self.subnets_v6[i].gateway_ip)
        else:
            LOG.warning('Ping6 is not available, skipping')
开发者ID:flyingfish007,项目名称:tempest,代码行数:54,代码来源:test_network_v6.py


示例6: call_until_valid

 def call_until_valid(self, func, duration, *args, **kwargs):
     # Call until get valid response for "duration"
     # because tenant usage doesn't become available immediately
     # after create VM.
     def is_valid():
         try:
             self.resp = func(*args, **kwargs)
             return True
         except e.InvalidHTTPResponseBody:
             return False
     test.call_until_true(is_valid, duration, 1)
     return self.resp
开发者ID:jishandong,项目名称:tempest,代码行数:12,代码来源:test_simple_tenant_usage.py


示例7: test_datasource_db_sync_remove

    def test_datasource_db_sync_remove(self):
        # Verify that a replica removes a datasource when a datasource
        # disappears from the database.
        CLIENT2_PORT = 4001
        client1 = self.admin_manager.congress_client
        fake_id = self.create_fake(client1)
        need_to_delete_fake = True
        try:
            self.start_replica(CLIENT2_PORT)

            # Verify that primary server has fake datasource
            if not test.call_until_true(
                    func=lambda: self.datasource_exists(client1, fake_id),
                    duration=60, sleep_for=1):
                raise exceptions.TimeoutException(
                    "primary should have fake, but does not")

            # Create session for second server.
            client2 = self.create_client(CLIENT2_PORT)

            # Verify that second server has fake datasource
            if not test.call_until_true(
                    func=lambda: self.datasource_exists(client2, fake_id),
                    duration=60, sleep_for=1):
                raise exceptions.TimeoutException(
                    "replica should have fake, but does not")

            # Remove fake from primary server instance.
            LOG.debug("removing fake datasource %s", str(fake_id))
            client1.delete_datasource(fake_id)
            need_to_delete_fake = False

            # Confirm that fake is gone from primary server instance.
            if not test.call_until_true(
                    func=lambda: self.datasource_missing(client1, fake_id),
                    duration=60, sleep_for=1):
                self.stop_replica(CLIENT2_PORT)
                raise exceptions.TimeoutException(
                    "primary instance still has fake")
            LOG.debug("removed fake datasource from primary instance")

            # Confirm that second service instance removes fake.
            if not test.call_until_true(
                    func=lambda: self.datasource_missing(client2, fake_id),
                    duration=60, sleep_for=1):
                raise exceptions.TimeoutException(
                    "replica should remove fake, but still has it")

        finally:
            self.stop_replica(CLIENT2_PORT)
            if need_to_delete_fake:
                self.admin_manager.congress_client.delete_datasource(fake_id)
开发者ID:Cindia-blue,项目名称:congress,代码行数:52,代码来源:test_ha.py


示例8: _hotplug_server

    def _hotplug_server(self):
        old_floating_ip, server = self.floating_ip_tuple
        ip_address = old_floating_ip.floating_ip_address
        private_key = self._get_server_key(server)
        ssh_client = self.get_remote_client(ip_address,
                                            private_key=private_key)
        old_nic_list = self._get_server_nics(ssh_client)
        # get a port from a list of one item
        port_list = self._list_ports(device_id=server['id'])
        self.assertEqual(1, len(port_list))
        old_port = port_list[0]
        interface = self.interface_client.create_interface(
            server_id=server['id'],
            net_id=self.new_net.id)['interfaceAttachment']
        self.addCleanup(self.network_client.wait_for_resource_deletion,
                        'port',
                        interface['port_id'], client=self.ports_client)
        self.addCleanup(self.delete_wrapper,
                        self.interface_client.delete_interface,
                        server['id'], interface['port_id'])

        def check_ports():
            self.new_port_list = [port for port in
                                  self._list_ports(device_id=server['id'])
                                  if port['id'] != old_port['id']]
            return len(self.new_port_list) == 1

        if not test.call_until_true(check_ports, CONF.network.build_timeout,
                                    CONF.network.build_interval):
            raise exceptions.TimeoutException(
                "No new port attached to the server in time (%s sec)! "
                "Old port: %s. Number of new ports: %d" % (
                    CONF.network.build_timeout, old_port,
                    len(self.new_port_list)))
        new_port = net_resources.DeletablePort(ports_client=self.ports_client,
                                               **self.new_port_list[0])

        def check_new_nic():
            new_nic_list = self._get_server_nics(ssh_client)
            self.diff_list = [n for n in new_nic_list if n not in old_nic_list]
            return len(self.diff_list) == 1

        if not test.call_until_true(check_new_nic, CONF.network.build_timeout,
                                    CONF.network.build_interval):
            raise exceptions.TimeoutException("Interface not visible on the "
                                              "guest after %s sec"
                                              % CONF.network.build_timeout)

        num, new_nic = self.diff_list[0]
        ssh_client.assign_static_ip(nic=new_nic,
                                    addr=new_port.fixed_ips[0]['ip_address'])
        ssh_client.turn_nic_on(nic=new_nic)
开发者ID:koshi-m,项目名称:tempest,代码行数:52,代码来源:test_network_basic_ops.py


示例9: _hotplug_server

    def _hotplug_server(self):
        old_floating_ip, server = self.floating_ip_tuple
        ip_address = old_floating_ip.floating_ip_address
        private_key = self.servers[server].private_key
        ssh_client = self.get_remote_client(ip_address,
                                            private_key=private_key)
        old_nic_list = self._get_server_nics(ssh_client)
        # get a port from a list of one item
        port_list = self._list_ports(device_id=server.id)
        self.assertEqual(1, len(port_list))
        old_port = port_list[0]
        self.compute_client.servers.interface_attach(server=server,
                                                     net_id=self.new_net.id,
                                                     port_id=None,
                                                     fixed_ip=None)
        # move server to the head of the cleanup list
        self.addCleanup(self.delete_timeout,
                        self.compute_client.servers,
                        server.id)
        self.addCleanup(self.delete_wrapper, server)

        def check_ports():
            self.new_port_list = [port for port in
                                  self._list_ports(device_id=server.id)
                                  if port != old_port]
            return len(self.new_port_list) == 1

        if not test.call_until_true(check_ports, CONF.network.build_timeout,
                                    CONF.network.build_interval):
            raise exceptions.TimeoutException("No new port attached to the "
                                              "server in time (%s sec) !"
                                              % CONF.network.build_timeout)
        new_port = net_common.DeletablePort(client=self.network_client,
                                            **self.new_port_list[0])

        def check_new_nic():
            new_nic_list = self._get_server_nics(ssh_client)
            self.diff_list = [n for n in new_nic_list if n not in old_nic_list]
            return len(self.diff_list) == 1

        if not test.call_until_true(check_new_nic, CONF.network.build_timeout,
                                    CONF.network.build_interval):
            raise exceptions.TimeoutException("Interface not visible on the "
                                              "guest after %s sec"
                                              % CONF.network.build_timeout)

        num, new_nic = self.diff_list[0]
        ssh_client.assign_static_ip(nic=new_nic,
                                    addr=new_port.fixed_ips[0]['ip_address'])
        ssh_client.turn_nic_on(nic=new_nic)
开发者ID:higuchi-yuuki,项目名称:tempest,代码行数:50,代码来源:test_network_basic_ops.py


示例10: test_execute_dummy_action_plan

    def test_execute_dummy_action_plan(self):
        """Execute an action plan based on the 'dummy' strategy

        - create an audit template with the 'dummy' strategy
        - run the audit to create an action plan
        - get the action plan
        - run the action plan
        - get results and make sure it succeeded
        """
        _, goal = self.client.show_goal("dummy")
        _, audit_template = self.create_audit_template(goal['uuid'])
        _, audit = self.create_audit(audit_template['uuid'])

        self.assertTrue(test.call_until_true(
            func=functools.partial(self.has_audit_succeeded, audit['uuid']),
            duration=30,
            sleep_for=.5
        ))
        _, action_plans = self.client.list_action_plans(
            audit_uuid=audit['uuid'])
        action_plan = action_plans['action_plans'][0]

        _, action_plan = self.client.show_action_plan(action_plan['uuid'])

        # Execute the action by changing its state to PENDING
        _, updated_ap = self.client.update_action_plan(
            action_plan['uuid'],
            patch=[{'path': '/state', 'op': 'replace', 'value': 'PENDING'}]
        )

        self.assertTrue(test.call_until_true(
            func=functools.partial(
                self.has_action_plan_finished, action_plan['uuid']),
            duration=30,
            sleep_for=.5
        ))
        _, finished_ap = self.client.show_action_plan(action_plan['uuid'])
        _, action_list = self.client.list_actions(
            action_plan_uuid=finished_ap["uuid"])

        action_counter = collections.Counter(
            act['action_type'] for act in action_list['actions'])

        self.assertIn(updated_ap['state'], ('PENDING', 'ONGOING'))
        self.assertEqual('SUCCEEDED', finished_ap['state'])

        # A dummy strategy generates 2 "nop" actions and 1 "sleep" action
        self.assertEqual(3, len(action_list['actions']))
        self.assertEqual(2, action_counter.get("nop"))
        self.assertEqual(1, action_counter.get("sleep"))
开发者ID:Oliverlyn,项目名称:watcher,代码行数:50,代码来源:test_execute_dummy_optim.py


示例11: _prepare_and_test

    def _prepare_and_test(self, address6_mode, n_subnets6=1, dualnet=False):
        net_list = self.prepare_network(address6_mode=address6_mode,
                                        n_subnets6=n_subnets6,
                                        dualnet=dualnet)

        sshv4_1, ips_from_api_1, sid1 = self.prepare_server(networks=net_list)
        sshv4_2, ips_from_api_2, sid2 = self.prepare_server(networks=net_list)

        def guest_has_address(ssh, addr):
            return addr in ssh.get_ip_list()

        # Turn on 2nd NIC for Cirros when dualnet
        if dualnet:
            self.turn_nic6_on(sshv4_1, sid1)
            self.turn_nic6_on(sshv4_2, sid2)

        # get addresses assigned to vNIC as reported by 'ip address' utility
        ips_from_ip_1 = sshv4_1.get_ip_list()
        ips_from_ip_2 = sshv4_2.get_ip_list()
        self.assertIn(ips_from_api_1['4'], ips_from_ip_1)
        self.assertIn(ips_from_api_2['4'], ips_from_ip_2)
        for i in range(n_subnets6):
            # v6 should be configured since the image supports it
            # It can take time for ipv6 automatic address to get assigned
            srv1_v6_addr_assigned = functools.partial(
                guest_has_address, sshv4_1, ips_from_api_1['6'][i])

            srv2_v6_addr_assigned = functools.partial(
                guest_has_address, sshv4_2, ips_from_api_2['6'][i])

            self.assertTrue(test.call_until_true(srv1_v6_addr_assigned,
                            CONF.validation.ping_timeout, 1))

            self.assertTrue(test.call_until_true(srv2_v6_addr_assigned,
                            CONF.validation.ping_timeout, 1))

        self._check_connectivity(sshv4_1, ips_from_api_2['4'])
        self._check_connectivity(sshv4_2, ips_from_api_1['4'])

        for i in range(n_subnets6):
            self._check_connectivity(sshv4_1,
                                     ips_from_api_2['6'][i])
            self._check_connectivity(sshv4_1,
                                     self.subnets_v6[i].gateway_ip)
            self._check_connectivity(sshv4_2,
                                     ips_from_api_1['6'][i])
            self._check_connectivity(sshv4_2,
                                     self.subnets_v6[i].gateway_ip)
开发者ID:varapreddy,项目名称:tempest,代码行数:48,代码来源:test_network_v6.py


示例12: test_trust_subscription

    def test_trust_subscription(self):
        sub_queue = data_utils.rand_name('Queues-Test')
        self.addCleanup(self.client.delete_queue, sub_queue)
        subscriber = 'trust+{0}/{1}/queues/{2}/messages'.format(
            self.client.base_url, self.client.uri_prefix, sub_queue)
        post_body = json.dumps(
            {'messages': [{'body': '$zaqar_message$', 'ttl': 60}]})
        post_headers = {'X-Project-ID': self.client.tenant_id,
                        'Client-ID': str(uuid.uuid4())}
        sub_body = {'ttl': 1200, 'subscriber': subscriber,
                    'options': {'post_data': post_body,
                                'post_headers': post_headers}}

        self.create_subscription(queue_name=self.queue_name, rbody=sub_body)
        message_body = self.generate_message_body()
        self.post_messages(queue_name=self.queue_name, rbody=message_body)

        if not test.call_until_true(
                lambda: self.list_messages(sub_queue)[1]['messages'], 10, 1):
            self.fail("Couldn't get messages")
        _, body = self.list_messages(sub_queue)
        expected = message_body['messages'][0]
        expected['queue_name'] = self.queue_name
        expected['Message_Type'] = 'Notification'
        for message in body['messages']:
            # There are two message in the queue. One is the confirm message,
            # the other one is the notification.
            if message['body']['Message_Type'] == 'Notification':
                self.assertEqual(expected, message['body'])
开发者ID:ollie314,项目名称:zaqar,代码行数:29,代码来源:test_subscriptions.py


示例13: test_cinder_volumes_table

    def test_cinder_volumes_table(self):
        volume_schema = self.admin_manager.congress_client.show_datasource_table_schema(self.datasource_id, "volumes")[
            "columns"
        ]
        volume_id_col = next(i for i, c in enumerate(volume_schema) if c["name"] == "id")

        def _check_data_table_cinder_volumes():
            # Fetch data from cinder each time, because this test may start
            # before cinder has all the users.
            volumes = self.cinder.list_volumes()
            volumes_map = {}
            for volume in volumes:
                volumes_map[volume["id"]] = volume

            results = self.admin_manager.congress_client.list_datasource_rows(self.datasource_id, "volumes")
            for row in results["results"]:
                try:
                    volume_row = volumes_map[row["data"][volume_id_col]]
                except KeyError:
                    return False
                for index in range(len(volume_schema)):
                    if str(row["data"][index]) != str(volume_row[volume_schema[index]["name"]]):
                        return False
            return True

        if not test.call_until_true(func=_check_data_table_cinder_volumes, duration=100, sleep_for=5):
            raise exceptions.TimeoutException("Data did not converge in time " "or failure in server")
开发者ID:ekcs,项目名称:congress,代码行数:27,代码来源:test_cinder.py


示例14: test_nova_datasource_driver_servers

    def test_nova_datasource_driver_servers(self):
        self._setup_network_and_servers()

        def _check_data_table_nova_servers():
            results = \
                self.admin_manager.congress_policy_client.list_datasource_rows(
                    'nova', 'servers')
            keys = ['id', 'name', 'hostId', 'status', 'tenant_id',
                    'user_id', 'image', 'flavor']
            for row in results['results']:
                match = True
                for index in range(len(keys)):
                    if keys[index] in ['image', 'flavor']:
                        val = self.servers[0][keys[index]]['id']
                    else:
                        val = self.servers[0][keys[index]]

                    if row['data'][index] != val:
                        match = False
                        break
                if match:
                    return True
            return False

        if not test.call_until_true(func=_check_data_table_nova_servers,
                                    duration=20, sleep_for=4):
            raise exceptions.TimeoutException("Data did not converge in time "
                                              "or failure in server")
开发者ID:alex-docker,项目名称:congress,代码行数:28,代码来源:test_nova.py


示例15: test_nova_datasource_driver_flavors

    def test_nova_datasource_driver_flavors(self):
        _, flavors = self.flavors_client.list_flavors_with_detail()
        flavor_id_map = {}
        for flavor in flavors:
            flavor_id_map[flavor['id']] = flavor

        def _check_data_table_nova_flavors():
            results = \
                self.admin_manager.congress_policy_client.list_datasource_rows(
                    'nova', 'flavors')
            keys = ['id', 'name', 'vcpus', 'ram', 'disk',
                    'OS-FLV-EXT-DATA:ephemeral', 'rxtx_factor']
            for row in results['results']:
                match = True
                flavor_row = flavor_id_map[row['data'][0]]
                for index in range(len(keys)):
                    if row['data'][index] != flavor_row[keys[index]]:
                        match = False
                        break
                if match:
                    return True
            return False

        if not test.call_until_true(func=_check_data_table_nova_flavors,
                                    duration=20, sleep_for=4):
            raise exceptions.TimeoutException("Data did not converge in time "
                                              "or failure in server")
开发者ID:alex-docker,项目名称:congress,代码行数:27,代码来源:test_nova.py


示例16: test_glancev2_tags_table

    def test_glancev2_tags_table(self):
        def _check_data_table_glance_images():
            # Fetch data from glance each time, because this test may start
            # before glance has all the users.
            images = self.glancev2.list_images()['images']
            image_tag_map = {}
            for image in images:
                image_tag_map[image['id']] = image['tags']

            results = (
                self.admin_manager.congress_client.list_datasource_rows(
                    self.datasource_id, 'tags'))
            for row in results['results']:
                image_id, tag = row['data'][0], row['data'][1]
                glance_image_tags = image_tag_map.get(image_id)
                if not glance_image_tags:
                    # congress had image that glance doesn't know about.
                    return False
                if tag not in glance_image_tags:
                    # congress had a tag that wasn't on the image.
                    return False
            return True

        if not test.call_until_true(func=_check_data_table_glance_images,
                                    duration=100, sleep_for=5):
            raise exceptions.TimeoutException("Data did not converge in time "
                                              "or failure in server")
开发者ID:venkatamahesh,项目名称:congress,代码行数:27,代码来源:test_glancev2.py


示例17: test_neutronv2_security_group_rules_table

    def test_neutronv2_security_group_rules_table(self):
        sgrs_schema = self.admin_manager.congress_client.show_datasource_table_schema(
            self.datasource_id, "security_group_rules"
        )["columns"]

        @helper.retry_on_exception
        def _check_data():
            client = self.security_groups_client
            security_groups_neutron = client.list_security_groups()
            sgrs_map = {}  # security_group_rules
            for sg in security_groups_neutron["security_groups"]:
                for sgr in sg["security_group_rules"]:
                    sgrs_map[sgr["id"]] = sgr

            client = self.admin_manager.congress_client
            client.request_refresh(self.datasource_id)
            time.sleep(1)

            security_group_rules = client.list_datasource_rows(self.datasource_id, "security_group_rules")

            # Validate security_group_rules table
            for row in security_group_rules["results"]:
                sg_rule_row = sgrs_map[row["data"][1]]
                for index in range(len(sgrs_schema)):
                    if str(row["data"][index]) != str(sg_rule_row[sgrs_schema[index]["name"]]):
                        return False
            return True

        if not test.call_until_true(func=_check_data, duration=200, sleep_for=10):
            raise exceptions.TimeoutException("Data did not converge in time " "or failure in server")
开发者ID:muroi,项目名称:congress,代码行数:30,代码来源:test_neutronv2.py


示例18: test_keystone_roles_table

    def test_keystone_roles_table(self):
        role_schema = (
            self.admin_manager.congress_client.show_datasource_table_schema(
                self.datasource_id, 'roles')['columns'])
        role_id_col = next(i for i, c in enumerate(role_schema)
                           if c['name'] == 'id')

        def _check_data_table_keystone_roles():
            # Fetch data from keystone each time, because this test may start
            # before keystone has all the users.
            roles = self.roles_client.list_roles()['roles']
            roles_map = {}
            for role in roles:
                roles_map[role['id']] = role

            results = (
                self.admin_manager.congress_client.list_datasource_rows(
                    self.datasource_id, 'roles'))
            for row in results['results']:
                try:
                    role_row = roles_map[row['data'][role_id_col]]
                except KeyError:
                    return False
                for index in range(len(role_schema)):
                    if (str(row['data'][index]) !=
                            str(role_row[role_schema[index]['name']])):
                        return False
            return True

        if not test.call_until_true(func=_check_data_table_keystone_roles,
                                    duration=100, sleep_for=4):
            raise exceptions.TimeoutException("Data did not converge in time "
                                              "or failure in server")
开发者ID:venkatamahesh,项目名称:congress,代码行数:33,代码来源:test_keystonev2.py


示例19: test_ceilometer_meters_table

    def test_ceilometer_meters_table(self):
        meter_schema = (
            self.admin_manager.congress_client.show_datasource_table_schema(
                self.datasource_id, 'meters')['columns'])
        meter_id_col = next(i for i, c in enumerate(meter_schema)
                            if c['name'] == 'meter_id')

        def _check_data_table_ceilometer_meters():
            # Fetch data from ceilometer each time, because this test may start
            # before ceilometer has all the users.
            meters = self.telemetry_client.list_meters()
            meter_map = {}
            for meter in meters:
                meter_map[meter['meter_id']] = meter

            results = (
                self.admin_manager.congress_client.list_datasource_rows(
                    self.datasource_id, 'meters'))
            for row in results['results']:
                try:
                    meter_row = meter_map[row['data'][meter_id_col]]
                except KeyError:
                    return False
                for index in range(len(meter_schema)):
                    if (str(row['data'][index]) !=
                            str(meter_row[meter_schema[index]['name']])):
                        return False
            return True

        if not test.call_until_true(func=_check_data_table_ceilometer_meters,
                                    duration=100, sleep_for=5):
            raise exceptions.TimeoutException("Data did not converge in time "
                                              "or failure in server")
开发者ID:MustWin,项目名称:congress,代码行数:33,代码来源:test_ceilometer.py


示例20: test_nova_datasource_driver_flavors

    def test_nova_datasource_driver_flavors(self):
        def _check_data_table_nova_flavors():
            # Fetch data from nova each time, because this test may start
            # before nova has all the users.
            flavors = self.flavors_client.list_flavors(detail=True)
            flavor_id_map = {}
            for flavor in flavors:
                flavor_id_map[flavor['id']] = flavor

            results = (
                self.admin_manager.congress_client.list_datasource_rows(
                    self.datasource_id, 'flavors'))
            # TODO(alexsyip): Not sure what the following OS-FLV-EXT-DATA:
            # prefix is for.
            keys = ['id', 'name', 'vcpus', 'ram', 'disk',
                    'OS-FLV-EXT-DATA:ephemeral', 'rxtx_factor']
            for row in results['results']:
                match = True
                try:
                    flavor_row = flavor_id_map[row['data'][0]]
                except KeyError:
                    return False
                for index in range(len(keys)):
                    if row['data'][index] != flavor_row[keys[index]]:
                        match = False
                        break
                if match:
                    return True
            return False

        if not test.call_until_true(func=_check_data_table_nova_flavors,
                                    duration=100, sleep_for=5):
            raise exceptions.TimeoutException("Data did not converge in time "
                                              "or failure in server")
开发者ID:AbelardLiu,项目名称:congress,代码行数:34,代码来源:test_nova.py



注:本文中的tempest.test.call_until_true函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python test.is_extension_enabled函数代码示例发布时间:2022-05-27
下一篇:
Python common.Element类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap