• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python waiters.wait_for_volume_status函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tempest.common.waiters.wait_for_volume_status函数的典型用法代码示例。如果您正苦于以下问题：Python wait_for_volume_status函数的具体用法？Python wait_for_volume_status怎么用？Python wait_for_volume_status使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了wait_for_volume_status函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: attach_volume

    def attach_volume(self, server, volume, device=None):
        """Attach ``volume`` to ``server`` and block until it is 'in-use'.

        Two cleanups are registered so that the volume is detached and back
        to 'available' when the test tears down.

        :param server: The server to attach to; only ``server['id']`` is read.
        :param volume: The volume to attach; only ``volume['id']`` is read.
        :param device: Optional mountpoint, forwarded to the attach call when
            truthy. Note that this is not guaranteed for all hypervisors and
            is not recommended.
        """
        volume_id = volume['id']
        server_id = server['id']

        request = {'volumeId': volume_id}
        if device:
            request['device'] = device
        self.servers_client.attach_volume(server_id, **request)

        # Registered first: wait for the volume to be 'available' again on
        # teardown, so that deleting the volume afterwards does not error out.
        self.addCleanup(waiters.wait_for_volume_status,
                        self.volumes_client, volume_id, 'available')
        # Registered second: detach the volume, ignoring 404s in case the
        # server is already deleted or the volume is already detached.
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.servers_client.detach_volume,
                        server_id, volume_id)

        waiters.wait_for_volume_status(self.volumes_client, volume_id,
                                       'in-use')
开发者ID:Tesora,项目名称:tesora-tempest,代码行数:27,代码来源:base.py


示例2: test_create_get_list_accept_volume_transfer

    def test_create_get_list_accept_volume_transfer(self):
        """Exercise create, show, list and accept on a volume transfer."""
        # A volume is needed before a transfer can be created.
        volume = self.create_volume()
        volume_id = volume['id']
        self.addCleanup(self.delete_volume, self.adm_client, volume_id)

        # Create the transfer and keep what is needed to accept it later.
        created = self.client.create_volume_transfer(
            volume_id=volume_id)['transfer']
        transfer_id, auth_key = created['id'], created['auth_key']
        waiters.wait_for_volume_status(self.client, volume_id,
                                       'awaiting-transfer')

        # Showing the transfer must report the volume it was created for.
        shown = self.client.show_volume_transfer(transfer_id)['transfer']
        self.assertEqual(volume_id, shown['volume_id'])

        # Listing must return at least one transfer.
        listed = self.client.list_volume_transfers()['transfers']
        self.assertThat(len(listed), matchers.GreaterThan(0))

        # Accept the transfer with the alternate client. The response body is
        # not inspected further; the ['transfer'] lookup is kept so that a
        # response without that key still fails here.
        accepted = self.alt_client.accept_volume_transfer(
            transfer_id, auth_key=auth_key)['transfer']
        waiters.wait_for_volume_status(self.alt_client, volume_id,
                                       'available')
开发者ID:WoolenWang,项目名称:tempest,代码行数:27,代码来源:test_volume_transfers.py


示例3: _create_user_message

 def _create_user_message(self):
     """Trigger a 'no valid host' situation to generate a user message.

     A volume type with random ``storage_protocol`` and ``vendor_name``
     extra specs is created, and a volume of that type is requested. The
     method then looks through the listed messages for one whose
     ``resource_uuid`` is the new volume's id.

     :returns: id of the first matching message; the method asserts that
         one exists.
     """
     extra_specs = {
         'storage_protocol': data_utils.rand_name('storage_protocol'),
         'vendor_name': data_utils.rand_name('vendor_name'),
     }
     type_name = data_utils.rand_name(
         self.__class__.__name__ + '-volume-type')
     types_client = self.admin_volume_types_client
     bogus_type = types_client.create_volume_type(
         name=type_name, extra_specs=extra_specs)['volume_type']
     self.addCleanup(types_client.delete_volume_type, bogus_type['id'])

     volume = self.volumes_client.create_volume(
         volume_type=bogus_type['id'],
         size=CONF.volume.volume_size)['volume']
     volume_id = volume['id']
     self.addCleanup(self.delete_volume, self.volumes_client, volume_id)
     try:
         waiters.wait_for_volume_status(self.volumes_client, volume_id,
                                        'error')
     except exceptions.VolumeBuildErrorException:
         # Error state is expected and desired
         pass

     listed = self.messages_client.list_messages()['messages']
     # First message that refers to the volume, or None when there is none.
     message_id = next(
         (msg['id'] for msg in listed
          if msg['resource_uuid'] == volume_id),
         None)
     self.assertIsNotNone(message_id, 'No user message generated for '
                                      'volume %s' % volume_id)
     return message_id
