本文整理汇总了Python中nova.context.get_admin_context函数的典型用法代码示例。如果您正苦于以下问题:Python get_admin_context函数的具体用法?Python get_admin_context怎么用?Python get_admin_context使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_admin_context函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_public_network_association
def test_public_network_association(self):
    """Make sure that we can allocate a public ip.

    Allocates a floating ip for the first project, associates it with a
    fixed address, checks the instance's floating address before and
    after disassociation, then releases both addresses.
    """
    # TODO(vish): better way of adding floating ips
    self.context._project = self.projects[0]
    self.context.project_id = self.projects[0].id
    pubnet = IPy.IP(flags.FLAGS.floating_range)
    first_public = str(pubnet[0])
    # Create the floating ip record only when it is not in the db yet.
    try:
        db.floating_ip_get_by_address(context.get_admin_context(),
                                      first_public)
    except exception.NotFound:
        db.floating_ip_create(context.get_admin_context(),
                              {'address': first_public,
                               'host': FLAGS.host})

    float_addr = self.network.allocate_floating_ip(self.context,
                                                   self.projects[0].id)
    fix_addr = self._create_address(0)
    lease_ip(fix_addr)
    self.assertEqual(float_addr, first_public)

    # While associated, the instance reports the floating address.
    self.network.associate_floating_ip(self.context, float_addr, fix_addr)
    reported = db.instance_get_floating_address(
        context.get_admin_context(), self.instance_id)
    self.assertEqual(reported, float_addr)

    # After disassociation, the instance reports no floating address.
    self.network.disassociate_floating_ip(self.context, float_addr)
    reported = db.instance_get_floating_address(
        context.get_admin_context(), self.instance_id)
    self.assertEqual(reported, None)

    # Release everything that was allocated above.
    self.network.deallocate_floating_ip(self.context, float_addr)
    self.network.deallocate_fixed_ip(self.context, fix_addr)
    release_ip(fix_addr)
    db.floating_ip_destroy(context.get_admin_context(), float_addr)
开发者ID:anotherjesse,项目名称:nova,代码行数:30,代码来源:test_network.py
示例2: _set_machine_id
def _set_machine_id(self, client_factory, instance):
    """
    Set the machine id of the VM for guest tools to pick up and change
    the IP.

    The change spec is built from the instance's MAC address, its fixed
    address and the netmask/gateway of its network, and is applied with
    a "ReconfigVM_Task" call that is waited on.

    Raises exception.InstanceNotFound when no VM reference exists for
    the instance name.
    """
    vm_ref = self._get_vm_ref_from_the_name(instance.name)
    if vm_ref is None:
        raise exception.InstanceNotFound(instance_id=instance.id)

    network = db.network_get_by_instance(context.get_admin_context(),
                                         instance['id'])
    mac_addr = instance.mac_address
    ip_addr = db.instance_get_fixed_address(context.get_admin_context(),
                                            instance['id'])
    change_spec = vm_util.get_machine_id_change_spec(
        client_factory, mac_addr, ip_addr,
        network["netmask"], network["gateway"])

    LOG.debug(_("Reconfiguring VM instance %(name)s to set the machine id "
                "with ip - %(ip_addr)s") %
              {'name': instance.name, 'ip_addr': ip_addr})
    vim = self._session._get_vim()
    reconfig_task = self._session._call_method(vim, "ReconfigVM_Task",
                                               vm_ref, spec=change_spec)
    self._session._wait_for_task(instance.id, reconfig_task)
    LOG.debug(_("Reconfigured VM instance %(name)s to set the machine id "
                "with ip - %(ip_addr)s") %
              {'name': instance.name, 'ip_addr': ip_addr})
