• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python six.text_type函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中six.text_type函数的典型用法代码示例。如果您正苦于以下问题:Python text_type函数的具体用法?Python text_type怎么用?Python text_type使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了text_type函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_thread_group_handles_child_exception

 def test_thread_group_handles_child_exception(self):
     try:
         with context.ThreadGroup() as tg:
             tg.spawn('raiser1', self._raise_test_exc, 'exc1')
     except ex.ThreadException as te:
         self.assertIn('exc1', six.text_type(te))
         self.assertIn('raiser1', six.text_type(te))
开发者ID:stannie42,项目名称:sahara,代码行数:7,代码来源:test_context.py


示例2: pool_entries

def pool_entries(controller, pool_ctrl, count):
    """Context manager to create several catalogue entries for testing.

    The entries are automatically deleted when the context manager
    goes out of scope.

    :param controller: storage handler
    :type controller: queues.storage.base:CatalogueBase
    :param count: number of entries to create
    :type count: int
    :returns: [(project, queue, pool)]
    :rtype: [(six.text_type, six.text_type, six.text_type)]
    """
    spec = [(u'_', six.text_type(uuid.uuid1()), six.text_type(i))
            for i in range(count)]

    for p, q, s in spec:
        pool_ctrl.create(s, 100, s)
        controller.insert(p, q, s)

    yield spec

    for p, q, s in spec:
        controller.delete(p, q)
        pool_ctrl.delete(s)
开发者ID:openstack,项目名称:zaqar,代码行数:25,代码来源:helpers.py


示例3: __init__

    def __init__(self, error=None, path=None, message=None,
                 resource=None):
        if path is None:
            path = []
        elif isinstance(path, six.string_types):
            path = [path]

        if resource is not None and not path:
            path = [resource.stack.t.get_section_name(
                resource.stack.t.RESOURCES), resource.name]
        if isinstance(error, Exception):
            if isinstance(error, StackValidationFailed):
                str_error = error.error
                message = error.error_message
                path = path + error.path
                # This is a hack to avoid the py3 (chained exception)
                # json serialization circular reference error from
                # oslo.messaging.
                self.args = error.args
            else:
                str_error = six.text_type(type(error).__name__)
                message = six.text_type(error)
        else:
            str_error = error

        super(StackValidationFailed, self).__init__(error=str_error, path=path,
                                                    message=message)
开发者ID:aaratn,项目名称:heat,代码行数:27,代码来源:exception.py


示例4: test_node_handle_exception

    def test_node_handle_exception(self, mock_warning):
        ex = exception.ResourceStatusError(resource_id='FAKE_ID',
                                           status='FAKE_STATUS',
                                           reason='FAKE_REASON')
        node = nodem.Node('node1', self.profile.id, None, self.context)
        node.store(self.context)
        node._handle_exception(self.context, 'ACTION', 'STATUS', ex)
        db_node = db_api.node_get(self.context, node.id)
        self.assertEqual(node.ERROR, db_node.status)
        self.assertEqual('Profile failed in ACTIOing resource '
                         '(FAKE_ID) due to: %s' % six.text_type(ex),
                         db_node.status_reason)
        self.assertEqual('FAKE_ID', db_node.physical_id)
        mock_warning.assert_called_with(self.context, node, 'ACTION',
                                        'STATUS', six.text_type(ex))

        # Exception happens before physical node creation started.
        ex = exception.ResourceCreationFailure(rtype='stack',
                                               code=400,
                                               message='Bad request')
        node = nodem.Node('node1', self.profile.id, None, self.context)
        node.store(self.context)
        node._handle_exception(self.context, 'CREATE', 'STATUS', ex)
        db_node = db_api.node_get(self.context, node.id)
        self.assertEqual(node.ERROR, db_node.status)
        self.assertEqual('Profile failed in creating node due to: '
                         '%s' % six.text_type(ex), db_node.status_reason)
        self.assertEqual(None, db_node.physical_id)
        mock_warning.assert_called_with(self.context, node, 'CREATE',
                                        'STATUS', six.text_type(ex))
开发者ID:Tennyson53,项目名称:senlin,代码行数:30,代码来源:test_node.py