开发者ID:Tesora,项目名称:tesora-tempest,代码行数:32,代码来源:test_user_messages.py


示例4: test_volume_swap

 def test_volume_swap(self):
     """Swap a server's attached volume and verify the new attachment.

     "volume1" is attached to a freshly booted server and then swapped for
     "volume2" through the admin servers client. Afterwards "volume1" must
     be 'available', "volume2" must be 'in-use', and "volume2" must be the
     server's only volume attachment.
     """
     # Create two volumes.
     # NOTE(gmann): Volumes are created before server creation so that
     # volumes cleanup can happen successfully irrespective of which volume
     # is attached to server.
     volume1 = self.create_volume()
     volume2 = self.create_volume()
     # Boot server
     server = self.create_test_server(wait_until='ACTIVE')
     # Attach "volume1" to server
     self.attach_volume(server, volume1)
     # Swap volume from "volume1" to "volume2"
     self.servers_admin_client.update_attached_volume(
         server['id'], volume1['id'], volumeId=volume2['id'])
     waiters.wait_for_volume_status(self.volumes_client,
                                    volume1['id'], 'available')
     waiters.wait_for_volume_status(self.volumes_client,
                                    volume2['id'], 'in-use')
     self.addCleanup(self.servers_client.detach_volume,
                     server['id'], volume2['id'])
     # Verify "volume2" is attached to the server
     vol_attachments = self.servers_client.list_volume_attachments(
         server['id'])['volumeAttachments']
     self.assertEqual(1, len(vol_attachments))
     # Compare the ids for equality: with a string on the right-hand side,
     # assertIn would only be a substring check.
     self.assertEqual(volume2['id'], vol_attachments[0]['volumeId'])
开发者ID:Tesora,项目名称:tesora-tempest,代码行数:25,代码来源:test_volume_swap.py


示例5: _volume_clean_up

 def _volume_clean_up(self, server_id, volume_id):
     """Delete ``volume_id``, detaching it from ``server_id`` if in use.

     When the volume reports status 'in-use' it is detached from the server
     and the method waits for 'available' before issuing the delete.
     """
     volumes = self.volumes_client
     status = volumes.show_volume(volume_id)['volume']['status']
     if status == 'in-use':
         self.servers_client.detach_volume(server_id, volume_id)
         waiters.wait_for_volume_status(volumes, volume_id, 'available')
     volumes.delete_volume(volume_id)
开发者ID:Hybrid-Cloud,项目名称:tempest,代码行数:7,代码来源:test_live_migration.py


示例6: test_volume_create_get_delete

 def test_volume_create_get_delete(self):
     """Create a volume, wait until it is 'available' and fetch it back.

     The create response is checked for ``id`` and ``displayName``; the
     fetched volume must match the created one on name, id and metadata.
     Deletion is registered as a cleanup through ``self.delete_volume``.
     """
     v_name = data_utils.rand_name("Volume")
     metadata = {"Type": "work"}
     # Create volume
     volume = self.client.create_volume(
         size=CONF.volume.volume_size, display_name=v_name, metadata=metadata
     )
     self.addCleanup(self.delete_volume, volume["id"])
     self.assertIn("id", volume)
     self.assertIn("displayName", volume)
     self.assertEqual(
         volume["displayName"],
         v_name,
         "The created volume name is not equal to the requested name",
     )
     self.assertIsNotNone(
         volume["id"], "Field volume id is empty or not found."
     )
     # Wait for the volume status to become 'available'
     waiters.wait_for_volume_status(self.client, volume["id"], "available")
     # GET Volume
     fetched_volume = self.client.show_volume(volume["id"])
     # Verification of details of fetched Volume
     self.assertEqual(
         v_name,
         fetched_volume["displayName"],
         "The fetched Volume is different from the created Volume",
     )
     self.assertEqual(
         volume["id"],
         fetched_volume["id"],
         "The fetched Volume is different from the created Volume",
     )
     self.assertThat(
         fetched_volume["metadata"].items(),
         matchers.ContainsAll(metadata.items()),
         "The fetched Volume metadata misses data from the created Volume",
     )
