本文整理汇总了Python中six.text_type函数的典型用法代码示例。如果您正苦于以下问题：Python text_type函数的具体用法？Python text_type怎么用？Python text_type使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了text_type函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_thread_group_handles_child_exception
def test_thread_group_handles_child_exception(self):
    """A failing spawned callable must surface as ``ThreadException``.

    Spawns ``self._raise_test_exc`` with the argument ``'exc1'`` under the
    name ``'raiser1'`` and checks that both strings appear in the text of
    the ``ThreadException`` raised when the thread group is left.
    """
    try:
        with context.ThreadGroup() as tg:
            tg.spawn('raiser1', self._raise_test_exc, 'exc1')
    except ex.ThreadException as te:
        text = six.text_type(te)
        self.assertIn('exc1', text)
        self.assertIn('raiser1', text)
    else:
        # Without this branch the test passed vacuously whenever the
        # thread group did not raise at all.
        self.fail('ThreadException was not raised')
开发者ID:stannie42,项目名称:sahara,代码行数:7,代码来源:test_context.py
示例2: pool_entries
def pool_entries(controller, pool_ctrl, count):
    """Context manager to create several catalogue entries for testing.

    The entries are automatically deleted when the context manager
    goes out of scope, including when the managed block raises.

    NOTE(review): this is a generator; it is presumably wrapped with
    ``contextlib.contextmanager`` outside this block -- confirm.

    :param controller: storage handler
    :type controller: queues.storage.base:CatalogueBase
    :param pool_ctrl: handler used to create and delete the pool that
        backs each catalogue entry
    :param count: number of entries to create
    :type count: int
    :returns: [(project, queue, pool)]
    :rtype: [(six.text_type, six.text_type, six.text_type)]
    """
    spec = [(u'_', six.text_type(uuid.uuid1()), six.text_type(i))
            for i in range(count)]

    for p, q, s in spec:
        pool_ctrl.create(s, 100, s)
        controller.insert(p, q, s)

    try:
        yield spec
    finally:
        # Run the teardown even if the caller's block raised, so that a
        # failing test does not leave entries behind.
        for p, q, s in spec:
            controller.delete(p, q)
            pool_ctrl.delete(s)
开发者ID:openstack,项目名称:zaqar,代码行数:25,代码来源:helpers.py
示例3: __init__
def __init__(self, error=None, path=None, message=None,
             resource=None):
    """Build the validation failure from an error, a path and a message.

    :param error: an exception instance or any other value; a
        ``StackValidationFailed`` instance has its ``error``,
        ``error_message``, ``path`` and ``args`` carried over, any other
        exception is reduced to its class name and text
    :param path: ``None``, a single string or a list of path elements
    :param message: message to report; replaced when ``error`` is an
        exception
    :param resource: when given and ``path`` is empty, the path becomes
        the template's resources section name followed by the
        resource's name
    """
    # Normalise ``path`` so that it is always a list from here on.
    if path is None:
        path = []
    elif isinstance(path, six.string_types):
        path = [path]

    if resource is not None and not path:
        template = resource.stack.t
        section = template.get_section_name(template.RESOURCES)
        path = [section, resource.name]

    if not isinstance(error, Exception):
        str_error = error
    elif isinstance(error, StackValidationFailed):
        str_error = error.error
        message = error.error_message
        path = path + error.path
        # This is a hack to avoid the py3 (chained exception)
        # json serialization circular reference error from
        # oslo.messaging.
        self.args = error.args
    else:
        str_error = six.text_type(type(error).__name__)
        message = six.text_type(error)

    super(StackValidationFailed, self).__init__(error=str_error,
                                                path=path,
                                                message=message)
开发者ID:aaratn,项目名称:heat,代码行数:27,代码来源:exception.py
示例4: test_node_handle_exception
def test_node_handle_exception(self, mock_warning):
    """``Node._handle_exception`` stores the failure on the node record.

    Two failures are exercised: a ``ResourceStatusError`` that carries a
    resource id (the id ends up as ``physical_id``) and a
    ``ResourceCreationFailure`` (``physical_id`` stays ``None``).  In both
    cases the stored status is ``node.ERROR``, the status reason embeds
    the exception text, and ``mock_warning`` receives that same text.
    """
    ex = exception.ResourceStatusError(resource_id='FAKE_ID',
                                       status='FAKE_STATUS',
                                       reason='FAKE_REASON')
    node = nodem.Node('node1', self.profile.id, None, self.context)
    node.store(self.context)
    node._handle_exception(self.context, 'ACTION', 'STATUS', ex)
    db_node = db_api.node_get(self.context, node.id)
    self.assertEqual(node.ERROR, db_node.status)
    self.assertEqual('Profile failed in ACTIOing resource '
                     '(FAKE_ID) due to: %s' % six.text_type(ex),
                     db_node.status_reason)
    self.assertEqual('FAKE_ID', db_node.physical_id)
    mock_warning.assert_called_with(self.context, node, 'ACTION',
                                    'STATUS', six.text_type(ex))

    # Exception happens before physical node creation started.
    ex = exception.ResourceCreationFailure(rtype='stack',
                                           code=400,
                                           message='Bad request')
    node = nodem.Node('node1', self.profile.id, None, self.context)
    node.store(self.context)
    node._handle_exception(self.context, 'CREATE', 'STATUS', ex)
    db_node = db_api.node_get(self.context, node.id)
    self.assertEqual(node.ERROR, db_node.status)
    self.assertEqual('Profile failed in creating node due to: '
                     '%s' % six.text_type(ex), db_node.status_reason)
    # Identity check: the id must be unset, not merely equal to None.
    self.assertIsNone(db_node.physical_id)
    mock_warning.assert_called_with(self.context, node, 'CREATE',
                                    'STATUS', six.text_type(ex))
开发者ID:Tennyson53,项目名称:senlin,代码行数:30,代码来源:test_node.py
示例5: create
def create(self, req, body):
    """Creates a new share group snapshot.

    :param req: request whose ``environ`` holds ``'manila.context'``
    :param body: request body; must contain a ``'share_group_snapshot'``
        mapping with a uuid-like ``'share_group_id'`` and, optionally,
        ``'name'`` and ``'description'``
    :returns: the detail view of the created snapshot
    :raises exc.HTTPBadRequest: on a malformed body, a missing or
        non-uuid ``share_group_id``, or an unknown share group
    :raises exc.HTTPConflict: when the share group is invalid
    """
    context = req.environ['manila.context']

    if not self.is_valid_body(body, 'share_group_snapshot'):
        msg = _("'share_group_snapshot' is missing from the request body.")
        raise exc.HTTPBadRequest(explanation=msg)

    snapshot_request = body.get('share_group_snapshot', {})
    share_group_id = snapshot_request.get('share_group_id')

    if not share_group_id:
        msg = _("Must supply 'share_group_id' attribute.")
        raise exc.HTTPBadRequest(explanation=msg)

    if not uuidutils.is_uuid_like(share_group_id):
        msg = _("The 'share_group_id' attribute must be a uuid.")
        raise exc.HTTPBadRequest(explanation=six.text_type(msg))

    kwargs = {"share_group_id": share_group_id}
    # Optional attributes are forwarded only when the caller sent them.
    for optional_key in ('name', 'description'):
        if optional_key in snapshot_request:
            kwargs[optional_key] = snapshot_request.get(optional_key)

    try:
        new_snapshot = self.share_group_api.create_share_group_snapshot(
            context, **kwargs)
    except exception.ShareGroupNotFound as e:
        raise exc.HTTPBadRequest(explanation=six.text_type(e))
    except exception.InvalidShareGroup as e:
        raise exc.HTTPConflict(explanation=six.text_type(e))

    snapshot_dict = dict(new_snapshot.items())
    return self._view_builder.detail(req, snapshot_dict)
开发者ID:bswartz,项目名称:manila,代码行数:33,代码来源:share_group_snapshots.py
示例6: test_matrix_environments
def test_matrix_environments(tmpdir):
    """Check the environment matrix size and the installed versions.

    Two Python versions times two entries for each of ``six`` and
    ``colorama`` must yield eight environments.  For the first two of
    them, the version reported by the package inside the environment
    must start with the requested one.
    """
    conf = config.Config()
    conf.env_dir = six.text_type(tmpdir.join("env"))
    conf.pythons = ["2.7", "3.4"]
    conf.matrix = {
        "six": ["1.4", None],
        "colorama": ["0.3.1", "0.3.3"],
    }

    environments = list(environment.get_environments(conf))
    assert len(environments) == 2 * 2 * 2

    # Only test the first two environments, since this is so time
    # consuming
    for env in environments[:2]:
        env.create()

        six_cmd = ['-c',
                   'import six, sys; sys.stdout.write(six.__version__)']
        output = env.run(six_cmd, valid_return_codes=None)
        if 'six' in env._requirements:
            wanted_six = six.text_type(env._requirements['six'])
            assert output.startswith(wanted_six)

        colorama_cmd = [
            '-c',
            'import colorama, sys; sys.stdout.write(colorama.__version__)']
        output = env.run(colorama_cmd)
        wanted_colorama = six.text_type(env._requirements['colorama'])
        assert output.startswith(wanted_colorama)
开发者ID:hcontrast,项目名称:asv,代码行数:28,代码来源:test_environment.py
示例7: __exit__
def __exit__(self, ex_type, ex_value, ex_traceback):
    """Convert exceptions leaving the managed block into ``Fault``.

    :param ex_type: class of the exception, if any
    :param ex_value: the exception instance, or a falsy value when the
        block finished without raising
    :param ex_traceback: traceback of the exception, if any
    :returns: ``True`` when there was no exception; ``False`` when the
        exception is of a kind not handled here, so it keeps propagating
    :raises Fault: for ``NotAuthorized``, ``Invalid``, ``TypeError`` and
        ``webob.exc.HTTPException``; a ``Fault`` itself is re-raised
    """
    if not ex_value:
        return True

    if isinstance(ex_value, exception.NotAuthorized):
        msg = six.text_type(ex_value)
        raise Fault(webob.exc.HTTPForbidden(explanation=msg))
    elif isinstance(ex_value, exception.Invalid):
        raise Fault(exception.ConvertedException(
            code=ex_value.code, explanation=six.text_type(ex_value)))
    elif isinstance(ex_value, TypeError):
        exc_info = (ex_type, ex_value, ex_traceback)
        # Pass the value as a logging argument (as the other log calls
        # in this method do) instead of formatting the message eagerly.
        LOG.error(_('Exception handling resource: %s'),
                  ex_value, exc_info=exc_info)
        raise Fault(webob.exc.HTTPBadRequest())
    elif isinstance(ex_value, Fault):
        LOG.info(_("Fault thrown: %s"), six.text_type(ex_value))
        raise ex_value
    elif isinstance(ex_value, webob.exc.HTTPException):
        LOG.info(_("HTTP exception thrown: %s"), six.text_type(ex_value))
        raise Fault(ex_value)

    # We didn't handle the exception
    return False
开发者ID:grafuls,项目名称:manila,代码行数:25,代码来源:wsgi.py
示例8: test_simple
def test_simple(self):
    """Serialize a user with ``DetailedUserSerializer`` and check fields.

    The user gets one auth identity and one authenticator; the result
    must expose ids as text, report 2FA as enabled, and list exactly one
    email, one identity and one authenticator.
    """
    user = self.create_user()
    org = self.create_organization(owner=user)

    auth_provider = AuthProvider.objects.create(
        organization=org,
        provider='dummy',
    )
    auth_identity = AuthIdentity.objects.create(
        auth_provider=auth_provider,
        ident=user.email,
        user=user,
    )
    authenticator_type = available_authenticators(
        ignore_backup=True)[0].type
    auth = Authenticator.objects.create(
        type=authenticator_type,
        user=user,
    )

    result = serialize(user, user, DetailedUserSerializer())

    assert result['id'] == six.text_type(user.id)
    assert result['has2fa'] is True

    assert len(result['emails']) == 1
    email_entry = result['emails'][0]
    assert email_entry['email'] == user.email
    assert email_entry['is_verified']

    assert 'identities' in result
    assert len(result['identities']) == 1
    identity_entry = result['identities'][0]
    assert identity_entry['id'] == six.text_type(auth_identity.id)
    assert identity_entry['name'] == auth_identity.ident

    assert 'authenticators' in result
    assert len(result['authenticators']) == 1
    authenticator_entry = result['authenticators'][0]
    assert authenticator_entry['id'] == six.text_type(auth.id)
开发者ID:hosmelq,项目名称:sentry,代码行数:32,代码来源:test_user.py
示例9: _get_context
def _get_context(self, networks_per_tenant=2, neutron_network_provider=True):
    """Build a fake context dict with tenants, networks and users.

    :param networks_per_tenant: number of fake networks per tenant
    :param neutron_network_provider: when true each network carries a
        ``"subnets"`` list, otherwise a ``"cidr"`` string
    :returns: dict with ``"config"``, ``"admin"``, ``"task"``,
        ``"users"`` and ``"tenants"`` keys
    """
    tenants = {}
    for t_id in range(self.TENANTS_AMOUNT):
        tenant_key = six.text_type(t_id)
        networks = []
        for i in range(networks_per_tenant):
            network = {"id": "fake_net_id_%s" % i}
            if neutron_network_provider:
                network["subnets"] = ["fake_subnet_id_of_net_%s" % i]
            else:
                network["cidr"] = "101.0.5.0/24"
            networks.append(network)
        tenants[tenant_key] = {"name": tenant_key, "networks": networks}

    # One entry per (tenant, user index) pair, in tenant iteration order.
    users = [
        {"id": i, "tenant_id": t_id, "credential": "fake"}
        for t_id in tenants.keys()
        for i in range(self.USERS_PER_TENANT)
    ]

    config = {
        "users": {
            "tenants": self.TENANTS_AMOUNT,
            "users_per_tenant": self.USERS_PER_TENANT,
            "random_user_choice": False,
        },
        consts.SHARE_NETWORKS_CONTEXT_NAME: {
            "use_share_networks": True,
            "share_networks": [],
        },
        "network": {
            "networks_per_tenant": networks_per_tenant,
            "start_cidr": "101.0.5.0/24",
        },
    }
    return {
        "config": config,
        "admin": {"credential": mock.MagicMock()},
        "task": mock.MagicMock(),
        "users": users,
        "tenants": tenants,
    }
开发者ID:gluke77,项目名称:rally,代码行数:32,代码来源:test_manila_share_networks.py
示例10: output_param_to_log
def output_param_to_log(self, storage_protocol):
    """Write the driver's configuration values to the info log.

    Logs, in order: the message built by ``basic_lib.set_msg``, the
    command-line tool version, the essential inherited options, every
    non-secret option in ``volume_opts`` and, for iSCSI only,
    ``hitachi_group_request``.

    :param storage_protocol: protocol name; the extra option is logged
        only when it equals ``'iSCSI'``
    """
    essential_inherited_param = ['volume_backend_name', 'volume_driver']
    conf = self.configuration
    # Tab-indented line: the label is left-justified in 35 columns and
    # followed by the value.  The arguments are handed to the logger so
    # formatting only happens when the record is actually emitted.
    line_format = '\t%-35s%s'

    msg = basic_lib.set_msg(1, config_group=conf.config_group)
    LOG.info(msg)

    version = self.command.get_comm_version()
    if conf.hitachi_unit_name:
        prefix = 'HSNM2 version'
    else:
        prefix = 'RAID Manager version'
    LOG.info(line_format, prefix + ': ', six.text_type(version))

    for param in essential_inherited_param:
        value = conf.safe_get(param)
        LOG.info(line_format, param + ': ', six.text_type(value))

    for opt in volume_opts:
        # Secret options must never reach the log.
        if not opt.secret:
            value = getattr(conf, opt.name)
            LOG.info(line_format, opt.name + ': ', six.text_type(value))

    if storage_protocol == 'iSCSI':
        value = conf.hitachi_group_request
        LOG.info(line_format, 'hitachi_group_request: ',
                 six.text_type(value))
开发者ID:Qeas,项目名称:cinder,代码行数:25,代码来源:hbsd_common.py
示例11: copy_sync_data
def copy_sync_data(self, src_ldev, dest_ldev, size):
    """Copy data from one logical device to another through attachment.

    Both devices are attached via ``self.generated_from``, the data is
    copied with ``volume_utils.copy_volume`` and whatever was attached is
    detached again even if the copy fails.  Afterwards
    ``discard_zero_page`` is called for the destination device.

    :param src_ldev: source logical device identifier
    :param dest_ldev: destination logical device identifier
    :param size: size to copy; multiplied by 1024 before being passed to
        ``copy_volume`` (presumably GiB to MiB -- confirm)
    """
    src_vol = {'provider_location': six.text_type(src_ldev),
               'id': 'src_vol'}
    dest_vol = {'provider_location': six.text_type(dest_ldev),
                'id': 'dest_vol'}
    properties = utils.brick_get_connector_properties()
    driver = self.generated_from

    # Stay None until the corresponding attach succeeds, so the cleanup
    # below only detaches what was really attached.
    src_info = dest_info = None
    try:
        dest_info = driver._attach_volume(
            self.context, dest_vol, properties)
        src_info = driver._attach_volume(
            self.context, src_vol, properties)
        src_path = src_info['device']['path']
        dest_path = dest_info['device']['path']
        volume_utils.copy_volume(src_path, dest_path, size * 1024,
                                 self.configuration.volume_dd_blocksize)
    finally:
        if dest_info:
            driver._detach_volume(
                self.context, dest_info, dest_vol, properties)
        if src_info:
            driver._detach_volume(
                self.context, src_info, src_vol, properties)

    self.command.discard_zero_page(dest_ldev)
开发者ID:Qeas,项目名称:cinder,代码行数:25,代码来源:hbsd_common.py
示例12: test_get_network_id_by_label
def test_get_network_id_by_label(self):
    """Tests the get_net_id_by_label function.

    ``networks.find`` is primed with three outcomes in sequence: a
    network, a ``NotFound`` and a ``NoUniqueMatch``.  The plugin must
    return the network id for the first and raise
    ``NovaNetworkNotFound`` and ``PhysicalResourceNameAmbiguity`` for the
    other two.
    """
    fake_net = mock.MagicMock()
    fake_net.id = str(uuid.uuid4())
    find_mock = self.nova_client.networks.find
    find_mock.side_effect = [
        fake_net,
        nova_exceptions.NotFound(404),
        nova_exceptions.NoUniqueMatch(),
    ]

    found_id = self.nova_plugin.get_net_id_by_label('net_label')
    self.assertEqual(fake_net.id, found_id)

    not_found = self.assertRaises(
        exception.NovaNetworkNotFound,
        self.nova_plugin.get_net_id_by_label, 'idontexist')
    expected = 'The Nova network (idontexist) could not be found'
    self.assertIn(expected, six.text_type(not_found))

    ambiguous = self.assertRaises(
        exception.PhysicalResourceNameAmbiguity,
        self.nova_plugin.get_net_id_by_label, 'notUnique')
    expected = ('Multiple physical resources were found '
                'with name (notUnique)')
    self.assertIn(expected, six.text_type(ambiguous))

    expected_calls = [
        mock.call(label=label)
        for label in ('net_label', 'idontexist', 'notUnique')
    ]
    find_mock.assert_has_calls(expected_calls)
开发者ID:sizowie,项目名称:heat,代码行数:25,代码来源:test_nova_client.py
示例13: unmanage
def unmanage(self, req, id):
    """Unmanage a share.

    :param req: request whose ``environ`` holds ``'manila.context'``
    :param id: id of the share to unmanage
    :returns: a ``webob.Response`` with status 202
    :raises exc.HTTPForbidden: when the share has a share server, is in
        a transitional status, has snapshots, is invalid, or the policy
        denies the operation
    :raises exc.HTTPNotFound: when the lookup raises ``NotFound``
    """
    context = req.environ['manila.context']
    authorize(context)
    LOG.info(_LI("Unmanage share with id: %s"), id, context=context)

    try:
        share = self.share_api.get(context, id)

        if share.get('share_server_id'):
            msg = _("Operation 'unmanage' is not supported for shares "
                    "that are created on top of share servers "
                    "(created with share-networks).")
            raise exc.HTTPForbidden(explanation=msg)

        if share['status'] in constants.TRANSITIONAL_STATUSES:
            msg_args = {'state': share['status'], 's_id': share['id']}
            msg = _("Share with transitional state can not be unmanaged. "
                    "Share '%(s_id)s' is in '%(state)s' state.") % msg_args
            raise exc.HTTPForbidden(explanation=msg)

        snapshots = self.share_api.db.share_snapshot_get_all_for_share(
            context, id)
        if snapshots:
            msg_args = {'s_id': id, 'amount': len(snapshots)}
            msg = _("Share '%(s_id)s' can not be unmanaged because it has "
                    "'%(amount)s' dependent snapshot(s).") % msg_args
            raise exc.HTTPForbidden(explanation=msg)

        self.share_api.unmanage(context, share)
    except exception.NotFound as e:
        raise exc.HTTPNotFound(explanation=six.text_type(e))
    except (exception.InvalidShare, exception.PolicyNotAuthorized) as e:
        raise exc.HTTPForbidden(explanation=six.text_type(e))

    return webob.Response(status_int=202)
开发者ID:jcsp,项目名称:manila,代码行数:33,代码来源:share_unmanage.py
示例14: store_add_to_backend
def store_add_to_backend(image_id, data, size, store):
    """Call ``store.add`` and validate the metadata it returns.

    This gives glance a common place to check the output of every
    store's ``add()`` method.

    :param image_id: The image to which data is added
    :param data: The data to be stored
    :param size: The length of the data in bytes
    :param store: The store to which the data is being added
    :return: The url location of the file,
             the size amount of data,
             the checksum of the data
             the storage systems metadata dictionary for the location
    :raises BackendException: when the metadata is neither ``None`` nor a
        dict, or when ``check_location_metadata`` rejects it
    """
    location, size, checksum, metadata = store.add(image_id, data, size)
    result = (location, size, checksum, metadata)

    # No metadata at all is acceptable; there is nothing to validate.
    if metadata is None:
        return result

    if not isinstance(metadata, dict):
        msg = (_("The storage driver %(store)s returned invalid metadata "
                 "%(metadata)s. This must be a dictionary type") %
               {'store': six.text_type(store),
                'metadata': six.text_type(metadata)})
        LOG.error(msg)
        raise BackendException(msg)

    try:
        check_location_metadata(metadata)
    except BackendException as e:
        e_msg = (_("A bad metadata structure was returned from the "
                   "%(store)s storage driver: %(metadata)s.  %(error)s.") %
                 {'store': six.text_type(store),
                  'metadata': six.text_type(metadata),
                  'error': six.text_type(e)})
        LOG.error(e_msg)
        raise BackendException(e_msg)

    return result
开发者ID:joey5678,项目名称:tricircle,代码行数:34,代码来源:__init__.py
示例15: wrapper
def wrapper(*args, **kwargs):
    """Call ``method`` and translate database integrity failures.

    ``DBDuplicateEntry`` becomes ``exception.Conflict``; a ``DBError``
    whose inner exception is an ``IntegrityError`` becomes
    ``exception.UnexpectedError``; any other ``DBError`` is re-raised
    unchanged.
    """
    try:
        return method(*args, **kwargs)
    except db_exception.DBDuplicateEntry as duplicate_error:
        # LOG the exception for debug purposes, do not send the
        # exception details out with the raised Conflict exception
        # as it can contain raw SQL.
        log_params = {'conflict_type': conflict_type,
                      'details': six.text_type(duplicate_error)}
        LOG.debug(_conflict_msg, log_params)
        raise exception.Conflict(type=conflict_type,
                                 details=_('Duplicate Entry'))
    except db_exception.DBError as db_error:
        # TODO(blk-u): inspecting inner_exception breaks encapsulation;
        # oslo_db should provide exception we need.
        if not isinstance(db_error.inner_exception, IntegrityError):
            raise

        # LOG the exception for debug purposes, do not send the
        # exception details out with the raised Conflict exception
        # as it can contain raw SQL.
        log_params = {'conflict_type': conflict_type,
                      'details': six.text_type(db_error)}
        LOG.debug(_conflict_msg, log_params)
        # NOTE(morganfainberg): This is really a case where the SQL
        # failed to store the data. This is not something that the
        # user has done wrong. Example would be a ForeignKey is
        # missing; the code that is executed before reaching the
        # SQL writing to the DB should catch the issue.
        raise exception.UnexpectedError(
            _('An unexpected error occurred when trying to '
              'store %s') % conflict_type)
开发者ID:gongwayne,项目名称:Openstack,代码行数:29,代码来源:core.py
示例16: _save_and_activate_cfg
def _save_and_activate_cfg(self, checksum, activate, active_cfg_name):
    """Save the zoning configuration and optionally enable it.

    Sends a PATCH carrying ``checksum`` to the save URL.  When
    ``activate`` is true, a fresh checksum is fetched and a second PATCH
    is sent to the enable URL for ``active_cfg_name``.  Only status code
    204 counts as success.

    :param checksum: checksum sent with the save request
    :param activate: whether to enable the configuration after saving
    :param active_cfg_name: name appended to the enable URL
    :raises exception.BrocadeZoningRestException: when either request
        answers with a status other than 204
    """
    payload = json.dumps({"checksum": checksum})
    url = self._build_url(rest_constants.PATCH_CFG_SAVE)
    response = self.session.patch(url, data=payload)
    if response.status_code != 204:
        msg = (_("REST cfg save failed: %s")
               % six.text_type(response.text))
        LOG.error(msg)
        raise exception.BrocadeZoningRestException(reason=msg)
    LOG.info("REST cfg save success")

    # if activate=true, then enable the cfg changes to effective cfg
    if not activate:
        return

    checksum = self._get_checksum()
    payload = json.dumps({"checksum": checksum})
    url = self._build_url(rest_constants.PATCH_CFG_ENABLE
                          + active_cfg_name)
    response = self.session.patch(url, data=payload)
    if response.status_code != 204:
        msg = (_("REST cfg activate failed: %s")
               % six.text_type(response.text))
        LOG.error(msg)
        raise exception.BrocadeZoningRestException(reason=msg)
    LOG.info("REST cfg activate success: %s", active_cfg_name)
开发者ID:mahak,项目名称:cinder,代码行数:27,代码来源:brcd_rest_fc_zone_client.py
示例17: _new_session
def _new_session(self, username_key=None, **attributes):
    """
    Create a new session and persist it according to its username and token
    values.

    :param username_key: Key under which the session's token is stored
        in the username-to-token mapping.  Defaults to the username of
        the newly created session.
    :param attributes: Keyword parameters containing zero or more of
        ``username``, ``token``, and ``tenant_id``.  Any fields that are
        not specified will be filled out automatically.
    :return: A new session with all fields filled out and an expiration
        time 1 day in the future (according to the clock associated
        with this :obj:`MimicCore`).
    :rtype: :obj:`Session`
    """
    for key in ['username', 'token', 'tenant_id']:
        if attributes.get(key, None) is None:
            if key == 'tenant_id':
                # integer tenant IDs - uuid4 ints are too long, so keep
                # only the low 15 decimal digits.  The modulus is an
                # exact integer: ``% 1e15`` converted the 128-bit value
                # to a float first, which threw away its low-order bits
                # and shrank the set of IDs that could be generated.
                attributes[key] = text_type(uuid4().int % 10 ** 15)
            else:
                attributes[key] = key + "_" + text_type(uuid4())

    if 'expires' not in attributes:
        attributes['expires'] = (
            datetime.utcfromtimestamp(self.clock.seconds())
            + timedelta(days=1)
        )

    session = Session(**attributes)
    if username_key is None:
        username_key = session.username

    # Index the session under every key it can later be looked up by.
    self._username_to_token[username_key] = session.token
    self._token_to_session[session.token] = session
    self._userid_to_session[session.user_id] = session
    self._tenant_to_session[session.tenant_id] = session
    return session
开发者ID:derwolfe,项目名称:mimic,代码行数:35,代码来源:session.py
示例18: _test_create_image_from_server_wait_until_active_not_found
def _test_create_image_from_server_wait_until_active_not_found(
        self, wait_for_image_status, compute_images_client,
        servers_client, fault=None):
    """Shared body for the snapshot-not-found tests.

    Arranges for ``create_image`` to return a fake image and for
    ``show_server`` to return a fake server (carrying ``fault`` when
    given), expects ``create_image_from_server`` to raise
    ``SnapshotNotFoundException``, and then checks the exception text
    and the calls made to the mocked collaborators.

    :param wait_for_image_status: mock asserted to be called once with
        the images client, the image id and the expected status
    :param compute_images_client: mock images client
    :param servers_client: mock servers client
    :param fault: optional fault text expected in the exception message
    """
    # setup mocks
    image_id = uuidutils.generate_uuid()
    compute_images_client.create_image.return_value = mock.Mock(
        response={'location': image_id})
    fake_server = {'id': mock.sentinel.server_id}
    if fault:
        fake_server['fault'] = fault
    servers_client.show_server.return_value = {'server': fake_server}

    # call the utility method
    ex = self.assertRaises(
        exceptions.SnapshotNotFoundException,
        compute_base.BaseV2ComputeTest.create_image_from_server,
        mock.sentinel.server_id, wait_until='active')

    # make our assertions
    # NOTE(review): with a falsy ``fault`` this asserts that the falsy
    # value itself is not in the message -- confirm that is the intent.
    check_fault = self.assertIn if fault else self.assertNotIn
    check_fault(fault, six.text_type(ex))

    base_cls = compute_base.BaseV2ComputeTest
    if base_cls.is_requested_microversion_compatible('2.35'):
        status = 'ACTIVE'
    else:
        status = 'active'
    wait_for_image_status.assert_called_once_with(
        compute_images_client, image_id, status)
    servers_client.show_server.assert_called_once_with(
        mock.sentinel.server_id)
开发者ID:openstack,项目名称:tempest,代码行数:30,代码来源:test_base.py
示例19: encode
def encode(self):
    """Encode the packet for transmission.

    If the packet contains binary elements, this function returns a list
    of packets where the first is the original packet with placeholders for
    the binary components and the remaining ones the binary attachments.
    Otherwise the encoded packet text alone is returned.
    """
    encoded_packet = six.text_type(self.packet_type)

    is_binary = (self.packet_type == BINARY_EVENT or
                 self.packet_type == BINARY_ACK)
    if is_binary:
        data, attachments = self._deconstruct_binary(self.data)
        encoded_packet += six.text_type(len(attachments)) + '-'
    else:
        data, attachments = self.data, None

    # The default namespace '/' is never written out.
    has_namespace = self.namespace is not None and self.namespace != '/'
    if has_namespace:
        encoded_packet += self.namespace

    # A single comma separates the namespace from whatever comes next,
    # be it the id or the data.
    comma_pending = has_namespace
    if self.id is not None:
        if comma_pending:
            encoded_packet += ','
            comma_pending = False
        encoded_packet += six.text_type(self.id)

    if data is not None:
        if comma_pending:
            encoded_packet += ','
        encoded_packet += self.json.dumps(data, separators=(',', ':'))

    if attachments is not None:
        return [encoded_packet] + attachments
    return encoded_packet
开发者ID:fejesd,项目名称:python-socketio,代码行数:30,代码来源:packet.py
示例20: _volume_upload_image
def _volume_upload_image(self, req, id, body):
    """Uploads the specified volume to image service.

    :param req: request providing ``'cinder.context'`` in its environ
        and the requested API version
    :param id: id of the volume to upload
    :param body: request body; the ``'os-volume_upload_image'`` entry
        must contain ``'image_name'`` and may contain ``'force'``,
        ``'disk_format'``, ``'container_format'``, ``'visibility'`` and
        ``'protected'``
    :returns: dict with the response under ``'os-volume_upload_image'``
    :raises webob.exc.HTTPBadRequest: when copying the volume to an
        image fails
    """
    context = req.environ['cinder.context']
    params = body['os-volume_upload_image']
    req_version = req.api_version_request

    force = strutils.bool_from_string(params.get('force', 'False'),
                                      strict=True)

    # Not found exception will be handled at the wsgi level
    volume = self.volume_api.get(context, id)

    context.authorize(policy.UPLOAD_IMAGE_POLICY)
    # check for valid disk-format
    disk_format = params.get("disk_format", "raw")
    container_format = params.get("container_format", "bare")

    image_metadata = {
        "container_format": container_format,
        "disk_format": disk_format,
        "name": params["image_name"],
    }

    if volume.encryption_key_id:
        # Clone volume encryption key: the current key cannot
        # be reused because it will be deleted when the volume is
        # deleted.
        # TODO(eharney): Currently, there is no mechanism to remove
        # these keys, because Glance will not delete the key from
        # Barbican when the image is deleted.
        current_key = self._key_manager.get(context,
                                            volume.encryption_key_id)
        encryption_key_id = self._key_manager.store(context, current_key)
        image_metadata['cinder_encryption_key_id'] = encryption_key_id

    # Visibility and protection are only honoured from the API
    # microversion that introduced them.
    if req_version >= mv.get_api_version(mv.UPLOAD_IMAGE_PARAMS):
        image_metadata['visibility'] = params.get('visibility', 'private')
        image_metadata['protected'] = strutils.bool_from_string(
            params.get('protected', 'False'), strict=True)

        if image_metadata['visibility'] == 'public':
            context.authorize(policy.UPLOAD_PUBLIC_POLICY)

    try:
        response = self.volume_api.copy_volume_to_image(
            context, volume, image_metadata, force)
    except exception.InvalidVolume as error:
        raise webob.exc.HTTPBadRequest(explanation=error.msg)
    except ValueError as error:
        raise webob.exc.HTTPBadRequest(explanation=six.text_type(error))
    except messaging.RemoteError as error:
        msg = "%(err_type)s: %(err_msg)s" % {'err_type': error.exc_type,
                                             'err_msg': error.value}
        raise webob.exc.HTTPBadRequest(explanation=msg)
    except Exception as error:
        raise webob.exc.HTTPBadRequest(explanation=six.text_type(error))

    return {'os-volume_upload_image': response}
开发者ID:mahak,项目名称:cinder,代码行数:60,代码来源:volume_actions.py
注：本文中的six.text_type函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论