开发者ID:superstack,项目名称:nova,代码行数:30,代码来源:vmops.py
示例3: list_vms
def list_vms(host=None):
    """
    Make a list of vms and expand out their fixed and floating ips.

    :param host: when given, only instances on that host are listed;
                 otherwise all instances are listed.
    :returns: a list with one dict per instance.  Each dict is a copy of
              the instance record in which values that cannot be JSON
              encoded are replaced by ``str(value)``, extended with the
              keys ``ec2_id``, ``fixed_ips`` and ``floating_ips``.
    """
    flags.parse_args([])
    admin_ctxt = context.get_admin_context()
    if host is None:
        instances = db.instance_get_all(admin_ctxt)
    else:
        instances = db.instance_get_all_by_host(admin_ctxt, host)

    my_instances = []
    for instance in instances:
        my_inst = dict(instance)
        # Iterate over a snapshot because values are replaced in place.
        for key, value in list(my_inst.items()):
            try:
                json.dumps(value)
            except (TypeError, ValueError):
                # Not JSON-serializable: keep a string rendering instead.
                my_inst[key] = str(value)

        ec2_id = db.get_ec2_instance_id_by_uuid(admin_ctxt, instance.uuid)
        # Eight zero-padded hex digits, e.g. 255 -> 'i-000000ff'.
        my_inst['ec2_id'] = 'i-%08x' % int(ec2_id)

        try:
            fixed_ips = db.fixed_ip_get_by_instance(admin_ctxt,
                                                    instance.uuid)
        except Exception:
            # Best effort: a failed lookup is treated as "no fixed ips"
            # rather than reusing the previous instance's addresses.
            fixed_ips = []

        my_inst['fixed_ips'] = [ip.address for ip in fixed_ips]
        floating = []
        for ip in fixed_ips:
            floating.extend(
                f_ip.address for f_ip in
                db.floating_ip_get_by_fixed_address(admin_ctxt, ip.address))
        my_inst['floating_ips'] = floating
        my_instances.append(my_inst)
    return my_instances
开发者ID:tyll,项目名称:fedora-infrastructure,代码行数:35,代码来源:gather-diff-instances.py
示例4: set_instance_error_state_and_notify
def set_instance_error_state_and_notify(instance):
    """
    Set an instance to ERROR state and send out a notification.

    The instance's vm_state is set to ERROR locally and through the
    conductor API (which also clears task_state), a warning is logged,
    and a 'compute.instance.log' warning notification is emitted for the
    instance's host.
    """
    # Only item access is used on ``instance``: it could be a dictionary
    # when this method is called during virtual machine delete process.
    error_state = vm_states.ERROR
    instance['vm_state'] = error_state
    conductor.API().instance_update(context.get_admin_context(),
                                    instance['uuid'],
                                    vm_state=error_state,
                                    task_state=None)

    vm_name = instance['name']
    vm_host = instance['host']
    LOG.warn(_('Unable to find virtual machine %(inst_name)s '
               'on host %(host)s. Set state to ERROR')
             % {'inst_name': vm_name, 'host': vm_host})

    # Send event notification
    payload = {
        'event_type': 'compute.instance.log',
        'msg': _('Unable to find virtual machine {instance_name} on '
                 'host {host_name}. An operation might have been '
                 'performed on the virtual machine outside of PowerVC or'
                 ' the deploy of the virtual machine failed.'
                 'The virtual machine is now set to Error state in the '
                 'database.'),
        'instance_name': vm_name,
        'host_name': vm_host,
    }
    notifier = rpc.get_notifier(service='compute', host=vm_host)
    notifier.warn(context.get_admin_context(), 'compute.instance.log',
                  payload)
开发者ID:windskyer,项目名称:k_nova,代码行数:31,代码来源:__init__.py
示例5: test_too_many_addresses
def test_too_many_addresses(self):
    """Test for a NoMoreAddresses exception when all fixed ips are used.

    Uses up every available ip of the first project's network, expects
    one more allocation to raise, then releases everything and checks
    that the available count is back to its starting value.
    """
    admin_context = context.get_admin_context()
    network = db.project_get_network(admin_context, self.projects[0].id)
    num_available_ips = db.network_count_available_ips(admin_context,
                                                       network['id'])

    # Exhaust the network, remembering what has to be cleaned up.
    addresses = []
    instance_ids = []
    for _idx in range(num_available_ips):
        instance_ref = self._create_instance(0)
        instance_ids.append(instance_ref['id'])
        address = self._create_address(0, instance_ref['id'])
        addresses.append(address)
        lease_ip(address)

    remaining = db.network_count_available_ips(
        context.get_admin_context(), network['id'])
    self.assertEqual(remaining, 0)
    self.assertRaises(db.NoMoreAddresses,
                      self.network.allocate_fixed_ip,
                      self.context,
                      'foo')

    # Both lists hold exactly num_available_ips entries, in step.
    for address, instance_id in zip(addresses, instance_ids):
        self.network.deallocate_fixed_ip(self.context, address)
        release_ip(address)
        db.instance_destroy(context.get_admin_context(), instance_id)

    remaining = db.network_count_available_ips(
        context.get_admin_context(), network['id'])
    self.assertEqual(remaining, num_available_ips)
开发者ID:anotherjesse,项目名称:nova,代码行数:31,代码来源:test_network.py
示例6: add_role
def add_role(self, uid, role, project_id=None):
    """Add role for user (or user and project).

    With a falsy project_id the role is added for the user alone;
    otherwise it is added for the user within that project.
    """
    admin_ctxt = context.get_admin_context()
    if project_id:
        db.user_add_project_role(admin_ctxt, uid, project_id, role)
    else:
        db.user_add_role(admin_ctxt, uid, role)
开发者ID:AsherBond,项目名称:dodai-compute,代码行数:7,代码来源:dbdriver.py
示例7: _initCache
def _initCache(self):
    """Populate the inventory from the database.

    Registers every compute node, then loads all vm hosts, vms, storage
    volumes and subnets into the inventory and logs how many of each
    were read.
    """
    # Read from DB all the vmHost objects and populate
    # the cache for each IP if cache is empty
    LOG.info(_('Entering into initCache'))
    for compute in db.compute_node_get_all(get_admin_context()):
        compute_id = str(compute['id'])
        service = compute['service']
        self._add_compute_to_inventory(compute['hypervisor_type'],
                                       compute_id, service['host'])

    vmhosts = api.vm_host_get_all(get_admin_context())
    vms = api.vm_get_all(get_admin_context())
    storageVolumes = api.storage_volume_get_all(get_admin_context())
    subNets = api.subnet_get_all(get_admin_context())

    for objects in (vmhosts, vms, storageVolumes, subNets):
        self._updateInventory(objects)

    LOG.info(_('Hosts obtained from db: %s') % str(len(vmhosts)))
    LOG.info(_('Vms obtained from db: %s') % str(len(vms)))
    LOG.info(_('Storage volumes obtained from db: %s') %
             str(len(storageVolumes)))
    LOG.info(_('Subnets obtained from db: %s') % str(len(subNets)))
    LOG.info(_('Completed the initCache method'))
开发者ID:jessegonzalez,项目名称:healthnmon,代码行数:30,代码来源:inventory_manager.py
示例8: setUp
def setUp(self):
    """Build the cloudpipe controller fixture with an admin context.

    Wraps a v2 API router in an admin-context injector, looks up the
    cloudpipe controller through the router, installs the db, pipelib
    and utils stubs, and resets EMPTY_INSTANCE_LIST to True.
    """
    global EMPTY_INSTANCE_LIST
    super(CloudpipeTest, self).setUp()
    self.flags(allow_admin_api=True)
    self.app = fakes.wsgi_app()
    router = v2.APIRouter()
    # The app assigned above is replaced by the context-injecting one.
    self.app = auth.InjectContext(context.get_admin_context(), router)
    matched = router.map.match('/1234/os-cloudpipe')
    self.controller = matched['controller'].controller

    fakes.stub_out_networking(self.stubs)
    fakes.stub_out_rate_limiting(self.stubs)
    self.stubs.Set(db, "instance_get_all_by_project",
                   db_instance_get_all_by_project)
    self.stubs.Set(db, "security_group_exists",
                   db_security_group_exists)
    self.stubs.SmartSet(self.controller.cloudpipe, "launch_vpn_instance",
                        pipelib_launch_vpn_instance)
    #self.stubs.SmartSet(self.controller.auth_manager, "get_project",
    #                    auth_manager_get_project)
    #self.stubs.SmartSet(self.controller.auth_manager, "get_projects",
    #                    auth_manager_get_projects)
    # NOTE(todd): The above code (just setting the stub, not invoking it)
    # causes failures in AuthManagerLdapTestCase.  So use a fake object.
    self.controller.auth_manager = FakeAuthManager()
    self.stubs.Set(utils, 'vpn_ping', utils_vpn_ping)
    self.context = context.get_admin_context()
    EMPTY_INSTANCE_LIST = True
开发者ID:bgh,项目名称:nova,代码行数:28,代码来源:test_cloudpipe.py
示例9: test_timestamp_columns
def test_timestamp_columns(self):
    """
    Test the createEpoch and lastModifiedEpoch time stamp columns.

    A first save must set createEpoch to a time between the epochs
    sampled around the save.  A second save of the modified object must
    keep createEpoch unchanged and set lastModifiedEpoch to a time
    between the epochs sampled around that second save.
    """
    portGrp = PortGroup()
    portGrp.set_id('portGrp-01')

    # Check for createEpoch
    epoch_before = utils.get_current_epoch_ms()
    api.port_group_save(get_admin_context(), portGrp)
    epoch_after = utils.get_current_epoch_ms()
    portGrp_queried = api.port_group_get_by_ids(
        get_admin_context(), [portGrp.get_id()])[0]
    # assertTrue replaces the deprecated assert_ alias.
    self.assertTrue(test_utils.is_timestamp_between(
        epoch_before, epoch_after, portGrp_queried.get_createEpoch()))

    # Check for lastModifiedEpoch
    portGrp_modified = portGrp_queried
    test_utils.unset_timestamp_fields(portGrp_modified)
    portGrp_modified.set_name('changed_name')
    epoch_before = utils.get_current_epoch_ms()
    api.port_group_save(get_admin_context(), portGrp_modified)
    epoch_after = utils.get_current_epoch_ms()
    portGrp_queried = api.port_group_get_by_ids(
        get_admin_context(), [portGrp.get_id()])[0]
    self.assertTrue(portGrp_modified.get_createEpoch() ==
                    portGrp_queried.get_createEpoch())
    self.assertTrue(test_utils.is_timestamp_between(
        epoch_before,
        epoch_after,
        portGrp_queried.get_lastModifiedEpoch()))
开发者ID:rakrup,项目名称:healthnmon,代码行数:30,代码来源:test_port_group_db_api.py
示例10: remove_role
def remove_role(self, uid, role, project_id=None):
    """Remove role for user (or user and project).

    With a falsy project_id the role is removed for the user alone;
    otherwise it is removed for the user within that project.
    """
    admin_ctxt = context.get_admin_context()
    if project_id:
        db.user_remove_project_role(admin_ctxt, uid, project_id, role)
    else:
        db.user_remove_role(admin_ctxt, uid, role)
开发者ID:AsherBond,项目名称:dodai-compute,代码行数:7,代码来源:dbdriver.py
示例11: test_vm_save
def test_vm_save(self):
    '''
    Insert a vm object into db and check
    whether we are getting proper values after retrieval
    '''
    vm = Vm()
    vm.id = 'VM1-id'
    vm.name = 'VM1-Name'
    vmScsiController = VmScsiController()
    vmScsiController.set_id('VM_CTRL_1')
    # Set the type, not the id again: a second set_id() would overwrite
    # 'VM_CTRL_1' and leave the type checked below unset.
    vmScsiController.set_type('some_type')
    vm.add_vmScsiControllers(vmScsiController)
    healthnmon_db_api.vm_save(get_admin_context(), vm)

    vms = healthnmon_db_api.vm_get_by_ids(get_admin_context(), [vm.id])
    self.assertTrue(vms is not None)
    self.assertTrue(len(vms) == 1)
    self.assertEqual(vms[0].get_id(), 'VM1-id', "VM id is not same")
    self.assertEqual(vms[0].get_name(), 'VM1-Name', "VM name is not same")

    # assertTrue replaces the deprecated assert_ alias.
    controllers = vms[0].get_vmScsiControllers()
    self.assertTrue(len(controllers) == 1,
                    "vmScsiController len mismatch")
    self.assertTrue(controllers[0].get_id() == vmScsiController.get_id(),
                    "vmScsiController id mismatch")
    self.assertTrue(
        controllers[0].get_type() == vmScsiController.get_type(),
        "vmScsiController type mismatch")
开发者ID:jessegonzalez,项目名称:healthnmon,代码行数:26,代码来源:test_vm_db_api.py
示例12: test_vm_netadpater_save
def test_vm_netadpater_save(self):
    """Save a vm with one net adapter, read it back, then delete the vm.

    Checks that the adapter id survives the round trip and that the vm
    can no longer be fetched after deletion.
    """
    vm = Vm()
    vm.id = 'VM1'

    adapter = VmNetAdapter()
    adapter.set_id('netAdapter-01')
    adapter.set_name('netAdapter-01')
    adapter.set_addressType('assigned')
    adapter.set_adapterType('E1000')
    adapter.set_switchType('vSwitch')
    adapter.set_macAddress('00:50:56:81:1c:d0')
    adapter.add_ipAddresses('1.1.1.1')
    adapter.set_networkName('br100')
    adapter.set_vlanId(0)
    vm.add_vmNetAdapters(adapter)
    healthnmon_db_api.vm_save(get_admin_context(), vm)

    saved_vms = healthnmon_db_api.vm_get_by_ids(get_admin_context(),
                                                ['VM1'])
    saved_adapter = saved_vms[0].get_vmNetAdapters()[0]
    self.assertTrue(adapter.get_id() == saved_adapter.get_id())

    healthnmon_db_api.vm_delete_by_ids(get_admin_context(), [vm.id])
    remaining = healthnmon_db_api.vm_get_by_ids(get_admin_context(),
                                                [vm.id])
    self.assertTrue(remaining is None or len(remaining) == 0,
                    'VM not deleted')
开发者ID:jessegonzalez,项目名称:healthnmon,代码行数:28,代码来源:test_vm_db_api.py
示例13: test_vm_host_get_all_for_vm
def test_vm_host_get_all_for_vm(self):
    """Check the vm ids reported by vm_host_get_all.

    A saved vm that points at a host must appear in that host's
    virtual machine ids, and must be gone after the vm is deleted.
    """
    host_id = 'VH1'
    vmhost = VmHost()
    vmhost.id = host_id
    healthnmon_db_api.vm_host_save(get_admin_context(), vmhost)
    vm = Vm()
    vm.id = 'VM11'
    vm.set_vmHostId(host_id)
    healthnmon_db_api.vm_save(get_admin_context(), vm)

    vmhosts = healthnmon_db_api.vm_host_get_all(get_admin_context())
    self.assertFalse(vmhosts is None,
                     'Host get by id returned a none list')
    self.assertTrue(len(vmhosts) > 0,
                    'Host get by id returned invalid number of list')
    self.assertTrue(vmhosts[0].id == host_id)
    # assertTrue replaces the deprecated assert_ alias.
    vmids = vmhosts[0].get_virtualMachineIds()
    self.assertTrue(vmids is not None)
    self.assertTrue(len(vmids) == 1)
    self.assertTrue(vm.id in vmids)

    # Once the vm is deleted the host must not list it any more.
    healthnmon_db_api.vm_delete_by_ids(get_admin_context(), [vm.id])
    vmhosts = healthnmon_db_api.vm_host_get_all(get_admin_context())
    self.assertTrue(vmhosts[0].id == host_id)
    vmids = vmhosts[0].get_virtualMachineIds()
    self.assertTrue((vmids is None) or (len(vmids) == 0))
开发者ID:rakrup,项目名称:healthnmon,代码行数:27,代码来源:test_vm_host_db_api.py
示例14: test_vm_host_get_all_for_sv
def test_vm_host_get_all_for_sv(self):
    """Check the storage volume ids reported by vm_host_get_all.

    A saved storage volume with a mount point on a host must appear in
    that host's storage volume ids, and must be gone after the volume
    is deleted.
    """
    host_id = 'VH1'
    vmhost = VmHost()
    vmhost.id = host_id
    healthnmon_db_api.vm_host_save(get_admin_context(), vmhost)
    mntPnt = HostMountPoint()
    mntPnt.set_vmHostId(host_id)
    mntPnt.set_path('/path')
    volume = StorageVolume()
    volume.set_id('SV11')
    volume.add_mountPoints(mntPnt)
    healthnmon_db_api.storage_volume_save(get_admin_context(), volume)

    vmhosts = healthnmon_db_api.vm_host_get_all(get_admin_context())
    self.assertFalse(vmhosts is None,
                     'Host get by id returned a none list')
    self.assertTrue(len(vmhosts) > 0,
                    'Host get by id returned invalid number of list')
    self.assertTrue(vmhosts[0].id == host_id)
    # assertTrue replaces the deprecated assert_ alias.
    svlist = vmhosts[0].get_storageVolumeIds()
    self.assertTrue(svlist is not None)
    self.assertTrue(len(svlist) == 1)
    self.assertTrue(volume.get_id() in svlist)

    # Once the volume is deleted the host must not list it any more.
    healthnmon_db_api.storage_volume_delete_by_ids(
        get_admin_context(), [volume.get_id()])
    vmhosts = healthnmon_db_api.vm_host_get_all(get_admin_context())
    self.assertTrue(vmhosts[0].id == host_id)
    svids = vmhosts[0].get_storageVolumeIds()
    self.assertTrue((svids is None) or (len(svids) == 0))
开发者ID:rakrup,项目名称:healthnmon,代码行数:34,代码来源:test_vm_host_db_api.py
示例15: init_host
def init_host(self):
    """Initialize L3 networking, floating ips and per-network forwarding.

    After the base class initialization, networks owned by this host
    get their host information updated, and the L3 driver is
    initialized for the cidr of each of them that has a gateway.
    """
    # Initialize general L3 networking
    self.l3driver.initialize()
    super(QuantumManager, self).init_host()
    # Initialize floating ip support (only works for nova ipam currently)
    if FLAGS.quantum_ipam_lib == 'nova.network.quantum.nova_ipam_lib':
        LOG.debug("Initializing FloatingIP support")
        self.init_host_floating_ips()

    # Set up all the forwarding rules for any network that has a
    # gateway set.
    gateway_cidrs = []
    for network in self.get_all_networks(context.get_admin_context()):
        # Don't update host information for network that does not
        # belong to you
        if network['host'] != self.host:
            continue
        if network['gateway']:
            LOG.debug("Initializing NAT: %s (cidr: %s, gw: %s)" % (
                network['label'], network['cidr'], network['gateway']))
            gateway_cidrs.append(network['cidr'])
        self._update_network_host(context.get_admin_context(),
                                  network['uuid'])

    # .. and for each network
    for cidr in gateway_cidrs:
        self.l3driver.initialize_network(cidr)
开发者ID:singn,项目名称:nova,代码行数:26,代码来源:manager.py
示例16: test_vm_host_get_by_id
def test_vm_host_get_by_id(self):
    """Save a host with a vm and a storage volume, then fetch it by id.

    Checks that vm_host_get_by_ids returns a non-empty list whose first
    entry carries the saved host id.
    """
    host_id = 'VH1'
    admin_ctxt = get_admin_context

    host = VmHost()
    host.id = host_id
    healthnmon_db_api.vm_host_save(admin_ctxt(), host)

    vm = Vm()
    vm.id = 'VM11'
    vm.set_vmHostId(host_id)
    healthnmon_db_api.vm_save(admin_ctxt(), vm)

    mount_point = HostMountPoint()
    mount_point.set_vmHostId(host_id)
    mount_point.set_path('/path')
    volume = StorageVolume()
    volume.set_id('SV11')
    volume.add_mountPoints(mount_point)
    healthnmon_db_api.storage_volume_save(admin_ctxt(), volume)

    found = healthnmon_db_api.vm_host_get_by_ids(admin_ctxt(), [host_id])
    self.assertFalse(found is None,
                     'Host get by id returned a none list')
    self.assertTrue(len(found) > 0,
                    'Host get by id returned invalid number of list')
    self.assertTrue(found[0].id == host_id)
开发者ID:rakrup,项目名称:healthnmon,代码行数:27,代码来源:test_vm_host_db_api.py
示例17: test_port_group_delete
def test_port_group_delete(self):
    """Save a port group, delete it by id and check that it is gone."""
    from decimal import Decimal

    portgroup = PortGroup()
    portgroup.id = 'PG1'
    portgroup.name = 'test'
    portgroup.note = 'note'
    portgroup.value = Decimal('123.00')
    portgroup.units = 'uni'
    portgroup.resourceManagerId = 'rm1'
    portgroup.type = 'port'
    portgroup.virtualSwitchId = 'VS1'
    portgroup.vmHostId = 'VM1'
    api.port_group_save(get_admin_context(), portgroup)

    ids = [portgroup.id]
    saved = api.port_group_get_by_ids(get_admin_context(), ids)
    self.assertFalse(len(saved) == 0, 'Portgroup could not be saved')

    api.port_group_delete_by_ids(get_admin_context(), ids)
    remaining = api.port_group_get_by_ids(get_admin_context(), ids)
    self.assertTrue(remaining is None or len(remaining) == 0,
                    'port group not deleted')
开发者ID:rakrup,项目名称:healthnmon,代码行数:25,代码来源:test_port_group_db_api.py
示例18: get_projects
def get_projects(self, uid=None):
    """Retrieve list of projects.

    With a truthy uid only that user's projects are fetched; otherwise
    all projects are.  Each db record is converted with
    _db_project_to_auth_projectuser before being returned.
    """
    admin_ctxt = context.get_admin_context()
    if uid:
        db_projects = db.project_get_by_user(admin_ctxt, uid)
    else:
        db_projects = db.project_get_all(admin_ctxt)
    converted = []
    for db_project in db_projects:
        converted.append(self._db_project_to_auth_projectuser(db_project))
    return converted
开发者ID:AsherBond,项目名称:dodai-compute,代码行数:7,代码来源:dbdriver.py
示例19: test_vm_save_update
def test_vm_save_update(self):
    '''
    Update an existing object in db: save a vm, attach global settings,
    save it again and check the settings on the retrieved vm.
    '''
    vm = Vm()
    vm.id = 'VM1-id'
    healthnmon_db_api.vm_save(get_admin_context(), vm)

    settings = VmGlobalSettings()
    settings.set_id(vm.id)
    settings.set_autoStartAction('autoStartAction')
    settings.set_autoStopAction('autoStopAction')
    vm.set_vmGlobalSettings(settings)
    healthnmon_db_api.vm_save(get_admin_context(), vm)

    vms = healthnmon_db_api.vm_get_by_ids(get_admin_context(), [vm.id])
    self.assertTrue(vms is not None)
    self.assertTrue(len(vms) == 1)

    saved_vm = vms[0]
    self.assertEqual(saved_vm.get_id(), 'VM1-id', "VM id is not same")
    saved_settings = saved_vm.get_vmGlobalSettings()
    self.assertTrue(saved_settings is not None)
    self.assertEqual(saved_settings.get_id(), 'VM1-id',
                     "VM id is not same")
    self.assertEqual(saved_settings.get_autoStartAction(),
                     'autoStartAction', "autoStartAction is not same")
    self.assertEqual(saved_settings.get_autoStopAction(),
                     'autoStopAction', "autoStopAction is not same")
开发者ID:jessegonzalez,项目名称:healthnmon,代码行数:27,代码来源:test_vm_db_api.py
示例20: setUp
def setUp(self):
    """Bulk-create three floating ips and register their cleanup.

    All entries use the configured default floating pool and public
    interface; only the third one is bound to a host ("testHost").
    """
    super(FloatingIpsBulkTest, self).setUp()
    pool = CONF.default_floating_pool
    interface = CONF.public_interface
    address_hosts = (
        ("10.10.10.1", None),
        ("10.10.10.2", None),
        ("10.10.10.3", "testHost"),
    )
    self.ip_pool = [
        {'address': address,
         'pool': pool,
         'interface': interface,
         'host': host}
        for address, host in address_hosts
    ]
    self.compute.db.floating_ip_bulk_create(
        context.get_admin_context(), self.ip_pool)
    self.addCleanup(self.compute.db.floating_ip_bulk_destroy,
                    context.get_admin_context(), self.ip_pool)
开发者ID:hanbaoying,项目名称:nova,代码行数:30,代码来源:test_floating_ips_bulk.py
注:本文中的nova.context.get_admin_context函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论