开发者ID:kiryl-zaytes,项目名称:tempest,代码行数:28,代码来源:test_volumes_get.py


示例7: test_create_failover_reverse_enable_disable_delete_replication

 def test_create_failover_reverse_enable_disable_delete_replication(self):
     """Drive one replication through its whole lifecycle.

     Sequence: create -> failover -> reverse -> attach the replication
     volume and detach the local volume -> enable -> disable -> delete,
     waiting for the expected status after each step.
     """
     name = data_utils.rand_name('tempest-replication')
     master_vol = self.local_volid_list[0]
     slave_vol = self.replication_volid_list[0]
     kwargs = {
         'name': name,
         'description': 'this replication is for jenkins test.',
     }
     replication = self.client.create_replication(master_vol,
                                                  slave_vol,
                                                  **kwargs)['replication']
     replication_id = replication['id']
     # Log arguments are passed separately so that the interpolation is
     # left to the logging call instead of being done eagerly here.
     LOG.info("@@@create replication %s", replication_id)
     sgs_waiters.wait_for_sgs_replication_status(
         self.client, replication_id, 'enabled')

     self.client.failover_replication(replication_id)
     LOG.info("@@@failover replication %s", replication_id)
     sgs_waiters.wait_for_sgs_replication_status(
         self.client, replication_id, 'failed-over')

     self.client.reverse_replication(replication_id)
     LOG.info("@@@reverse replication %s", replication_id)
     sgs_waiters.wait_for_sgs_replication_status(
         self.client, replication_id, 'disabled')

     # Both requests are issued before either volume status is awaited.
     LOG.info("@@@attach replication volume:%s", slave_vol)
     self.sgs_volume_client.attach_volume(self.replication_test_vm_id,
                                          slave_vol)
     LOG.info("@@@detach local volume:%s", master_vol)
     self.sgs_volume_client.detach_volume(self.local_test_vm_id,
                                          master_vol)
     waiters.wait_for_volume_status(self.sgs_volume_client, slave_vol,
                                    'in-use')
     waiters.wait_for_volume_status(self.sgs_volume_client, master_vol,
                                    'enabled')

     self.client.enable_replication(replication_id)
     LOG.info("@@@enable replication %s", replication_id)
     sgs_waiters.wait_for_sgs_replication_status(
         self.client, replication_id, 'enabled')

     self.client.disable_replication(replication_id)
     LOG.info("@@@disable replication %s", replication_id)
     sgs_waiters.wait_for_sgs_replication_status(
         self.client, replication_id, 'disabled')

     self.client.delete_replication(replication_id)
     LOG.info("@@@delete replication %s", replication_id)
     self.client.wait_for_resource_deletion(replication_id)
开发者ID:Hybrid-Cloud,项目名称:tempest,代码行数:34,代码来源:test_replication_actions.py