示例5: create

    def create(self, req, body):
        """Creates a new share group snapshot."""
        context = req.environ['manila.context']

        if not self.is_valid_body(body, 'share_group_snapshot'):
            msg = _("'share_group_snapshot' is missing from the request body.")
            raise exc.HTTPBadRequest(explanation=msg)

        share_group_snapshot = body.get('share_group_snapshot', {})

        share_group_id = share_group_snapshot.get('share_group_id')
        if not share_group_id:
            msg = _("Must supply 'share_group_id' attribute.")
            raise exc.HTTPBadRequest(explanation=msg)
        if not uuidutils.is_uuid_like(share_group_id):
            msg = _("The 'share_group_id' attribute must be a uuid.")
            raise exc.HTTPBadRequest(explanation=six.text_type(msg))

        kwargs = {"share_group_id": share_group_id}
        if 'name' in share_group_snapshot:
            kwargs['name'] = share_group_snapshot.get('name')
        if 'description' in share_group_snapshot:
            kwargs['description'] = share_group_snapshot.get('description')

        try:
            new_snapshot = self.share_group_api.create_share_group_snapshot(
                context, **kwargs)
        except exception.ShareGroupNotFound as e:
            raise exc.HTTPBadRequest(explanation=six.text_type(e))
        except exception.InvalidShareGroup as e:
            raise exc.HTTPConflict(explanation=six.text_type(e))

        return self._view_builder.detail(req, dict(new_snapshot.items()))
开发者ID:bswartz,项目名称:manila,代码行数:33,代码来源:share_group_snapshots.py


示例6: test_matrix_environments

def test_matrix_environments(tmpdir):
    conf = config.Config()

    conf.env_dir = six.text_type(tmpdir.join("env"))

    conf.pythons = ["2.7", "3.4"]
    conf.matrix = {
        "six": ["1.4", None],
        "colorama": ["0.3.1", "0.3.3"]
    }
    environments = list(environment.get_environments(conf))

    assert len(environments) == 2 * 2 * 2

    # Only test the first two environments, since this is so time
    # consuming
    for env in environments[:2]:
        env.create()

        output = env.run(
            ['-c', 'import six, sys; sys.stdout.write(six.__version__)'],
            valid_return_codes=None)
        if 'six' in env._requirements:
            assert output.startswith(six.text_type(env._requirements['six']))

        output = env.run(
            ['-c', 'import colorama, sys; sys.stdout.write(colorama.__version__)'])
        assert output.startswith(six.text_type(env._requirements['colorama']))
开发者ID:hcontrast,项目名称:asv,代码行数:28,代码来源:test_environment.py


示例7: __exit__

    def __exit__(self, ex_type, ex_value, ex_traceback):
        if not ex_value:
            return True

        if isinstance(ex_value, exception.NotAuthorized):
            msg = six.text_type(ex_value)
            raise Fault(webob.exc.HTTPForbidden(explanation=msg))
        elif isinstance(ex_value, exception.Invalid):
            raise Fault(exception.ConvertedException(
                code=ex_value.code, explanation=six.text_type(ex_value)))
        elif isinstance(ex_value, TypeError):
            exc_info = (ex_type, ex_value, ex_traceback)
            LOG.error(_(
                'Exception handling resource: %s') %
                ex_value, exc_info=exc_info)
            raise Fault(webob.exc.HTTPBadRequest())
        elif isinstance(ex_value, Fault):
            LOG.info(_("Fault thrown: %s"), six.text_type(ex_value))
            raise ex_value
        elif isinstance(ex_value, webob.exc.HTTPException):
            LOG.info(_("HTTP exception thrown: %s"), six.text_type(ex_value))
            raise Fault(ex_value)

        # We didn't handle the exception
        return False
开发者ID:grafuls,项目名称:manila,代码行数:25,代码来源:wsgi.py


示例8: test_simple

    def test_simple(self):
        user = self.create_user()

        org = self.create_organization(owner=user)

        auth_provider = AuthProvider.objects.create(
            organization=org,
            provider='dummy',
        )
        auth_identity = AuthIdentity.objects.create(
            auth_provider=auth_provider,
            ident=user.email,
            user=user,
        )
        auth = Authenticator.objects.create(
            type=available_authenticators(ignore_backup=True)[0].type,
            user=user,
        )

        result = serialize(user, user, DetailedUserSerializer())
        assert result['id'] == six.text_type(user.id)
        assert result['has2fa'] is True
        assert len(result['emails']) == 1
        assert result['emails'][0]['email'] == user.email
        assert result['emails'][0]['is_verified']
        assert 'identities' in result
        assert len(result['identities']) == 1
        assert result['identities'][0]['id'] == six.text_type(auth_identity.id)
        assert result['identities'][0]['name'] == auth_identity.ident
        assert 'authenticators' in result
        assert len(result['authenticators']) == 1
        assert result['authenticators'][0]['id'] == six.text_type(auth.id)
