本文整理汇总了Python中nova.cells.rpc_driver.parse_transport_url函数的典型用法代码示例。如果您正苦于以下问题:Python parse_transport_url函数的具体用法?Python parse_transport_url怎么用?Python parse_transport_url使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了parse_transport_url函数的9个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _fixup_cell_info
def _fixup_cell_info(cell_info, keys):
    """Derive RPC connection fields from a cell's transport URL, in place.

    If ``cell_info`` contains a ``'transport_url'`` entry, that entry is
    removed and parsed with ``rpc_driver.parse_transport_url``.  Every
    name in ``keys`` that is not already present in ``cell_info`` is then
    filled in from the parsed result: ``'rpc_host'`` is read from the
    parsed ``'hostname'``, ``'rpc_port'`` from ``'port'``, and any other
    key from the parsed field of the same name.

    :param cell_info: dict describing a cell; modified in place.
    :param keys: iterable of field names to populate.
    :returns: ``None``, except when the URL cannot be parsed, in which
        case ``cell_info`` itself is returned.  The asymmetry is kept
        as-is so that existing callers are unaffected.
    """
    if 'transport_url' not in cell_info:
        return

    # Disassemble the transport URL
    transport_url = cell_info.pop('transport_url')
    try:
        transport = rpc_driver.parse_transport_url(transport_url)
    except ValueError:
        # Unparseable URL: default every missing field to None.
        for key in keys:
            cell_info.setdefault(key, None)
        return cell_info

    transport_field_map = {'rpc_host': 'hostname', 'rpc_port': 'port'}
    for key in keys:
        if key in cell_info:
            # Leave fields that are already present untouched.
            continue
        transport_field = transport_field_map.get(key, key)
        cell_info[key] = transport[transport_field]
开发者ID:Charu-Sharma,项目名称:nova,代码行数:26,代码来源:cells.py
示例2: test_normal_ipv6_parsing_no_port
def test_normal_ipv6_parsing_no_port(self):
    """A bracketed IPv6 host with no port parses with ``port`` set to None."""
    # The percent-escapes (%65 = 'e', %61 = 'a', %5f = '_') must come back
    # decoded, as the expected dict below shows.
    url = "rabbit://us%65r:p%61ss@[ffff::1]/virtual%5fhost"
    result = rpc_driver.parse_transport_url(url)
    self.assertEqual(result, {
        'username': 'user',
        'password': 'pass',
        'hostname': 'ffff::1',
        'port': None,
        'virtual_host': 'virtual_host',
    })
开发者ID:Brocade-OpenSource,项目名称:OpenStack-DNRM-Nova,代码行数:12,代码来源:test_cells_rpc_driver.py
示例3: test_normal_parsing
def test_normal_parsing(self):
    """A fully populated URL yields every field, with the port as an int."""
    # The percent-escapes (%65 = 'e', %61 = 'a', %5f = '_') must come back
    # decoded, as the expected dict below shows.
    url = "rabbit://us%65r:p%61ss@host.example.com:10/virtual%5fhost"
    result = rpc_driver.parse_transport_url(url)
    self.assertEqual(result, {
        'username': 'user',
        'password': 'pass',
        'hostname': 'host.example.com',
        'port': 10,
        'virtual_host': 'virtual_host',
    })
开发者ID:Brocade-OpenSource,项目名称:OpenStack-DNRM-Nova,代码行数:12,代码来源:test_cells_rpc_driver.py
示例4: test_empty
def test_empty(self):
    """A URL consisting of only the scheme parses to all-None fields."""
    url = "rabbit:"
    result = rpc_driver.parse_transport_url(url)
    self.assertEqual(result, {
        'username': None,
        'password': None,
        'hostname': None,
        'port': None,
        'virtual_host': None,
    })
开发者ID:Brocade-OpenSource,项目名称:OpenStack-DNRM-Nova,代码行数:12,代码来源:test_cells_rpc_driver.py
示例5: get_cell_info
def get_cell_info(self):
    """Return subset of cell information for OS API use.

    The result always contains ``name`` and ``capabilities``.  When
    ``self.db_info`` is truthy it additionally contains ``is_parent``,
    ``weight_scale`` and ``weight_offset`` copied from ``db_info``, plus
    ``username``, ``rpc_host`` and ``rpc_port`` taken from the parsed
    ``db_info["transport_url"]``.

    :returns: a new dict; ``self.db_info`` is not modified.
    """
    db_fields_to_return = ["is_parent", "weight_scale", "weight_offset"]
    # Maps parsed-URL field name -> key exposed in the returned dict.
    url_fields_to_return = {"username": "username", "hostname": "rpc_host", "port": "rpc_port"}
    cell_info = dict(name=self.name, capabilities=self.capabilities)
    if self.db_info:
        for field in db_fields_to_return:
            cell_info[field] = self.db_info[field]

        # Must stay inside the guard: it indexes self.db_info.
        url_info = rpc_driver.parse_transport_url(self.db_info["transport_url"])
        for field, canonical in url_fields_to_return.items():
            cell_info[canonical] = url_info[field]
    return cell_info
开发者ID:osrg,项目名称:nova,代码行数:13,代码来源:state.py
示例6: get_cell_info
def get_cell_info(self):
    """Return subset of cell information for OS API use.

    The result always contains ``name`` and ``capabilities``.  When
    ``self.db_info`` is truthy it additionally contains ``is_parent``,
    ``weight_scale`` and ``weight_offset`` copied from ``db_info``, plus
    ``username``, ``rpc_host`` and ``rpc_port`` taken from the parsed
    ``db_info['transport_url']``.

    :returns: a new dict; ``self.db_info`` is not modified.
    """
    db_fields_to_return = ['is_parent', 'weight_scale', 'weight_offset']
    # Maps parsed-URL field name -> key exposed in the returned dict.
    url_fields_to_return = {
        'username': 'username',
        'hostname': 'rpc_host',
        'port': 'rpc_port',
    }
    cell_info = dict(name=self.name, capabilities=self.capabilities)
    if self.db_info:
        for field in db_fields_to_return:
            cell_info[field] = self.db_info[field]

        # Must stay inside the guard: it indexes self.db_info.
        url_info = rpc_driver.parse_transport_url(
            self.db_info['transport_url'])
        for field, canonical in url_fields_to_return.items():
            cell_info[canonical] = url_info[field]
    return cell_info
开发者ID:Brocade-OpenSource,项目名称:OpenStack-DNRM-Nova,代码行数:18,代码来源:state.py
示例7: _normalize_cell
def _normalize_cell(self, cell, existing=None):
    """Normalize input cell data in place.

    Normalizations include:

    * Converting cell['type'] to is_parent boolean.
    * Merging existing transport URL with transport information.

    :param cell: dict of user-supplied cell fields; modified in place.
        ``'type'`` and any of ``username``, ``password``, ``rpc_host``,
        ``rpc_port``, ``rpc_virtual_host`` are removed, and
        ``'is_parent'`` and ``'transport_url'`` are set.
    :param existing: optional mapping describing the cell as currently
        stored; supplies the fallback ``is_parent`` value and the
        transport URL to merge into.
    :returns: ``None``.
    """
    # Start with the cell type conversion
    if 'type' in cell:
        self._validate_cell_type(cell['type'])
        cell['is_parent'] = cell['type'] == 'parent'
        del cell['type']
    # Avoid cell type being overwritten to 'child'
    elif existing:
        cell['is_parent'] = existing['is_parent']
    else:
        cell['is_parent'] = False

    # Now we disassemble the existing transport URL...
    transport = {}
    if existing and 'transport_url' in existing:
        transport = rpc_driver.parse_transport_url(
            existing['transport_url'])

    # Copy over the input fields
    # Maps transport field name -> name of the matching input field.
    transport_field_map = {
        'username': 'username',
        'password': 'password',
        'hostname': 'rpc_host',
        'port': 'rpc_port',
        'virtual_host': 'rpc_virtual_host',
    }
    for key, input_field in transport_field_map.items():
        # Set the default value of the field; using setdefault()
        # lets us avoid overriding the existing transport URL
        transport.setdefault(key, None)

        # Only override the value if we're given an override
        if input_field in cell:
            transport[key] = cell.pop(input_field)

    # Now set the transport URL
    cell['transport_url'] = rpc_driver.unparse_transport_url(transport)
开发者ID:674009287,项目名称:nova,代码行数:44,代码来源:cells.py
示例8: downgrade_cell_data
def downgrade_cell_data(table):
    """Copy each cell's transport URL components back into table columns.

    For every row, ``transport_url`` is parsed and its non-empty fields
    are written to the row, with column names translated through the
    module-level ``field_map``.  Rows whose URL cannot be parsed are
    logged and left unchanged.

    :param table: the cells table object to read from and update.
    """
    columns = ['id', 'name', 'transport_url']
    query = select([get_column(table, c) for c in columns])
    # All rows are materialized into a list before any update is issued,
    # exactly as the original code did.
    for row in [dict(zip(columns, result)) for result in query.execute()]:
        # Disassemble the transport URL
        try:
            transport = rpc_driver.parse_transport_url(row['transport_url'])
        except ValueError as exc:
            # We failed to parse the transport URL, so don't set up
            # any transport data
            LOG.warning(_('Failed to downgrade cell %(name)s: %(error)s') %
                        dict(name=row['name'], error=str(exc)))
            continue

        # Ignore empty values
        transport_data = dict(
            (field_map.get(key, key), value)
            for key, value in transport.items() if value)

        if transport_data:
            table.update().where(table.c.id == row['id']).\
                values(**transport_data).execute()
开发者ID:674009287,项目名称:nova,代码行数:23,代码来源:200_add_transport_url_to_cell.py
示例9: insecure_transport_url
def insecure_transport_url(url):
    """Round-trip ``url`` through parse/unparse with the flag set to False.

    :param url: transport URL string to re-serialize.
    :returns: whatever ``rpc_driver.unparse_transport_url`` produces for
        the parsed URL.
    """
    transport = rpc_driver.parse_transport_url(url)
    # NOTE(review): the second positional argument is presumably a
    # "secure" switch (False = insecure form) -- confirm against
    # rpc_driver.unparse_transport_url.
    return rpc_driver.unparse_transport_url(transport, False)
开发者ID:LiangShang,项目名称:nova,代码行数:3,代码来源:test_cells.py
注:本文中的nova.cells.rpc_driver.parse_transport_url函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论