示例8: _create_type_and_volume

    def _create_type_and_volume(self, backend_name_key, with_prefix):
        """Create a volume type bound to a backend and a volume of that type.

        The created type is stored on ``self.type`` and the created volume
        on ``self.volume``; their ids are appended to the tracking lists.
        The method returns once the volume is 'available'.

        :param backend_name_key: Value stored in the backend-name extra spec.
        :param with_prefix: When truthy the extra spec key is
            ``capabilities:volume_backend_name`` and the volume id goes to
            ``volume_id_list_with_prefix``; otherwise the key is
            ``volume_backend_name`` and the id goes to
            ``volume_id_list_without_prefix``.
        """
        # Volume/Type creation
        type_name = data_utils.rand_name('Type')
        vol_name = data_utils.rand_name('Volume')
        if with_prefix:
            spec_key = "capabilities:volume_backend_name"
        else:
            spec_key = "volume_backend_name"

        self.type = self.volume_types_client.create_volume_type(
            name=type_name,
            extra_specs={spec_key: backend_name_key})['volume_type']
        self.volume_type_id_list.append(self.type['id'])

        create_kwargs = {self.name_field: vol_name, 'volume_type': type_name}
        self.volume = self.admin_volume_client.create_volume(
            **create_kwargs)['volume']

        tracked_ids = (self.volume_id_list_with_prefix if with_prefix
                       else self.volume_id_list_without_prefix)
        tracked_ids.append(self.volume['id'])

        waiters.wait_for_volume_status(self.admin_volume_client,
                                       self.volume['id'], 'available')
开发者ID:Hybrid-Cloud,项目名称:tempest,代码行数:25,代码来源:test_multi_backend.py


示例9: test_rebuild_server_with_volume_attached

    def test_rebuild_server_with_volume_attached(self):
        """Rebuild a server with a volume attached; it must stay attached."""
        # Create a new volume and wait until it can be attached.
        volume = self.volumes_client.create_volume(
            size=CONF.volume.volume_size)['volume']
        volume_id = volume['id']
        self.addCleanup(self.volumes_client.delete_volume, volume_id)
        waiters.wait_for_volume_status(self.volumes_client, volume_id,
                                       'available')

        # Attach it to the server. The two cleanups below are registered as
        # "wait for 'available'" first and "detach" second.
        self.client.attach_volume(self.server_id, volumeId=volume_id)
        self.addCleanup(waiters.wait_for_volume_status,
                        self.volumes_client, volume_id, 'available')
        self.addCleanup(self.client.detach_volume,
                        self.server_id, volume_id)
        waiters.wait_for_volume_status(self.volumes_client, volume_id,
                                       'in-use')

        # Run the general rebuild test.
        self.test_rebuild_server()

        # The volume must still be attached to this server after the rebuild.
        after_rebuild = self.volumes_client.show_volume(volume_id)['volume']
        self.assertEqual('in-use', after_rebuild['status'])
        first_attachment = after_rebuild['attachments'][0]
        self.assertEqual(self.server_id, first_attachment['server_id'])
开发者ID:devfaz,项目名称:tempest,代码行数:26,代码来源:test_server_actions.py


示例10: test_volume_create_get_delete

 def test_volume_create_get_delete(self):
     """Create a volume, wait until it is 'available' and fetch it back.

     The create response is checked for ``id`` and ``displayName``; the
     fetched volume must match the created one on name, id and metadata.
     Deletion is registered as a cleanup through ``self.delete_volume``.
     """
     v_name = data_utils.rand_name('Volume')
     metadata = {'Type': 'work'}
     # Create volume
     volume = self.client.create_volume(size=CONF.volume.volume_size,
                                        display_name=v_name,
                                        metadata=metadata)['volume']
     self.addCleanup(self.delete_volume, volume['id'])
     self.assertIn('id', volume)
     self.assertIn('displayName', volume)
     self.assertEqual(volume['displayName'], v_name,
                      "The created volume name is not equal "
                      "to the requested name")
     self.assertIsNotNone(volume['id'],
                          "Field volume id is empty or not found.")
     # Wait for the volume status to become 'available'
     waiters.wait_for_volume_status(self.client, volume['id'], 'available')
     # GET Volume
     fetched_volume = self.client.show_volume(volume['id'])['volume']
     # Verification of details of fetched Volume
     self.assertEqual(v_name,
                      fetched_volume['displayName'],
                      'The fetched Volume is different '
                      'from the created Volume')
     self.assertEqual(volume['id'],
                      fetched_volume['id'],
                      'The fetched Volume is different '
                      'from the created Volume')
     self.assertThat(fetched_volume['metadata'].items(),
                     matchers.ContainsAll(metadata.items()),
                     'The fetched Volume metadata misses data '
                     'from the created Volume')
开发者ID:Hybrid-Cloud,项目名称:tempest,代码行数:34,代码来源:test_volumes_get.py


示例11: test_volume_backup_create_get_detailed_list_restore_delete

    def test_volume_backup_create_get_detailed_list_restore_delete(self):
        """Back up ``self.volume``, then show, list and restore the backup.

        Deleting the backup and deleting the restored volume are both
        registered as cleanups rather than performed inline.
        """
        backups = self.backups_adm_client

        # Create the backup and wait for both the source volume and the
        # backup to become 'available'.
        backup_name = data_utils.rand_name('Backup')
        backup = backups.create_backup(volume_id=self.volume['id'],
                                       name=backup_name)['backup']
        self.addCleanup(backups.delete_backup, backup['id'])
        self.assertEqual(backup_name, backup['name'])
        waiters.wait_for_volume_status(self.admin_volume_client,
                                       self.volume['id'], 'available')
        backups.wait_for_backup_status(backup['id'], 'available')

        # Get a given backup
        backup = backups.show_backup(backup['id'])['backup']
        self.assertEqual(backup_name, backup['name'])

        # Get all backups with detail
        detailed = backups.list_backups(detail=True)['backups']
        self.assertIn((backup['name'], backup['id']),
                      [(item['name'], item['id']) for item in detailed])

        # Restore backup
        restore = backups.restore_backup(backup['id'])['restore']

        # Schedule deletion of the volume created by the restore.
        restored_volume_id = restore['volume_id']
        self.addCleanup(self.admin_volume_client.delete_volume,
                        restored_volume_id)
        self.assertEqual(backup['id'], restore['backup_id'])
        backups.wait_for_backup_status(backup['id'], 'available')
        waiters.wait_for_volume_status(self.admin_volume_client,
                                       restored_volume_id, 'available')
开发者ID:Hybrid-Cloud,项目名称:tempest,代码行数:35,代码来源:test_volumes_backup.py


示例12: test_volume_snapshot_create_get_list_delete

    def test_volume_snapshot_create_get_list_delete(self):
        """Create a snapshot of a new volume, then show and list it.

        The snapshot is deleted by a cleanup that first waits for it to be
        'available' and afterwards waits for the deletion to complete.
        """
        v_name = data_utils.rand_name('Volume')
        volume = self.volumes_client.create_volume(
            size=CONF.volume.volume_size,
            display_name=v_name)['volume']
        self.addCleanup(self.delete_volume, volume['id'])
        waiters.wait_for_volume_status(self.volumes_client, volume['id'],
                                       'available')
        s_name = data_utils.rand_name('Snapshot')
        # Create snapshot
        snapshot = self.snapshots_client.create_snapshot(
            volume_id=volume['id'],
            display_name=s_name)['snapshot']

        def delete_snapshot(snapshot_id):
            """Wait for the snapshot to be 'available', then delete it."""
            waiters.wait_for_snapshot_status(self.snapshots_client,
                                             snapshot_id,
                                             'available')
            # Delete snapshot
            self.snapshots_client.delete_snapshot(snapshot_id)
            self.snapshots_client.wait_for_resource_deletion(snapshot_id)

        self.addCleanup(delete_snapshot, snapshot['id'])
        self.assertEqual(volume['id'], snapshot['volumeId'])
        # Get snapshot
        fetched_snapshot = self.snapshots_client.show_snapshot(
            snapshot['id'])['snapshot']
        self.assertEqual(s_name, fetched_snapshot['displayName'])
        self.assertEqual(volume['id'], fetched_snapshot['volumeId'])
        # Fetch all snapshots
        snapshots = self.snapshots_client.list_snapshots()['snapshots']
        # A real list (not a one-shot map iterator) keeps the ids available
        # for the failure message after the membership check has run.
        self.assertIn(snapshot['id'], [snap['id'] for snap in snapshots])
开发者ID:Hybrid-Cloud,项目名称:hybrid-tempest,代码行数:32,代码来源:test_volume_snapshots.py


示例13: _create_and_attach

    def _create_and_attach(self):
        """Boot a server, create a volume and attach the volume to it.

        Results are stored on ``self.server``, ``self.volume`` and
        ``self.attachment``. Cleanups are registered for deleting the volume
        (right after creation) and for detaching it (once it is 'in-use').
        """
        # Start a server and wait for it to become ready
        self.admin_pass = self.image_ssh_password
        self.server = self.create_test_server(
            validatable=True, wait_until='ACTIVE', adminPass=self.admin_pass)
        server_id = self.server['id']

        # Record addresses so that we can ssh later
        addresses = self.servers_client.list_addresses(server_id)['addresses']
        self.server['addresses'] = addresses

        # Create a volume and wait for it to become ready
        self.volume = self.volumes_client.create_volume(
            size=CONF.volume.volume_size, display_name='test')['volume']
        self.addCleanup(self._delete_volume)
        volume_id = self.volume['id']
        waiters.wait_for_volume_status(self.volumes_client, volume_id,
                                       'available')

        # Attach the volume to the server
        attach_body = self.servers_client.attach_volume(
            server_id, volumeId=volume_id, device='/dev/%s' % self.device)
        self.attachment = attach_body['volumeAttachment']
        waiters.wait_for_volume_status(self.volumes_client, volume_id,
                                       'in-use')

        self.addCleanup(self._detach, server_id, volume_id)
开发者ID:Hybrid-Cloud,项目名称:tempest,代码行数:28,代码来源:test_attach_volume.py


示例14: resource_setup

 def resource_setup(cls):
     """Create the three volumes used by the tests of this class.

     Each volume is created, awaited until 'available', re-read with
     ``show_volume`` and recorded in ``cls.volume_list`` and
     ``cls.volume_id_list``. If a failure happens after at least one volume
     was recorded, the recorded volumes are deleted and the class is
     skipped; if nothing was recorded yet the original error propagates.
     """
     # NOTE(review): this takes ``cls``, but no @classmethod decorator is
     # visible in this excerpt - confirm it exists in the full file.
     super(VolumesTestJSON, cls).resource_setup()
     # Create 3 Volumes
     cls.volume_list = []
     cls.volume_id_list = []
     for _ in range(3):
         name = data_utils.rand_name('volume')
         meta = {'Type': 'work'}
         try:
             created = cls.client.create_volume(
                 size=CONF.volume.volume_size,
                 display_name=name,
                 metadata=meta)['volume']
             waiters.wait_for_volume_status(cls.client, created['id'],
                                            'available')
             details = cls.client.show_volume(created['id'])['volume']
             cls.volume_list.append(details)
             cls.volume_id_list.append(details['id'])
         except Exception:
             if not cls.volume_list:
                 # Nothing was recorded, so there is nothing to clean up.
                 raise
             # We could not create all the volumes, though we were able
             # to create *some* of the volumes. This is typically
             # because the backing file size of the volume group is
             # too small. So, here, we clean up whatever we did manage
             # to create and raise a SkipTest
             for leftover in cls.volume_list:
                 cls.delete_volume(leftover['id'])
             msg = ("Failed to create ALL necessary volumes to run "
                    "test. This typically means that the backing file "
                    "size of the nova-volumes group is too small to "
                    "create the 3 volumes needed by this test case")
             raise cls.skipException(msg)
开发者ID:NeCTAR-RC,项目名称:tempest,代码行数:32,代码来源:test_volumes_list.py


示例15: _create_volume_from_image

    def _create_volume_from_image(cls):
        """Create a volume from the configured image for booting a server.

        The volume is created from ``CONF.compute.image_ref`` with the
        configured AWS volume type and availability zone, and awaited until
        'available'.

        :returns: dict with one key, ``block_device_mapping_v2``, holding a
            one-element list that describes the new volume as boot index 0.
        """
        clients = cls.os
        image_id = CONF.compute.image_ref
        volume_name = data_utils.rand_name('volume')
        # The v2 volumes client is the default; the v1 client replaces it
        # when the v1 API is enabled in the configuration.
        volumes_client = clients.volumes_v2_client
        if CONF.volume_feature_enabled.api_v1:
            volumes_client = clients.volumes_client

        volume = volumes_client.create_volume(
            display_name=volume_name,
            imageRef=image_id,
            volume_type=CONF.volume.aws_volume_type,
            availability_zone=CONF.volume.aws_availability_zone)
        volume_id = volume['volume']['id']
        waiters.wait_for_volume_status(volumes_client, volume_id, 'available')
        LOG.warning('volume: %s' % volume)

        kwargs = {
            'block_device_mapping_v2': [{
                'uuid': volume_id,
                'source_type': 'volume',
                'destination_type': 'volume',
                'boot_index': 0,
                'delete_on_termination': True,
                'volume_size': 1,
            }],
        }
        LOG.warning('bdm_v2: %s' % kwargs)
        return kwargs
开发者ID:Hybrid-Cloud,项目名称:hybrid-tempest,代码行数:29,代码来源:test_servers_operations_ext.py


示例16: test_volume_type_access_add

    def test_volume_type_access_add(self):
        """Grant a tenant access to a non-public volume type and use it.

        Creating a volume of the type must raise ``NotFound`` before access
        is added and must succeed afterwards.
        """
        # Creating a NON public volume type
        volume_type = self.create_volume_type(
            **{'os-volume-type-access:is_public': False})
        type_id = volume_type['id']

        # Try creating a volume from volume type in primary tenant
        self.assertRaises(lib_exc.NotFound,
                          self.volumes_client.create_volume,
                          volume_type=type_id,
                          size=CONF.volume.volume_size)

        # Adding volume type access for primary tenant
        types_client = self.admin_volume_types_client
        tenant_id = self.volumes_client.tenant_id
        types_client.add_type_access(type_id, project=tenant_id)
        self.addCleanup(types_client.remove_type_access,
                        type_id, project=tenant_id)

        # Creating a volume from primary tenant
        volume = self.volumes_client.create_volume(
            volume_type=type_id,
            size=CONF.volume.volume_size)['volume']
        self.addCleanup(self.delete_volume, self.volumes_client, volume['id'])
        waiters.wait_for_volume_status(self.volumes_client, volume['id'],
                                       'available')

        # Validating the created volume is based on the volume type
        self.assertEqual(volume_type['name'], volume['volume_type'])
开发者ID:Tesora,项目名称:tesora-tempest,代码行数:27,代码来源:test_volume_type_access.py


示例17: _create_volume

 def _create_volume(self):
     """Create a volume, schedule its deletion and wait for 'available'.

     The display name is a random name derived from the test class name
     plus ``_volume``.

     :returns: the value returned by the client's ``create_volume`` call.
     """
     client = self.volumes_extensions_client
     display_name = data_utils.rand_name(
         self.__class__.__name__ + '_volume')
     new_volume = client.create_volume(CONF.volume.volume_size,
                                       display_name=display_name)
     self.addCleanup(self.delete_volume, new_volume['id'])
     waiters.wait_for_volume_status(client, new_volume['id'], 'available')
     return new_volume
开发者ID:SivagnanamCiena,项目名称:tempest,代码行数:8,代码来源:test_server_rescue_negative.py


示例18: _create_temp_volume

    def _create_temp_volume(self):
        """Create a temporary volume for the force delete tests.

        The volume is named through the field given by ``self.name_field``.
        No cleanup is registered by this method.

        :returns: the ``'volume'`` entry of the create response, returned
            once the volume is 'available'.
        """
        vol_name = utils.rand_name('Volume')
        created = self.client.create_volume(**{self.name_field: vol_name})
        temp_volume = created['volume']
        waiters.wait_for_volume_status(self.client, temp_volume['id'],
                                       'available')
        return temp_volume
开发者ID:Hybrid-Cloud,项目名称:hybrid-tempest,代码行数:8,代码来源:test_volumes_actions.py


示例19: test_volume_extend

 def test_volume_extend(self):
     """Extend a new volume by 1 and check the size that is reported."""
     self.volume = self.create_volume()
     extend_size = int(self.volume['size']) + 1
     volume_id = self.volume['id']

     self.client.extend_volume(volume_id, new_size=extend_size)
     waiters.wait_for_volume_status(self.client, volume_id, 'available')

     # Re-read the volume; the reported size must be the requested one.
     resized = self.client.show_volume(volume_id)['volume']
     self.assertEqual(int(resized['size']), extend_size)
开发者ID:Hybrid-Cloud,项目名称:hybrid-tempest,代码行数:8,代码来源:test_volumes_extend.py


示例20: test_consistencygroup_cgsnapshot_create_delete

    def test_consistencygroup_cgsnapshot_create_delete(self):
        """Create a consistency group snapshot, show and list it, then clean up.

        A volume type, a consistency group (CG) and a volume inside the CG
        are created first. The cgsnapshot, the CG and the volume type are
        removed inline at the end of the test.
        """
        cg_client = self.consistencygroups_adm_client

        # Create volume type
        name = data_utils.rand_name("volume-type")
        volume_type = self.admin_volume_types_client.create_volume_type(
            name=name)['volume_type']

        # Create CG
        cg_name = data_utils.rand_name('CG')
        cg = cg_client.create_consistencygroup(
            volume_type['id'], name=cg_name)['consistencygroup']
        vol_name = data_utils.rand_name("volume")
        self.name_field = self.special_fields['name_field']
        params = {
            self.name_field: vol_name,
            'volume_type': volume_type['id'],
            'consistencygroup_id': cg['id'],
            'size': CONF.volume.volume_size,
        }

        # Create volume
        volume = self.admin_volume_client.create_volume(**params)['volume']
        waiters.wait_for_volume_status(self.admin_volume_client,
                                       volume['id'], 'available')
        cg_client.wait_for_consistencygroup_status(cg['id'], 'available')
        self.assertEqual(cg_name, cg['name'])

        # Create cgsnapshot
        cgsnapshot_name = data_utils.rand_name('cgsnapshot')
        cgsnapshot = cg_client.create_cgsnapshot(
            cg['id'], name=cgsnapshot_name)['cgsnapshot']
        all_snapshots = self.admin_snapshots_client.list_snapshots(
            detail=True)['snapshots']
        # Wait for every listed snapshot that belongs to the volume.
        for snap in all_snapshots:
            if volume['id'] != snap['volume_id']:
                continue
            waiters.wait_for_snapshot_status(self.admin_snapshots_client,
                                             snap['id'], 'available')
        cg_client.wait_for_cgsnapshot_status(cgsnapshot['id'], 'available')
        self.assertEqual(cgsnapshot_name, cgsnapshot['name'])

        # Get a given CG snapshot
        cgsnapshot = cg_client.show_cgsnapshot(
            cgsnapshot['id'])['cgsnapshot']
        self.assertEqual(cgsnapshot_name, cgsnapshot['name'])

        # Get all CG snapshots with detail
        cgsnapshots = cg_client.list_cgsnapshots(detail=True)['cgsnapshots']
        self.assertIn((cgsnapshot['name'], cgsnapshot['id']),
                      [(item['name'], item['id']) for item in cgsnapshots])

        # Clean up
        self._delete_cgsnapshot(cgsnapshot['id'], cg['id'])
        self._delete_consistencygroup(cg['id'])
        self.admin_volume_types_client.delete_volume_type(volume_type['id'])
开发者ID:NetApp,项目名称:cinder,代码行数:58,代码来源:test_consistencygroups.py



注:本文中的tempest.common.waiters.wait_for_volume_status函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python auth.get_credentials函数代码示例发布时间:2022-05-27
下一篇:
Python waiters.wait_for_volume_resource_status函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap