本文整理汇总了Python中nova.db.fixed_ip_create函数的典型用法代码示例。如果您正苦于以下问题:Python fixed_ip_create函数的具体用法?Python fixed_ip_create怎么用?Python fixed_ip_create使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了fixed_ip_create函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _timeout_test
def _timeout_test(self, ctxt, timeout, multi_host):
values = {'host': 'foo'}
instance = db.instance_create(ctxt, values)
values = {'multi_host': multi_host, 'host': 'bar'}
net = db.network_create_safe(ctxt, values)
old = time = timeout - datetime.timedelta(seconds=5)
new = time = timeout + datetime.timedelta(seconds=5)
# should deallocate
values = {'allocated': False,
'instance_id': instance['id'],
'network_id': net['id'],
'updated_at': old}
db.fixed_ip_create(ctxt, values)
# still allocated
values = {'allocated': True,
'instance_id': instance['id'],
'network_id': net['id'],
'updated_at': old}
db.fixed_ip_create(ctxt, values)
# wrong network
values = {'allocated': False,
'instance_id': instance['id'],
'network_id': None,
'updated_at': old}
db.fixed_ip_create(ctxt, values)
# too new
values = {'allocated': False,
'instance_id': instance['id'],
'network_id': None,
'updated_at': new}
db.fixed_ip_create(ctxt, values)
开发者ID:matiu2,项目名称:nova,代码行数:31,代码来源:test_db_api.py
示例2: test_network_delete_safe
def test_network_delete_safe(self):
ctxt = context.get_admin_context()
values = {"host": "localhost", "project_id": "project1"}
network = db.network_create_safe(ctxt, values)
db_network = db.network_get(ctxt, network.id)
values = {"network_id": network["id"], "address": "fake1"}
address1 = db.fixed_ip_create(ctxt, values)
values = {"network_id": network["id"], "address": "fake2", "allocated": True}
address2 = db.fixed_ip_create(ctxt, values)
self.assertRaises(exception.NetworkInUse, db.network_delete_safe, ctxt, network["id"])
db.fixed_ip_update(ctxt, address2, {"allocated": False})
network = db.network_delete_safe(ctxt, network["id"])
ctxt = ctxt.elevated(read_deleted="yes")
fixed_ip = db.fixed_ip_get_by_address(ctxt, address1)
self.assertTrue(fixed_ip["deleted"])
开发者ID:hiteshwadekar,项目名称:nova,代码行数:15,代码来源:test_db_api.py
示例3: test_network_get_associated_fixed_ips
def test_network_get_associated_fixed_ips(self):
ctxt = context.get_admin_context()
values = {"host": "foo", "hostname": "myname"}
instance = db.instance_create(ctxt, values)
values = {"address": "bar", "instance_id": instance["id"]}
vif = db.virtual_interface_create(ctxt, values)
values = {
"address": "baz",
"network_id": 1,
"allocated": True,
"instance_id": instance["id"],
"virtual_interface_id": vif["id"],
}
fixed_address = db.fixed_ip_create(ctxt, values)
data = db.network_get_associated_fixed_ips(ctxt, 1)
self.assertEqual(len(data), 1)
record = data[0]
self.assertEqual(record["address"], fixed_address)
self.assertEqual(record["instance_id"], instance["id"])
self.assertEqual(record["network_id"], 1)
self.assertEqual(record["instance_created"], instance["created_at"])
self.assertEqual(record["instance_updated"], instance["updated_at"])
self.assertEqual(record["instance_hostname"], instance["hostname"])
self.assertEqual(record["vif_id"], vif["id"])
self.assertEqual(record["vif_address"], vif["address"])
data = db.network_get_associated_fixed_ips(ctxt, 1, "nothing")
self.assertEqual(len(data), 0)
开发者ID:hiteshwadekar,项目名称:nova,代码行数:27,代码来源:test_db_api.py
示例4: test_network_get_associated_fixed_ips
def test_network_get_associated_fixed_ips(self):
ctxt = context.get_admin_context()
values = {'host': 'foo', 'hostname': 'myname'}
instance = db.instance_create(ctxt, values)
values = {'address': 'bar', 'instance_id': instance['id']}
vif = db.virtual_interface_create(ctxt, values)
values = {'address': 'baz',
'network_id': 1,
'allocated': True,
'instance_id': instance['id'],
'virtual_interface_id': vif['id']}
fixed_address = db.fixed_ip_create(ctxt, values)
data = db.network_get_associated_fixed_ips(ctxt, 1)
self.assertEqual(len(data), 1)
record = data[0]
self.assertEqual(record['address'], fixed_address)
self.assertEqual(record['instance_id'], instance['id'])
self.assertEqual(record['network_id'], 1)
self.assertEqual(record['instance_created'], instance['created_at'])
self.assertEqual(record['instance_updated'], instance['updated_at'])
self.assertEqual(record['instance_hostname'], instance['hostname'])
self.assertEqual(record['vif_id'], vif['id'])
self.assertEqual(record['vif_address'], vif['address'])
data = db.network_get_associated_fixed_ips(ctxt, 1, 'nothing')
self.assertEqual(len(data), 0)
开发者ID:matiu2,项目名称:nova,代码行数:25,代码来源:test_db_api.py
示例5: _setup_networking
def _setup_networking(instance_id, ip="1.2.3.4", flo_addr="1.2.1.2"):
ctxt = context.get_admin_context()
network_ref = db.project_get_networks(ctxt, "fake", associate=True)[0]
vif = {"address": "56:12:12:12:12:12", "network_id": network_ref["id"], "instance_id": instance_id}
vif_ref = db.virtual_interface_create(ctxt, vif)
fixed_ip = {
"address": ip,
"network_id": network_ref["id"],
"virtual_interface_id": vif_ref["id"],
"allocated": True,
"instance_id": instance_id,
}
db.fixed_ip_create(ctxt, fixed_ip)
fix_ref = db.fixed_ip_get_by_address(ctxt, ip)
db.floating_ip_create(ctxt, {"address": flo_addr, "fixed_ip_id": fix_ref["id"]})
开发者ID:hiteshwadekar,项目名称:nova,代码行数:16,代码来源:test_db_api.py
示例6: create
def create(self, context):
updates = self.obj_get_changes()
if "id" in updates:
raise exception.ObjectActionError(action="create", reason="already created")
if "address" in updates:
updates["address"] = str(updates["address"])
db_fixedip = db.fixed_ip_create(context, updates)
self._from_db_object(context, self, db_fixedip)
开发者ID:wputra,项目名称:MOS-centos,代码行数:8,代码来源:fixed_ip.py
示例7: test_network_delete_safe
def test_network_delete_safe(self):
ctxt = context.get_admin_context()
values = {'host': 'localhost', 'project_id': 'project1'}
network = db.network_create_safe(ctxt, values)
db_network = db.network_get(ctxt, network.id)
values = {'network_id': network['id'], 'address': 'fake1'}
address1 = db.fixed_ip_create(ctxt, values)
values = {'network_id': network['id'],
'address': 'fake2',
'allocated': True}
address2 = db.fixed_ip_create(ctxt, values)
self.assertRaises(exception.NetworkInUse,
db.network_delete_safe, ctxt, network['id'])
db.fixed_ip_update(ctxt, address2, {'allocated': False})
network = db.network_delete_safe(ctxt, network['id'])
ctxt = ctxt.elevated(read_deleted='yes')
fixed_ip = db.fixed_ip_get_by_address(ctxt, address1)
self.assertTrue(fixed_ip['deleted'])
开发者ID:AnyBucket,项目名称:OpenStack-Install-and-Understand-Guide,代码行数:18,代码来源:test_db_api.py
示例8: create
def create(self, context):
updates = self.obj_get_changes()
if 'id' in updates:
raise exception.ObjectActionError(action='create',
reason='already created')
if 'address' in updates:
updates['address'] = str(updates['address'])
db_fixedip = db.fixed_ip_create(context, updates)
self._from_db_object(context, self, db_fixedip)
开发者ID:csdhome,项目名称:nova,代码行数:9,代码来源:fixed_ip.py
示例9: _setup_networking
def _setup_networking(instance_id, ip='1.2.3.4', flo_addr='1.2.1.2'):
ctxt = context.get_admin_context()
network_ref = db.project_get_networks(ctxt,
'fake',
associate=True)[0]
vif = {'address': '56:12:12:12:12:12',
'network_id': network_ref['id'],
'instance_id': instance_id}
vif_ref = db.virtual_interface_create(ctxt, vif)
fixed_ip = {'address': ip,
'network_id': network_ref['id'],
'virtual_interface_id': vif_ref['id'],
'allocated': True,
'instance_id': instance_id}
db.fixed_ip_create(ctxt, fixed_ip)
fix_ref = db.fixed_ip_get_by_address(ctxt, ip)
db.floating_ip_create(ctxt, {'address': flo_addr,
'fixed_ip_id': fix_ref['id']})
开发者ID:matiu2,项目名称:nova,代码行数:19,代码来源:test_db_api.py
示例10: setUp
def setUp(self):
super(FixedIpCommandsTestCase, self).setUp()
cidr = '10.0.0.0/24'
net = netaddr.IPNetwork(cidr)
net_info = {'bridge': 'fakebr',
'bridge_interface': 'fakeeth',
'dns': FLAGS.flat_network_dns,
'cidr': cidr,
'netmask': str(net.netmask),
'gateway': str(net[1]),
'broadcast': str(net.broadcast),
'dhcp_start': str(net[2])}
self.network = db.network_create_safe(context.get_admin_context(),
net_info)
num_ips = len(net)
for index in range(num_ips):
address = str(net[index])
reserved = (index == 1 or index == 2)
db.fixed_ip_create(context.get_admin_context(),
{'network_id': self.network['id'],
'address': address,
'reserved': reserved})
self.commands = nova_manage.FixedIpCommands()
开发者ID:lovocas,项目名称:nova,代码行数:23,代码来源:test_nova_manage.py
示例11: _timeout_test
def _timeout_test(self, ctxt, timeout, multi_host):
values = {"host": "foo"}
instance = db.instance_create(ctxt, values)
values = {"multi_host": multi_host, "host": "bar"}
net = db.network_create_safe(ctxt, values)
old = time = timeout - datetime.timedelta(seconds=5)
new = time = timeout + datetime.timedelta(seconds=5)
# should deallocate
values = {"allocated": False, "instance_id": instance["id"], "network_id": net["id"], "updated_at": old}
db.fixed_ip_create(ctxt, values)
# still allocated
values = {"allocated": True, "instance_id": instance["id"], "network_id": net["id"], "updated_at": old}
db.fixed_ip_create(ctxt, values)
# wrong network
values = {"allocated": False, "instance_id": instance["id"], "network_id": None, "updated_at": old}
db.fixed_ip_create(ctxt, values)
# too new
values = {"allocated": False, "instance_id": instance["id"], "network_id": None, "updated_at": new}
db.fixed_ip_create(ctxt, values)
开发者ID:hiteshwadekar,项目名称:nova,代码行数:19,代码来源:test_db_api.py
示例12: test_post_live_migration_working_correctly
def test_post_live_migration_working_correctly(self):
"""Confirm post_live_migration() works as expected correctly."""
dest = 'desthost'
flo_addr = '1.2.1.2'
# Preparing datas
c = context.get_admin_context()
instance_id = self._create_instance()
i_ref = db.instance_get(c, instance_id)
db.instance_update(c, i_ref['id'], {'state_description': 'migrating',
'state': power_state.PAUSED})
v_ref = db.volume_create(c, {'size': 1, 'instance_id': instance_id})
fix_addr = db.fixed_ip_create(c, {'address': '1.1.1.1',
'instance_id': instance_id})
fix_ref = db.fixed_ip_get_by_address(c, fix_addr)
flo_ref = db.floating_ip_create(c, {'address': flo_addr,
'fixed_ip_id': fix_ref['id']})
# reload is necessary before setting mocks
i_ref = db.instance_get(c, instance_id)
# Preparing mocks
self.mox.StubOutWithMock(self.compute.volume_manager,
'remove_compute_volume')
for v in i_ref['volumes']:
self.compute.volume_manager.remove_compute_volume(c, v['id'])
self.mox.StubOutWithMock(self.compute.driver, 'unfilter_instance')
self.compute.driver.unfilter_instance(i_ref, [])
# executing
self.mox.ReplayAll()
ret = self.compute.post_live_migration(c, i_ref, dest)
# make sure every data is rewritten to dest
i_ref = db.instance_get(c, i_ref['id'])
c1 = (i_ref['host'] == dest)
flo_refs = db.floating_ip_get_all_by_host(c, dest)
c2 = (len(flo_refs) != 0 and flo_refs[0]['address'] == flo_addr)
# post operaton
self.assertTrue(c1 and c2)
db.instance_destroy(c, instance_id)
db.volume_destroy(c, v_ref['id'])
db.floating_ip_destroy(c, flo_addr)
开发者ID:cp16net,项目名称:reddwarf,代码行数:43,代码来源:test_compute.py
示例13: _check_xml_and_container
def _check_xml_and_container(self, instance):
user_context = context.RequestContext(project=self.project,
user=self.user)
instance_ref = db.instance_create(user_context, instance)
host = self.network.get_network_host(user_context.elevated())
network_ref = db.project_get_network(context.get_admin_context(),
self.project.id)
fixed_ip = {'address': self.test_ip,
'network_id': network_ref['id']}
ctxt = context.get_admin_context()
fixed_ip_ref = db.fixed_ip_create(ctxt, fixed_ip)
db.fixed_ip_update(ctxt, self.test_ip,
{'allocated': True,
'instance_id': instance_ref['id']})
self.flags(libvirt_type='lxc')
conn = libvirt_conn.LibvirtConnection(True)
uri = conn.get_uri()
self.assertEquals(uri, 'lxc:///')
xml = conn.to_xml(instance_ref)
tree = xml_to_tree(xml)
check = [
(lambda t: t.find('.').get('type'), 'lxc'),
(lambda t: t.find('./os/type').text, 'exe'),
(lambda t: t.find('./devices/filesystem/target').get('dir'), '/')]
for i, (check, expected_result) in enumerate(check):
self.assertEqual(check(tree),
expected_result,
'%s failed common check %d' % (xml, i))
target = tree.find('./devices/filesystem/source').get('dir')
self.assertTrue(len(target) > 0)
开发者ID:superstack,项目名称:nova,代码行数:38,代码来源:test_virt.py
示例14: create_fixed_ip
def create_fixed_ip(self, **params):
default_params = {'address': '192.168.0.1'}
default_params.update(params)
return db.fixed_ip_create(self.ctxt, default_params)
开发者ID:matiu2,项目名称:nova,代码行数:4,代码来源:test_db_api.py
示例15: test_static_filters
def test_static_filters(self):
instance_ref = db.instance_create(self.context,
{'user_id': 'fake',
'project_id': 'fake',
'mac_address': '56:12:12:12:12:12'})
ip = '10.11.12.13'
network_ref = db.project_get_network(self.context,
'fake')
fixed_ip = {'address': ip,
'network_id': network_ref['id']}
admin_ctxt = context.get_admin_context()
db.fixed_ip_create(admin_ctxt, fixed_ip)
db.fixed_ip_update(admin_ctxt, ip, {'allocated': True,
'instance_id': instance_ref['id']})
secgroup = db.security_group_create(admin_ctxt,
{'user_id': 'fake',
'project_id': 'fake',
'name': 'testgroup',
'description': 'test group'})
db.security_group_rule_create(admin_ctxt,
{'parent_group_id': secgroup['id'],
'protocol': 'icmp',
'from_port': -1,
'to_port': -1,
'cidr': '192.168.11.0/24'})
db.security_group_rule_create(admin_ctxt,
{'parent_group_id': secgroup['id'],
'protocol': 'icmp',
'from_port': 8,
'to_port': -1,
'cidr': '192.168.11.0/24'})
db.security_group_rule_create(admin_ctxt,
{'parent_group_id': secgroup['id'],
'protocol': 'tcp',
'from_port': 80,
'to_port': 81,
'cidr': '192.168.10.0/24'})
db.instance_add_security_group(admin_ctxt, instance_ref['id'],
secgroup['id'])
instance_ref = db.instance_get(admin_ctxt, instance_ref['id'])
# self.fw.add_instance(instance_ref)
def fake_iptables_execute(cmd, process_input=None):
if cmd == 'sudo ip6tables-save -t filter':
return '\n'.join(self.in6_rules), None
if cmd == 'sudo iptables-save -t filter':
return '\n'.join(self.in_rules), None
if cmd == 'sudo iptables-restore':
self.out_rules = process_input.split('\n')
return '', ''
if cmd == 'sudo ip6tables-restore':
self.out6_rules = process_input.split('\n')
return '', ''
self.fw.execute = fake_iptables_execute
self.fw.prepare_instance_filter(instance_ref)
self.fw.apply_instance_filter(instance_ref)
in_rules = filter(lambda l: not l.startswith('#'), self.in_rules)
for rule in in_rules:
if not 'nova' in rule:
self.assertTrue(rule in self.out_rules,
'Rule went missing: %s' % rule)
instance_chain = None
for rule in self.out_rules:
# This is pretty crude, but it'll do for now
if '-d 10.11.12.13 -j' in rule:
instance_chain = rule.split(' ')[-1]
break
self.assertTrue(instance_chain, "The instance chain wasn't added")
security_group_chain = None
for rule in self.out_rules:
# This is pretty crude, but it'll do for now
if '-A %s -j' % instance_chain in rule:
security_group_chain = rule.split(' ')[-1]
break
self.assertTrue(security_group_chain,
"The security group chain wasn't added")
self.assertTrue('-A %s -p icmp -s 192.168.11.0/24 -j ACCEPT' % \
security_group_chain in self.out_rules,
"ICMP acceptance rule wasn't added")
self.assertTrue('-A %s -p icmp -s 192.168.11.0/24 -m icmp --icmp-type '
'8 -j ACCEPT' % security_group_chain in self.out_rules,
"ICMP Echo Request acceptance rule wasn't added")
self.assertTrue('-A %s -p tcp -s 192.168.10.0/24 -m multiport '
'--dports 80:81 -j ACCEPT' % security_group_chain \
in self.out_rules,
#.........这里部分代码省略.........
开发者ID:anotherjesse,项目名称:nova,代码行数:101,代码来源:test_virt.py
示例16: test_static_filters
def test_static_filters(self):
instance_ref = db.instance_create(
self.context, {"user_id": "fake", "project_id": "fake", "mac_address": "56:12:12:12:12:12"}
)
ip = "10.11.12.13"
network_ref = db.project_get_network(self.context, "fake")
fixed_ip = {"address": ip, "network_id": network_ref["id"]}
admin_ctxt = context.get_admin_context()
db.fixed_ip_create(admin_ctxt, fixed_ip)
db.fixed_ip_update(admin_ctxt, ip, {"allocated": True, "instance_id": instance_ref["id"]})
secgroup = db.security_group_create(
admin_ctxt, {"user_id": "fake", "project_id": "fake", "name": "testgroup", "description": "test group"}
)
db.security_group_rule_create(
admin_ctxt,
{
"parent_group_id": secgroup["id"],
"protocol": "icmp",
"from_port": -1,
"to_port": -1,
"cidr": "192.168.11.0/24",
},
)
db.security_group_rule_create(
admin_ctxt,
{
"parent_group_id": secgroup["id"],
"protocol": "icmp",
"from_port": 8,
"to_port": -1,
"cidr": "192.168.11.0/24",
},
)
db.security_group_rule_create(
admin_ctxt,
{
"parent_group_id": secgroup["id"],
"protocol": "tcp",
"from_port": 80,
"to_port": 81,
"cidr": "192.168.10.0/24",
},
)
db.instance_add_security_group(admin_ctxt, instance_ref["id"], secgroup["id"])
instance_ref = db.instance_get(admin_ctxt, instance_ref["id"])
# self.fw.add_instance(instance_ref)
def fake_iptables_execute(cmd, process_input=None):
if cmd == "sudo ip6tables-save -t filter":
return "\n".join(self.in6_rules), None
if cmd == "sudo iptables-save -t filter":
return "\n".join(self.in_rules), None
if cmd == "sudo iptables-restore":
self.out_rules = process_input.split("\n")
return "", ""
if cmd == "sudo ip6tables-restore":
self.out6_rules = process_input.split("\n")
return "", ""
self.fw.execute = fake_iptables_execute
self.fw.prepare_instance_filter(instance_ref)
self.fw.apply_instance_filter(instance_ref)
in_rules = filter(lambda l: not l.startswith("#"), self.in_rules)
for rule in in_rules:
if not "nova" in rule:
self.assertTrue(rule in self.out_rules, "Rule went missing: %s" % rule)
instance_chain = None
for rule in self.out_rules:
# This is pretty crude, but it'll do for now
if "-d 10.11.12.13 -j" in rule:
instance_chain = rule.split(" ")[-1]
break
self.assertTrue(instance_chain, "The instance chain wasn't added")
security_group_chain = None
for rule in self.out_rules:
# This is pretty crude, but it'll do for now
if "-A %s -j" % instance_chain in rule:
security_group_chain = rule.split(" ")[-1]
break
self.assertTrue(security_group_chain, "The security group chain wasn't added")
self.assertTrue(
"-A %s -p icmp -s 192.168.11.0/24 -j ACCEPT" % security_group_chain in self.out_rules,
"ICMP acceptance rule wasn't added",
)
self.assertTrue(
"-A %s -p icmp -s 192.168.11.0/24 -m icmp --icmp-type "
#.........这里部分代码省略.........
开发者ID:yosh,项目名称:nova,代码行数:101,代码来源:test_virt.py
示例17: test_creates_base_rule_first
def test_creates_base_rule_first(self):
# These come pre-defined by libvirt
self.defined_filters = ["no-mac-spoofing", "no-ip-spoofing", "no-arp-spoofing", "allow-dhcp-server"]
self.recursive_depends = {}
for f in self.defined_filters:
self.recursive_depends[f] = []
def _filterDefineXMLMock(xml):
dom = xml_to_dom(xml)
name = dom.firstChild.getAttribute("name")
self.recursive_depends[name] = []
for f in dom.getElementsByTagName("filterref"):
ref = f.getAttribute("filter")
self.assertTrue(
ref in self.defined_filters, ("%s referenced filter that does " + "not yet exist: %s") % (name, ref)
)
dependencies = [ref] + self.recursive_depends[ref]
self.recursive_depends[name] += dependencies
self.defined_filters.append(name)
return True
self.fake_libvirt_connection.nwfilterDefineXML = _filterDefineXMLMock
instance_ref = db.instance_create(self.context, {"user_id": "fake", "project_id": "fake"})
inst_id = instance_ref["id"]
ip = "10.11.12.13"
network_ref = db.project_get_network(self.context, "fake")
fixed_ip = {"address": ip, "network_id": network_ref["id"]}
admin_ctxt = context.get_admin_context()
db.fixed_ip_create(admin_ctxt, fixed_ip)
db.fixed_ip_update(admin_ctxt, ip, {"allocated": True, "instance_id": instance_ref["id"]})
def _ensure_all_called():
instance_filter = "nova-instance-%s" % instance_ref["name"]
secgroup_filter = "nova-secgroup-%s" % self.security_group["id"]
for required in [
secgroup_filter,
"allow-dhcp-server",
"no-arp-spoofing",
"no-ip-spoofing",
"no-mac-spoofing",
]:
self.assertTrue(
required in self.recursive_depends[instance_filter],
"Instance's filter does not include %s" % required,
)
self.security_group = self.setup_and_return_security_group()
db.instance_add_security_group(self.context, inst_id, self.security_group.id)
instance = db.instance_get(self.context, inst_id)
self.fw.setup_basic_filtering(instance)
self.fw.prepare_instance_filter(instance)
self.fw.apply_instance_filter(instance)
_ensure_all_called()
self.teardown_security_group()
db.instance_destroy(admin_ctxt, instance_ref["id"])
开发者ID:yosh,项目名称:nova,代码行数:64,代码来源:test_virt.py
示例18: _check_xml_and_uri
def _check_xml_and_uri(self, instance, expect_ramdisk, expect_kernel, rescue=False):
user_context = context.RequestContext(project=self.project, user=self.user)
instance_ref = db.instance_create(user_context, instance)
host = self.network.get_network_host(user_context.elevated())
network_ref = db.project_get_network(context.get_admin_context(), self.project.id)
fixed_ip = {"address": self.test_ip, "network_id": network_ref["id"]}
ctxt = context.get_admin_context()
fixed_ip_ref = db.fixed_ip_create(ctxt, fixed_ip)
db.fixed_ip_update(ctxt, self.test_ip, {"allocated": True, "instance_id": instance_ref["id"]})
type_uri_map = {
"qemu": (
"qemu:///system",
[
(lambda t: t.find(".").get("type"), "qemu"),
(lambda t: t.find("./os/type").text, "hvm"),
(lambda t: t.find("./devices/emulator"), None),
],
),
"kvm": (
"qemu:///system",
[
(lambda t: t.find(".").get("type"), "kvm"),
(lambda t: t.find("./os/type").text, "hvm"),
(lambda t: t.find("./devices/emulator"), None),
],
),
"uml": (
"uml:///system",
[(lambda t: t.find(".").get("type"), "uml"), (lambda t: t.find("./os/type").text, "uml")],
),
"xen": (
"xen:///",
[(lambda t: t.find(".").get("type"), "xen"), (lambda t: t.find("./os/type").text, "linux")],
),
}
for hypervisor_type in ["qemu", "kvm", "xen"]:
check_list = type_uri_map[hypervisor_type][1]
if rescue:
check = (lambda t: t.find("./os/kernel").text.split("/")[1], "kernel.rescue")
check_list.append(check)
check = (lambda t: t.find("./os/initrd").text.split("/")[1], "ramdisk.rescue")
check_list.append(check)
else:
if expect_kernel:
check = (lambda t: t.find("./os/kernel").text.split("/")[1], "kernel")
else:
check = (lambda t: t.find("./os/kernel"), None)
check_list.append(check)
if expect_ramdisk:
check = (lambda t: t.find("./os/initrd").text.split("/")[1], "ramdisk")
else:
check = (lambda t: t.find("./os/initrd"), None)
check_list.append(check)
common_checks = [
(lambda t: t.find(".").tag, "domain"),
(lambda t: t.find("./devices/interface/filterref/parameter").get("name"), "IP"),
(lambda t: t.find("./devices/interface/filterref/parameter").get("value"), "10.11.12.13"),
(lambda t: t.findall("./devices/interface/filterref/parameter")[1].get("name"), "DHCPSERVER"),
(lambda t: t.findall("./devices/interface/filterref/parameter")[1].get("value"), "10.0.0.1"),
(lambda t: t.find("./devices/serial/source").get("path").split("/")[1], "console.log"),
(lambda t: t.find("./memory").text, "2097152"),
]
if rescue:
common_checks += [
(lambda t: t.findall("./devices/disk/source")[0].get("file").split("/")[1], "disk.rescue"),
(lambda t: t.findall("./devices/disk/source")[1].get("file").split("/")[1], "disk"),
]
else:
common_checks += [(lambda t: t.findall("./devices/disk/source")[0].get("file").split("/")[1], "disk")]
common_checks += [(lambda t: t.findall("./devices/disk/source")[1].get("file").split("/")[1], "disk.local")]
for (libvirt_type, (expected_uri, checks)) in type_uri_map.iteritems():
FLAGS.libvirt_type = libvirt_type
conn = libvirt_conn.LibvirtConnection(True)
uri = conn.get_uri()
self.assertEquals(uri, expected_uri)
xml = conn.to_xml(instance_ref, rescue)
tree = xml_to_tree(xml)
for i, (check, expected_result) in enumerate(checks):
self.assertEqual(check(tree), expected_result, "%s failed check %d" % (xml, i))
for i, (check, expected_result) in enumerate(common_checks):
self.assertEqual(check(tree), expected_result, "%s failed common check %d" % (xml, i))
# This test is supposed to make sure we don't override a specifically
# set uri
#
# Deliberately not just assigning this string to FLAGS.libvirt_uri and
# checking against that later on. This way we make sure the
# implementation doesn't fiddle around with the FLAGS.
testuri = "something completely different"
#.........这里部分代码省略.........
开发者ID:yosh,项目名称:nova,代码行数:101,代码来源:test_virt.py
示例19: test_creates_base_rule_first
def test_creates_base_rule_first(self):
# These come pre-defined by libvirt
self.defined_filters = ['no-mac-spoofing',
'no-ip-spoofing',
'no-arp-spoofing',
'allow-dhcp-server']
self.recursive_depends = {}
for f in self.defined_filters:
self.recursive_depends[f] = []
def _filterDefineXMLMock(xml):
dom = xml_to_dom(xml)
name = dom.firstChild.getAttribute('name')
self.recursive_depends[name] = []
for f in dom.getElementsByTagName('filterref'):
ref = f.getAttribute('filter')
self.assertTrue(ref in self.defined_filters,
('%s referenced filter that does ' +
'not yet exist: %s') % (name, ref))
dependencies = [ref] + self.recursive_depends[ref]
self.recursive_depends[name] += dependencies
self.defined_filters.append(name)
return True
self.fake_libvirt_connection.nwfilterDefineXML = _filterDefineXMLMock
instance_ref = self._create_instance()
inst_id = instance_ref['id']
ip = '10.11.12.13'
network_ref = db.project_get_network(self.context, 'fake')
fixed_ip = {'address': ip, 'network_id': network_ref['id']}
admin_ctxt = context.get_admin_context()
db.fixed_ip_create(admin_ctxt, fixed_ip)
db.fixed_ip_update(admin_ctxt, ip, {'allocated': True,
'instance_id': inst_id})
def _ensure_all_called():
instance_filter = 'nova-instance-%s-%s' % (instance_ref['name'],
'00A0C914C829')
secgroup_filter = 'nova-secgroup-%s' % self.security_group['id']
for required in [secgroup_filter, 'allow-dhcp-server',
'no-arp-spoofing', 'no-ip-spoofing',
'no-mac-spoofing']:
self.assertTrue(required in
self.recursive_depends[instance_filter],
"Instance's filter does not include %s" %
required)
self.security_group = self.setup_and_return_security_group()
db.instance_add_security_group(self.context, inst_id,
self.security_group.id)
instance = db.instance_get(self.context, inst_id)
self.fw.setup_basic_filtering(instance)
self.fw.prepare_instance_filter(instance)
self.fw.apply_instance_filter(instance)
_ensure_all_called()
self.teardown_security_group()
db.instance_destroy(admin_ctxt, instance_ref['id'])
开发者ID:superstack,项目名称:nova,代码行数:65,代码来源:test_virt.py
示例20: test_static_filters
def test_static_filters(self):
instance_ref = self._create_instance_ref()
ip = '10.11.12.13'
network_ref = db.project_get_network(self.context,
'fake')
fixed_ip = {'address': ip,
'network_id': network_ref['id']}
admin_ctxt = context.get_admin_context()
db.fixed_ip_create(admin_ctxt, fixed_ip)
db.fixed_ip_update(admin_ctxt, ip, {'allocated': True,
'instance_id': instance_ref['id']})
secgroup = db.security_group_create(admin_ctxt,
{'user_id': 'fake',
'project_id': 'fake',
'name': 'testgroup',
'description': 'test group'})
db.security_group_rule_create(admin_ctxt,
{'parent_group_id': secgroup['id'],
'protocol': 'icmp',
'from_port': -1,
|
请发表评论