开发者ID:hosmelq,项目名称:sentry,代码行数:32,代码来源:test_user.py


示例9: _get_context

 def _get_context(self, networks_per_tenant=2, neutron_network_provider=True):
     tenants = {}
     for t_id in range(self.TENANTS_AMOUNT):
         tenants[six.text_type(t_id)] = {"name": six.text_type(t_id)}
         tenants[six.text_type(t_id)]["networks"] = []
         for i in range(networks_per_tenant):
             network = {"id": "fake_net_id_%s" % i}
             if neutron_network_provider:
                 network["subnets"] = ["fake_subnet_id_of_net_%s" % i]
             else:
                 network["cidr"] = "101.0.5.0/24"
             tenants[six.text_type(t_id)]["networks"].append(network)
     users = []
     for t_id in tenants.keys():
         for i in range(self.USERS_PER_TENANT):
             users.append({"id": i, "tenant_id": t_id, "credential": "fake"})
     context = {
         "config": {
             "users": {
                 "tenants": self.TENANTS_AMOUNT,
                 "users_per_tenant": self.USERS_PER_TENANT,
                 "random_user_choice": False,
             },
             consts.SHARE_NETWORKS_CONTEXT_NAME: {"use_share_networks": True, "share_networks": []},
             "network": {"networks_per_tenant": networks_per_tenant, "start_cidr": "101.0.5.0/24"},
         },
         "admin": {"credential": mock.MagicMock()},
         "task": mock.MagicMock(),
         "users": users,
         "tenants": tenants,
     }
     return context
开发者ID:gluke77,项目名称:rally,代码行数:32,代码来源:test_manila_share_networks.py


示例10: output_param_to_log

    def output_param_to_log(self, storage_protocol):
        essential_inherited_param = ['volume_backend_name', 'volume_driver']
        conf = self.configuration

        msg = basic_lib.set_msg(1, config_group=conf.config_group)
        LOG.info(msg)
        version = self.command.get_comm_version()
        if conf.hitachi_unit_name:
            prefix = 'HSNM2 version'
        else:
            prefix = 'RAID Manager version'
        LOG.info('\t%-35s%s' % (prefix + ': ', six.text_type(version)))
        for param in essential_inherited_param:
            value = conf.safe_get(param)
            LOG.info('\t%-35s%s' % (param + ': ', six.text_type(value)))
        for opt in volume_opts:
            if not opt.secret:
                value = getattr(conf, opt.name)
                LOG.info('\t%-35s%s' % (opt.name + ': ',
                         six.text_type(value)))

        if storage_protocol == 'iSCSI':
            value = getattr(conf, 'hitachi_group_request')
            LOG.info('\t%-35s%s' % ('hitachi_group_request: ',
                     six.text_type(value)))
开发者ID:Qeas,项目名称:cinder,代码行数:25,代码来源:hbsd_common.py


示例11: copy_sync_data

 def copy_sync_data(self, src_ldev, dest_ldev, size):
     src_vol = {'provider_location': six.text_type(src_ldev),
                'id': 'src_vol'}
     dest_vol = {'provider_location': six.text_type(dest_ldev),
                 'id': 'dest_vol'}
     properties = utils.brick_get_connector_properties()
     driver = self.generated_from
     src_info = None
     dest_info = None
     try:
         dest_info = driver._attach_volume(self.context, dest_vol,
                                           properties)
         src_info = driver._attach_volume(self.context, src_vol,
                                          properties)
         volume_utils.copy_volume(src_info['device']['path'],
                                  dest_info['device']['path'], size * 1024,
                                  self.configuration.volume_dd_blocksize)
     finally:
         if dest_info:
             driver._detach_volume(self.context, dest_info,
                                   dest_vol, properties)
         if src_info:
             driver._detach_volume(self.context, src_info,
                                   src_vol, properties)
     self.command.discard_zero_page(dest_ldev)
开发者ID:Qeas,项目名称:cinder,代码行数:25,代码来源:hbsd_common.py


示例12: test_get_network_id_by_label

    def test_get_network_id_by_label(self):
        """Tests the get_net_id_by_label function."""
        net = mock.MagicMock()
        net.id = str(uuid.uuid4())
        self.nova_client.networks.find.side_effect = [
            net, nova_exceptions.NotFound(404),
            nova_exceptions.NoUniqueMatch()]
        self.assertEqual(net.id,
                         self.nova_plugin.get_net_id_by_label('net_label'))

        exc = self.assertRaises(
            exception.NovaNetworkNotFound,
            self.nova_plugin.get_net_id_by_label, 'idontexist')
        expected = 'The Nova network (idontexist) could not be found'
        self.assertIn(expected, six.text_type(exc))
        exc = self.assertRaises(
            exception.PhysicalResourceNameAmbiguity,
            self.nova_plugin.get_net_id_by_label, 'notUnique')
        expected = ('Multiple physical resources were found '
                    'with name (notUnique)')
        self.assertIn(expected, six.text_type(exc))
        calls = [mock.call(label='net_label'),
                 mock.call(label='idontexist'),
                 mock.call(label='notUnique')]
        self.nova_client.networks.find.assert_has_calls(calls)
开发者ID:sizowie,项目名称:heat,代码行数:25,代码来源:test_nova_client.py


示例13: unmanage

    def unmanage(self, req, id):
        """Unmanage a share."""
        context = req.environ['manila.context']
        authorize(context)

        LOG.info(_LI("Unmanage share with id: %s"), id, context=context)

        try:
            share = self.share_api.get(context, id)
            if share.get('share_server_id'):
                msg = _("Operation 'unmanage' is not supported for shares "
                        "that are created on top of share servers "
                        "(created with share-networks).")
                raise exc.HTTPForbidden(explanation=msg)
            elif share['status'] in constants.TRANSITIONAL_STATUSES:
                msg = _("Share with transitional state can not be unmanaged. "
                        "Share '%(s_id)s' is in '%(state)s' state.") % dict(
                            state=share['status'], s_id=share['id'])
                raise exc.HTTPForbidden(explanation=msg)
            snapshots = self.share_api.db.share_snapshot_get_all_for_share(
                context, id)
            if snapshots:
                msg = _("Share '%(s_id)s' can not be unmanaged because it has "
                        "'%(amount)s' dependent snapshot(s).") % {
                            's_id': id, 'amount': len(snapshots)}
                raise exc.HTTPForbidden(explanation=msg)
            self.share_api.unmanage(context, share)
        except exception.NotFound as e:
            raise exc.HTTPNotFound(explanation=six.text_type(e))
        except (exception.InvalidShare, exception.PolicyNotAuthorized) as e:
            raise exc.HTTPForbidden(explanation=six.text_type(e))

        return webob.Response(status_int=202)
开发者ID:jcsp,项目名称:manila,代码行数:33,代码来源:share_unmanage.py


示例14: store_add_to_backend

def store_add_to_backend(image_id, data, size, store):
    """
    A wrapper around a call to each stores add() method.  This gives glance
    a common place to check the output

    :param image_id:  The image add to which data is added
    :param data: The data to be stored
    :param size: The length of the data in bytes
    :param store: The store to which the data is being added
    :return: The url location of the file,
             the size amount of data,
             the checksum of the data
             the storage systems metadata dictionary for the location
    """
    (location, size, checksum, metadata) = store.add(image_id, data, size)
    if metadata is not None:
        if not isinstance(metadata, dict):
            msg = (_("The storage driver %(store)s returned invalid metadata "
                     "%(metadata)s. This must be a dictionary type") %
                   {'store': six.text_type(store),
                    'metadata': six.text_type(metadata)})
            LOG.error(msg)
            raise BackendException(msg)
        try:
            check_location_metadata(metadata)
        except BackendException as e:
            e_msg = (_("A bad metadata structure was returned from the "
                       "%(store)s storage driver: %(metadata)s.  %(error)s.") %
                     {'store': six.text_type(store),
                      'metadata': six.text_type(metadata),
                      'error': six.text_type(e)})
            LOG.error(e_msg)
            raise BackendException(e_msg)
    return (location, size, checksum, metadata)
开发者ID:joey5678,项目名称:tricircle,代码行数:34,代码来源:__init__.py


示例15: wrapper

 def wrapper(*args, **kwargs):
     try:
         return method(*args, **kwargs)
     except db_exception.DBDuplicateEntry as e:
         # LOG the exception for debug purposes, do not send the
         # exception details out with the raised Conflict exception
         # as it can contain raw SQL.
         LOG.debug(_conflict_msg, {'conflict_type': conflict_type,
                                   'details': six.text_type(e)})
         raise exception.Conflict(type=conflict_type,
                                  details=_('Duplicate Entry'))
     except db_exception.DBError as e:
         # TODO(blk-u): inspecting inner_exception breaks encapsulation;
         # oslo_db should provide exception we need.
         if isinstance(e.inner_exception, IntegrityError):
             # LOG the exception for debug purposes, do not send the
             # exception details out with the raised Conflict exception
             # as it can contain raw SQL.
             LOG.debug(_conflict_msg, {'conflict_type': conflict_type,
                                       'details': six.text_type(e)})
             # NOTE(morganfainberg): This is really a case where the SQL
             # failed to store the data. This is not something that the
             # user has done wrong. Example would be a ForeignKey is
             # missing; the code that is executed before reaching the
             # SQL writing to the DB should catch the issue.
             raise exception.UnexpectedError(
                 _('An unexpected error occurred when trying to '
                   'store %s') % conflict_type)
         raise
开发者ID:gongwayne,项目名称:Openstack,代码行数:29,代码来源:core.py


示例16: _save_and_activate_cfg

 def _save_and_activate_cfg(self, checksum, activate, active_cfg_name):
     body = {"checksum": checksum}
     json_str = json.dumps(body)
     url = self._build_url(rest_constants.PATCH_CFG_SAVE)
     response = self.session.patch(url, data=json_str)
     if response.status_code == 204:
         LOG.info("REST cfg save success")
     else:
         msg = (_("REST cfg save failed: %s")
                % six.text_type(response.text))
         LOG.error(msg)
         raise exception.BrocadeZoningRestException(reason=msg)
     # if activate=true, then enable the cfg changes to effective cfg
     if activate:
         checksum = self._get_checksum()
         body = {"checksum": checksum}
         json_str = json.dumps(body)
         url = self._build_url(rest_constants.PATCH_CFG_ENABLE
                               + active_cfg_name)
         response = self.session.patch(url, data=json_str)
         if response.status_code == 204:
             LOG.info("REST cfg activate success: %s", active_cfg_name)
         else:
             msg = (_("REST cfg activate failed: %s")
                    % six.text_type(response.text))
             LOG.error(msg)
             raise exception.BrocadeZoningRestException(reason=msg)
开发者ID:mahak,项目名称:cinder,代码行数:27,代码来源:brcd_rest_fc_zone_client.py


示例17: _new_session

    def _new_session(self, username_key=None, **attributes):
        """
        Create a new session and persist it according to its username and token
        values.

        :param attributes: Keyword parameters containing zero or more of
            ``username``, ``token``, and ``tenant_id``.  Any fields that are
            not specified will be filled out automatically.

        :return: A new session with all fields filled out and an expiration
                 time 1 day in the future (according to the clock associated
                 with this :obj:`MimicCore`).
        :rtype: :obj:`Session`
        """
        for key in ['username', 'token', 'tenant_id']:
            if attributes.get(key, None) is None:
                attributes[key] = key + "_" + text_type(uuid4())
                if key == 'tenant_id':
                    # integer tenant IDs - uuid4 ints are too long
                    attributes[key] = text_type(int(uuid4().int % 1e15))

        if 'expires' not in attributes:
            attributes['expires'] = (
                datetime.utcfromtimestamp(self.clock.seconds())
                + timedelta(days=1)
            )

        session = Session(**attributes)
        if username_key is None:
            username_key = session.username
        self._username_to_token[username_key] = session.token
        self._token_to_session[session.token] = session
        self._userid_to_session[session.user_id] = session
        self._tenant_to_session[session.tenant_id] = session
        return session
开发者ID:derwolfe,项目名称:mimic,代码行数:35,代码来源:session.py


示例18: _test_create_image_from_server_wait_until_active_not_found

 def _test_create_image_from_server_wait_until_active_not_found(
         self, wait_for_image_status, compute_images_client,
         servers_client, fault=None):
     # setup mocks
     image_id = uuidutils.generate_uuid()
     fake_image = mock.Mock(response={'location': image_id})
     compute_images_client.create_image.return_value = fake_image
     fake_server = {'id': mock.sentinel.server_id}
     if fault:
         fake_server['fault'] = fault
     servers_client.show_server.return_value = {'server': fake_server}
     # call the utility method
     ex = self.assertRaises(
         exceptions.SnapshotNotFoundException,
         compute_base.BaseV2ComputeTest.create_image_from_server,
         mock.sentinel.server_id, wait_until='active')
     # make our assertions
     if fault:
         self.assertIn(fault, six.text_type(ex))
     else:
         self.assertNotIn(fault, six.text_type(ex))
     if compute_base.BaseV2ComputeTest.is_requested_microversion_compatible(
         '2.35'):
         status = 'ACTIVE'
     else:
         status = 'active'
     wait_for_image_status.assert_called_once_with(
         compute_images_client, image_id, status)
     servers_client.show_server.assert_called_once_with(
         mock.sentinel.server_id)
开发者ID:openstack,项目名称:tempest,代码行数:30,代码来源:test_base.py


示例19: encode

    def encode(self):
        """Encode the packet for transmission.

        If the packet contains binary elements, this function returns a list
        of packets where the first is the original packet with placeholders for
        the binary components and the remaining ones the binary attachments.
        """
        encoded_packet = six.text_type(self.packet_type)
        if self.packet_type == BINARY_EVENT or self.packet_type == BINARY_ACK:
            data, attachments = self._deconstruct_binary(self.data)
            encoded_packet += six.text_type(len(attachments)) + '-'
        else:
            data = self.data
            attachments = None
        needs_comma = False
        if self.namespace is not None and self.namespace != '/':
            encoded_packet += self.namespace
            needs_comma = True
        if self.id is not None:
            if needs_comma:
                encoded_packet += ','
                needs_comma = False
            encoded_packet += six.text_type(self.id)
        if data is not None:
            if needs_comma:
                encoded_packet += ','
            encoded_packet += self.json.dumps(data, separators=(',', ':'))
        if attachments is not None:
            encoded_packet = [encoded_packet] + attachments
        return encoded_packet
开发者ID:fejesd,项目名称:python-socketio,代码行数:30,代码来源:packet.py


示例20: _volume_upload_image

    def _volume_upload_image(self, req, id, body):
        """Uploads the specified volume to image service."""
        context = req.environ['cinder.context']
        params = body['os-volume_upload_image']
        req_version = req.api_version_request

        force = params.get('force', 'False')
        force = strutils.bool_from_string(force, strict=True)

        # Not found exception will be handled at the wsgi level
        volume = self.volume_api.get(context, id)

        context.authorize(policy.UPLOAD_IMAGE_POLICY)
        # check for valid disk-format
        disk_format = params.get("disk_format", "raw")

        image_metadata = {"container_format": params.get(
            "container_format", "bare"),
            "disk_format": disk_format,
            "name": params["image_name"]}

        if volume.encryption_key_id:
            # Clone volume encryption key: the current key cannot
            # be reused because it will be deleted when the volume is
            # deleted.
            # TODO(eharney): Currently, there is no mechanism to remove
            # these keys, because Glance will not delete the key from
            # Barbican when the image is deleted.
            encryption_key_id = self._key_manager.store(
                context,
                self._key_manager.get(context, volume.encryption_key_id))

            image_metadata['cinder_encryption_key_id'] = encryption_key_id

        if req_version >= mv.get_api_version(
                mv.UPLOAD_IMAGE_PARAMS):

            image_metadata['visibility'] = params.get('visibility', 'private')
            image_metadata['protected'] = strutils.bool_from_string(
                params.get('protected', 'False'), strict=True)

            if image_metadata['visibility'] == 'public':
                context.authorize(policy.UPLOAD_PUBLIC_POLICY)

        try:
            response = self.volume_api.copy_volume_to_image(context,
                                                            volume,
                                                            image_metadata,
                                                            force)
        except exception.InvalidVolume as error:
            raise webob.exc.HTTPBadRequest(explanation=error.msg)
        except ValueError as error:
            raise webob.exc.HTTPBadRequest(explanation=six.text_type(error))
        except messaging.RemoteError as error:
            msg = "%(err_type)s: %(err_msg)s" % {'err_type': error.exc_type,
                                                 'err_msg': error.value}
            raise webob.exc.HTTPBadRequest(explanation=msg)
        except Exception as error:
            raise webob.exc.HTTPBadRequest(explanation=six.text_type(error))
        return {'os-volume_upload_image': response}
开发者ID:mahak,项目名称:cinder,代码行数:60,代码来源:volume_actions.py



注:本文中的six.text_type函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python six.u函数代码示例发布时间:2022-05-27
下一篇:
Python six.str函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap