• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python db.fixed_ip_create函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 nova.db.fixed_ip_create 函数的典型用法代码示例。如果您正苦于以下问题:Python fixed_ip_create 函数的具体用法?Python fixed_ip_create 怎么用?Python fixed_ip_create 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 fixed_ip_create 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: _timeout_test

 def _timeout_test(self, ctxt, timeout, multi_host):
     """Seed the database with fixed IPs on either side of ``timeout``.

     Creates one instance (host 'foo'), one network (host 'bar', with the
     given ``multi_host`` flag) and four fixed IP rows whose ``updated_at``
     is five seconds before or after ``timeout``. This helper makes no
     assertions itself and returns nothing.

     :param ctxt: context passed to every db call
     :param timeout: datetime the ``updated_at`` values are offset from
     :param multi_host: value stored in the network's ``multi_host`` field
     """
     instance = db.instance_create(ctxt, {'host': 'foo'})
     net = db.network_create_safe(ctxt, {'multi_host': multi_host,
                                         'host': 'bar'})
     margin = datetime.timedelta(seconds=5)
     old = timeout - margin
     new = timeout + margin
     # (allocated, network_id, updated_at) for each row, in creation order.
     rows = (
         (False, net['id'], old),   # should deallocate
         (True, net['id'], old),    # still allocated
         (False, None, old),        # wrong network
         (False, None, new),        # too new
     )
     for allocated, network_id, updated_at in rows:
         db.fixed_ip_create(ctxt, {'allocated': allocated,
                                   'instance_id': instance['id'],
                                   'network_id': network_id,
                                   'updated_at': updated_at})
开发者ID:matiu2,项目名称:nova,代码行数:31,代码来源:test_db_api.py


示例2: test_network_delete_safe

 def test_network_delete_safe(self):
     """network_delete_safe refuses while a fixed IP is allocated.

     With one allocated fixed IP on the network the delete must raise
     NetworkInUse; once that IP is marked unallocated the delete goes
     through and the network's fixed IPs are flagged as deleted.
     """
     ctxt = context.get_admin_context()
     network = db.network_create_safe(ctxt, {"host": "localhost", "project_id": "project1"})
     # Result unused; the lookup is kept because the original test performs it.
     db.network_get(ctxt, network.id)
     address1 = db.fixed_ip_create(ctxt, {"network_id": network["id"], "address": "fake1"})
     address2 = db.fixed_ip_create(
         ctxt, {"network_id": network["id"], "address": "fake2", "allocated": True}
     )
     # An allocated fixed IP must block the deletion.
     self.assertRaises(exception.NetworkInUse, db.network_delete_safe, ctxt, network["id"])
     db.fixed_ip_update(ctxt, address2, {"allocated": False})
     db.network_delete_safe(ctxt, network["id"])
     # Switch to a context created with read_deleted="yes" for the lookup.
     ctxt = ctxt.elevated(read_deleted="yes")
     fixed_ip = db.fixed_ip_get_by_address(ctxt, address1)
     self.assertTrue(fixed_ip["deleted"])


示例3: test_network_get_associated_fixed_ips

 def test_network_get_associated_fixed_ips(self):
     """Check the record returned by network_get_associated_fixed_ips.

     One instance, one virtual interface and one allocated fixed IP are
     created on network 1. The query for network 1 must return exactly one
     record carrying the instance and VIF details; the same query with the
     extra argument "nothing" must return no records.
     """
     admin = context.get_admin_context()
     inst = db.instance_create(admin, {"host": "foo", "hostname": "myname"})
     iface = db.virtual_interface_create(admin, {"address": "bar", "instance_id": inst["id"]})
     ip_values = {
         "address": "baz",
         "network_id": 1,
         "allocated": True,
         "instance_id": inst["id"],
         "virtual_interface_id": iface["id"],
     }
     created_address = db.fixed_ip_create(admin, ip_values)

     found = db.network_get_associated_fixed_ips(admin, 1)
     self.assertEqual(len(found), 1)
     row = found[0]
     self.assertEqual(row["address"], created_address)
     self.assertEqual(row["instance_id"], inst["id"])
     self.assertEqual(row["network_id"], 1)
     self.assertEqual(row["instance_created"], inst["created_at"])
     self.assertEqual(row["instance_updated"], inst["updated_at"])
     self.assertEqual(row["instance_hostname"], inst["hostname"])
     self.assertEqual(row["vif_id"], iface["id"])
     self.assertEqual(row["vif_address"], iface["address"])

     found = db.network_get_associated_fixed_ips(admin, 1, "nothing")
     self.assertEqual(len(found), 0)
开发者ID:hiteshwadekar,项目名称:nova,代码行数:27,代码来源:test_db_api.py


示例4: test_network_get_associated_fixed_ips

 def test_network_get_associated_fixed_ips(self):
     """Check the record returned by network_get_associated_fixed_ips.

     One instance, one virtual interface and one allocated fixed IP are
     created on network 1. The query for network 1 must return exactly one
     record carrying the instance and VIF details; the same query with the
     extra argument 'nothing' must return no records.
     """
     admin = context.get_admin_context()
     inst = db.instance_create(admin, {'host': 'foo', 'hostname': 'myname'})
     iface = db.virtual_interface_create(admin,
                                         {'address': 'bar',
                                          'instance_id': inst['id']})
     ip_values = {'address': 'baz',
                  'network_id': 1,
                  'allocated': True,
                  'instance_id': inst['id'],
                  'virtual_interface_id': iface['id']}
     created_address = db.fixed_ip_create(admin, ip_values)

     found = db.network_get_associated_fixed_ips(admin, 1)
     self.assertEqual(len(found), 1)
     row = found[0]
     self.assertEqual(row['address'], created_address)
     self.assertEqual(row['instance_id'], inst['id'])
     self.assertEqual(row['network_id'], 1)
     self.assertEqual(row['instance_created'], inst['created_at'])
     self.assertEqual(row['instance_updated'], inst['updated_at'])
     self.assertEqual(row['instance_hostname'], inst['hostname'])
     self.assertEqual(row['vif_id'], iface['id'])
     self.assertEqual(row['vif_address'], iface['address'])

     found = db.network_get_associated_fixed_ips(admin, 1, 'nothing')
     self.assertEqual(len(found), 0)
开发者ID:matiu2,项目名称:nova,代码行数:25,代码来源:test_db_api.py


示例5: _setup_networking

def _setup_networking(instance_id, ip="1.2.3.4", flo_addr="1.2.1.2"):
    """Create a VIF, a fixed IP and a floating IP for ``instance_id``.

    Uses the first entry returned by
    ``db.project_get_networks(ctxt, "fake", associate=True)`` as the network.
    The fixed IP is created as allocated with address ``ip``; the floating IP
    gets address ``flo_addr`` and points at that fixed IP's id.
    """
    admin = context.get_admin_context()
    network = db.project_get_networks(admin, "fake", associate=True)[0]

    vif_values = {
        "address": "56:12:12:12:12:12",
        "network_id": network["id"],
        "instance_id": instance_id,
    }
    created_vif = db.virtual_interface_create(admin, vif_values)

    fixed_values = {
        "address": ip,
        "network_id": network["id"],
        "virtual_interface_id": created_vif["id"],
        "allocated": True,
        "instance_id": instance_id,
    }
    db.fixed_ip_create(admin, fixed_values)

    # Re-read the row by address to obtain the id the floating IP needs.
    fixed_row = db.fixed_ip_get_by_address(admin, ip)
    floating_values = {"address": flo_addr, "fixed_ip_id": fixed_row["id"]}
    db.floating_ip_create(admin, floating_values)


示例6: create

 def create(self, context):
     """Create a fixed IP row from this object's pending changes.

     Raises exception.ObjectActionError when the pending changes already
     contain an "id". An "address" entry, when present, is converted with
     str() before the changes are handed to db.fixed_ip_create. The value
     that call returns is loaded back into this object via _from_db_object.
     """
     pending = self.obj_get_changes()
     if "id" in pending:
         raise exception.ObjectActionError(action="create", reason="already created")
     if "address" in pending:
         # Replace the stored address with its str() form.
         pending["address"] = str(pending["address"])
     created_row = db.fixed_ip_create(context, pending)
     self._from_db_object(context, self, created_row)
开发者ID:wputra,项目名称:MOS-centos,代码行数:8,代码来源:fixed_ip.py


示例7: test_network_delete_safe

 def test_network_delete_safe(self):
     """network_delete_safe refuses while a fixed IP is allocated.

     With one allocated fixed IP on the network the delete must raise
     NetworkInUse; once that IP is marked unallocated the delete goes
     through and the network's fixed IPs are flagged as deleted.
     """
     ctxt = context.get_admin_context()
     network = db.network_create_safe(ctxt, {'host': 'localhost',
                                             'project_id': 'project1'})
     # Result unused; the lookup is kept because the original test performs it.
     db.network_get(ctxt, network.id)
     address1 = db.fixed_ip_create(ctxt, {'network_id': network['id'],
                                          'address': 'fake1'})
     address2 = db.fixed_ip_create(ctxt, {'network_id': network['id'],
                                          'address': 'fake2',
                                          'allocated': True})
     # An allocated fixed IP must block the deletion.
     self.assertRaises(exception.NetworkInUse,
                       db.network_delete_safe, ctxt, network['id'])
     db.fixed_ip_update(ctxt, address2, {'allocated': False})
     db.network_delete_safe(ctxt, network['id'])
     # Switch to a context created with read_deleted='yes' for the lookup.
     ctxt = ctxt.elevated(read_deleted='yes')
     fixed_ip = db.fixed_ip_get_by_address(ctxt, address1)
     self.assertTrue(fixed_ip['deleted'])
开发者ID:AnyBucket,项目名称:OpenStack-Install-and-Understand-Guide,代码行数:18,代码来源:test_db_api.py


示例8: create

 def create(self, context):
     """Create a fixed IP row from this object's pending changes.

     Raises exception.ObjectActionError when the pending changes already
     contain an 'id'. An 'address' entry, when present, is converted with
     str() before the changes are handed to db.fixed_ip_create. The value
     that call returns is loaded back into this object via _from_db_object.
     """
     pending = self.obj_get_changes()
     if 'id' in pending:
         raise exception.ObjectActionError(action='create',
                                           reason='already created')
     if 'address' in pending:
         # Replace the stored address with its str() form.
         pending['address'] = str(pending['address'])
     created_row = db.fixed_ip_create(context, pending)
     self._from_db_object(context, self, created_row)
开发者ID:csdhome,项目名称:nova,代码行数:9,代码来源:fixed_ip.py


示例9: _setup_networking

def _setup_networking(instance_id, ip='1.2.3.4', flo_addr='1.2.1.2'):
    """Create a VIF, a fixed IP and a floating IP for ``instance_id``.

    Uses the first entry returned by
    ``db.project_get_networks(ctxt, 'fake', associate=True)`` as the network.
    The fixed IP is created as allocated with address ``ip``; the floating IP
    gets address ``flo_addr`` and points at that fixed IP's id.
    """
    admin = context.get_admin_context()
    network = db.project_get_networks(admin, 'fake', associate=True)[0]

    vif_values = {'address': '56:12:12:12:12:12',
                  'network_id': network['id'],
                  'instance_id': instance_id}
    created_vif = db.virtual_interface_create(admin, vif_values)

    fixed_values = {'address': ip,
                    'network_id': network['id'],
                    'virtual_interface_id': created_vif['id'],
                    'allocated': True,
                    'instance_id': instance_id}
    db.fixed_ip_create(admin, fixed_values)

    # Re-read the row by address to obtain the id the floating IP needs.
    fixed_row = db.fixed_ip_get_by_address(admin, ip)
    floating_values = {'address': flo_addr,
                       'fixed_ip_id': fixed_row['id']}
    db.floating_ip_create(admin, floating_values)
开发者ID:matiu2,项目名称:nova,代码行数:19,代码来源:test_db_api.py


示例10: setUp

 def setUp(self):
     """Create a 10.0.0.0/24 network with one fixed IP row per address.

     The network is stored on ``self.network``; the addresses at index 1
     and 2 (used above as gateway and dhcp_start) are created with
     ``reserved`` set to True. ``self.commands`` is the FixedIpCommands
     instance under test.
     """
     super(FixedIpCommandsTestCase, self).setUp()
     cidr = '10.0.0.0/24'
     net = netaddr.IPNetwork(cidr)
     net_info = {'bridge': 'fakebr',
                 'bridge_interface': 'fakeeth',
                 'dns': FLAGS.flat_network_dns,
                 'cidr': cidr,
                 'netmask': str(net.netmask),
                 'gateway': str(net[1]),
                 'broadcast': str(net.broadcast),
                 'dhcp_start': str(net[2])}
     # One admin context is reused for every db call below.
     admin_ctxt = context.get_admin_context()
     self.network = db.network_create_safe(admin_ctxt, net_info)
     # Iterating the IPNetwork yields the same addresses as net[0..len-1].
     for index, address in enumerate(net):
         db.fixed_ip_create(admin_ctxt,
                            {'network_id': self.network['id'],
                             'address': str(address),
                             'reserved': index in (1, 2)})
     self.commands = nova_manage.FixedIpCommands()


示例11: _timeout_test

 def _timeout_test(self, ctxt, timeout, multi_host):
     """Seed the database with fixed IPs on either side of ``timeout``.

     Creates one instance (host "foo"), one network (host "bar", with the
     given ``multi_host`` flag) and four fixed IP rows whose ``updated_at``
     is five seconds before or after ``timeout``. This helper makes no
     assertions itself and returns nothing.

     :param ctxt: context passed to every db call
     :param timeout: datetime the ``updated_at`` values are offset from
     :param multi_host: value stored in the network's ``multi_host`` field
     """
     instance = db.instance_create(ctxt, {"host": "foo"})
     net = db.network_create_safe(ctxt, {"multi_host": multi_host, "host": "bar"})
     margin = datetime.timedelta(seconds=5)
     old = timeout - margin
     new = timeout + margin
     # (allocated, network_id, updated_at) for each row, in creation order.
     rows = (
         (False, net["id"], old),  # should deallocate
         (True, net["id"], old),  # still allocated
         (False, None, old),  # wrong network
         (False, None, new),  # too new
     )
     for allocated, network_id, updated_at in rows:
         db.fixed_ip_create(
             ctxt,
             {
                 "allocated": allocated,
                 "instance_id": instance["id"],
                 "network_id": network_id,
                 "updated_at": updated_at,
             },
         )


示例12: test_post_live_migration_working_correctly

    def test_post_live_migration_working_correctly(self):
        """Confirm post_live_migration() rewrites host data to the destination.

        After the call the instance's host must equal ``dest`` and the first
        floating IP listed for ``dest`` must be the one created here.
        """
        dest = 'desthost'
        flo_addr = '1.2.1.2'

        # Prepare data
        c = context.get_admin_context()
        instance_id = self._create_instance()
        i_ref = db.instance_get(c, instance_id)
        db.instance_update(c, i_ref['id'], {'state_description': 'migrating',
                                            'state': power_state.PAUSED})
        v_ref = db.volume_create(c, {'size': 1, 'instance_id': instance_id})
        fix_addr = db.fixed_ip_create(c, {'address': '1.1.1.1',
                                          'instance_id': instance_id})
        fix_ref = db.fixed_ip_get_by_address(c, fix_addr)
        db.floating_ip_create(c, {'address': flo_addr,
                                  'fixed_ip_id': fix_ref['id']})
        # Reload is necessary before setting mocks
        i_ref = db.instance_get(c, instance_id)

        # Prepare mocks: calls made between StubOutWithMock and ReplayAll
        # are the recorded expectations.
        self.mox.StubOutWithMock(self.compute.volume_manager,
                                 'remove_compute_volume')
        for v in i_ref['volumes']:
            self.compute.volume_manager.remove_compute_volume(c, v['id'])
        self.mox.StubOutWithMock(self.compute.driver, 'unfilter_instance')
        self.compute.driver.unfilter_instance(i_ref, [])

        # Execute
        self.mox.ReplayAll()
        self.compute.post_live_migration(c, i_ref, dest)

        # Make sure all data was rewritten to dest. Separate assertions
        # report which condition failed.
        i_ref = db.instance_get(c, i_ref['id'])
        flo_refs = db.floating_ip_get_all_by_host(c, dest)
        self.assertEqual(i_ref['host'], dest)
        self.assertNotEqual(len(flo_refs), 0)
        self.assertEqual(flo_refs[0]['address'], flo_addr)

        # Clean up
        db.instance_destroy(c, instance_id)
        db.volume_destroy(c, v_ref['id'])
        db.floating_ip_destroy(c, flo_addr)


示例13: _check_xml_and_container

    def _check_xml_and_container(self, instance):
        """Check the connection URI and XML produced with libvirt_type 'lxc'.

        Creates the instance and an allocated fixed IP for it, then asserts
        that the connection URI is 'lxc:///', that the generated XML has
        type 'lxc', OS type 'exe' and a filesystem target dir of '/', and
        that the filesystem source dir is non-empty.

        :param instance: values passed to db.instance_create
        """
        user_context = context.RequestContext(project=self.project,
                                              user=self.user)
        instance_ref = db.instance_create(user_context, instance)
        # Return value unused; the call is kept because the original makes it.
        self.network.get_network_host(user_context.elevated())
        network_ref = db.project_get_network(context.get_admin_context(),
                                             self.project.id)

        fixed_ip = {'address': self.test_ip,
                    'network_id': network_ref['id']}

        ctxt = context.get_admin_context()
        db.fixed_ip_create(ctxt, fixed_ip)
        db.fixed_ip_update(ctxt, self.test_ip,
                           {'allocated': True,
                            'instance_id': instance_ref['id']})

        self.flags(libvirt_type='lxc')
        conn = libvirt_conn.LibvirtConnection(True)

        uri = conn.get_uri()
        self.assertEqual(uri, 'lxc:///')

        xml = conn.to_xml(instance_ref)
        tree = xml_to_tree(xml)

        # (extractor, expected value) pairs evaluated against the XML tree.
        checks = [
            (lambda t: t.find('.').get('type'), 'lxc'),
            (lambda t: t.find('./os/type').text, 'exe'),
            (lambda t: t.find('./devices/filesystem/target').get('dir'), '/'),
        ]

        for i, (check, expected_result) in enumerate(checks):
            self.assertEqual(check(tree),
                             expected_result,
                             '%s failed common check %d' % (xml, i))

        target = tree.find('./devices/filesystem/source').get('dir')
        self.assertTrue(len(target) > 0)


示例14: create_fixed_ip

 def create_fixed_ip(self, **params):
     """Create a fixed IP through db.fixed_ip_create and return its result.

     The values default to {'address': '192.168.0.1'}; any keyword
     argument overrides or extends them.
     """
     values = dict({'address': '192.168.0.1'}, **params)
     return db.fixed_ip_create(self.ctxt, values)
开发者ID:matiu2,项目名称:nova,代码行数:4,代码来源:test_db_api.py


示例15: test_static_filters

    def test_static_filters(self):
        instance_ref = db.instance_create(self.context,
                                          {'user_id': 'fake',
                                          'project_id': 'fake',
                                          'mac_address': '56:12:12:12:12:12'})
        ip = '10.11.12.13'

        network_ref = db.project_get_network(self.context,
                                             'fake')

        fixed_ip = {'address': ip,
                    'network_id': network_ref['id']}

        admin_ctxt = context.get_admin_context()
        db.fixed_ip_create(admin_ctxt, fixed_ip)
        db.fixed_ip_update(admin_ctxt, ip, {'allocated': True,
                                            'instance_id': instance_ref['id']})

        secgroup = db.security_group_create(admin_ctxt,
                                            {'user_id': 'fake',
                                             'project_id': 'fake',
                                             'name': 'testgroup',
                                             'description': 'test group'})

        db.security_group_rule_create(admin_ctxt,
                                      {'parent_group_id': secgroup['id'],
                                       'protocol': 'icmp',
                                       'from_port': -1,
                                       'to_port': -1,
                                       'cidr': '192.168.11.0/24'})

        db.security_group_rule_create(admin_ctxt,
                                      {'parent_group_id': secgroup['id'],
                                       'protocol': 'icmp',
                                       'from_port': 8,
                                       'to_port': -1,
                                       'cidr': '192.168.11.0/24'})

        db.security_group_rule_create(admin_ctxt,
                                      {'parent_group_id': secgroup['id'],
                                       'protocol': 'tcp',
                                       'from_port': 80,
                                       'to_port': 81,
                                       'cidr': '192.168.10.0/24'})

        db.instance_add_security_group(admin_ctxt, instance_ref['id'],
                                       secgroup['id'])
        instance_ref = db.instance_get(admin_ctxt, instance_ref['id'])

#        self.fw.add_instance(instance_ref)
        def fake_iptables_execute(cmd, process_input=None):
            if cmd == 'sudo ip6tables-save -t filter':
                return '\n'.join(self.in6_rules), None
            if cmd == 'sudo iptables-save -t filter':
                return '\n'.join(self.in_rules), None
            if cmd == 'sudo iptables-restore':
                self.out_rules = process_input.split('\n')
                return '', ''
            if cmd == 'sudo ip6tables-restore':
                self.out6_rules = process_input.split('\n')
                return '', ''
        self.fw.execute = fake_iptables_execute

        self.fw.prepare_instance_filter(instance_ref)
        self.fw.apply_instance_filter(instance_ref)

        in_rules = filter(lambda l: not l.startswith('#'), self.in_rules)
        for rule in in_rules:
            if not 'nova' in rule:
                self.assertTrue(rule in self.out_rules,
                                'Rule went missing: %s' % rule)

        instance_chain = None
        for rule in self.out_rules:
            # This is pretty crude, but it'll do for now
            if '-d 10.11.12.13 -j' in rule:
                instance_chain = rule.split(' ')[-1]
                break
        self.assertTrue(instance_chain, "The instance chain wasn't added")

        security_group_chain = None
        for rule in self.out_rules:
            # This is pretty crude, but it'll do for now
            if '-A %s -j' % instance_chain in rule:
                security_group_chain = rule.split(' ')[-1]
                break
        self.assertTrue(security_group_chain,
                        "The security group chain wasn't added")

        self.assertTrue('-A %s -p icmp -s 192.168.11.0/24 -j ACCEPT' % \
                               security_group_chain in self.out_rules,
                        "ICMP acceptance rule wasn't added")

        self.assertTrue('-A %s -p icmp -s 192.168.11.0/24 -m icmp --icmp-type '
                        '8 -j ACCEPT' % security_group_chain in self.out_rules,
                        "ICMP Echo Request acceptance rule wasn't added")

        self.assertTrue('-A %s -p tcp -s 192.168.10.0/24 -m multiport '
                        '--dports 80:81 -j ACCEPT' % security_group_chain \
                            in self.out_rules,
#.........这里部分代码省略.........
开发者ID:anotherjesse,项目名称:nova,代码行数:101,代码来源:test_virt.py


示例16: test_static_filters

    def test_static_filters(self):
        instance_ref = db.instance_create(
            self.context, {"user_id": "fake", "project_id": "fake", "mac_address": "56:12:12:12:12:12"}
        )
        ip = "10.11.12.13"

        network_ref = db.project_get_network(self.context, "fake")

        fixed_ip = {"address": ip, "network_id": network_ref["id"]}

        admin_ctxt = context.get_admin_context()
        db.fixed_ip_create(admin_ctxt, fixed_ip)
        db.fixed_ip_update(admin_ctxt, ip, {"allocated": True, "instance_id": instance_ref["id"]})

        secgroup = db.security_group_create(
            admin_ctxt, {"user_id": "fake", "project_id": "fake", "name": "testgroup", "description": "test group"}
        )

        db.security_group_rule_create(
            admin_ctxt,
            {
                "parent_group_id": secgroup["id"],
                "protocol": "icmp",
                "from_port": -1,
                "to_port": -1,
                "cidr": "192.168.11.0/24",
            },
        )

        db.security_group_rule_create(
            admin_ctxt,
            {
                "parent_group_id": secgroup["id"],
                "protocol": "icmp",
                "from_port": 8,
                "to_port": -1,
                "cidr": "192.168.11.0/24",
            },
        )

        db.security_group_rule_create(
            admin_ctxt,
            {
                "parent_group_id": secgroup["id"],
                "protocol": "tcp",
                "from_port": 80,
                "to_port": 81,
                "cidr": "192.168.10.0/24",
            },
        )

        db.instance_add_security_group(admin_ctxt, instance_ref["id"], secgroup["id"])
        instance_ref = db.instance_get(admin_ctxt, instance_ref["id"])

        #        self.fw.add_instance(instance_ref)
        def fake_iptables_execute(cmd, process_input=None):
            if cmd == "sudo ip6tables-save -t filter":
                return "\n".join(self.in6_rules), None
            if cmd == "sudo iptables-save -t filter":
                return "\n".join(self.in_rules), None
            if cmd == "sudo iptables-restore":
                self.out_rules = process_input.split("\n")
                return "", ""
            if cmd == "sudo ip6tables-restore":
                self.out6_rules = process_input.split("\n")
                return "", ""

        self.fw.execute = fake_iptables_execute

        self.fw.prepare_instance_filter(instance_ref)
        self.fw.apply_instance_filter(instance_ref)

        in_rules = filter(lambda l: not l.startswith("#"), self.in_rules)
        for rule in in_rules:
            if not "nova" in rule:
                self.assertTrue(rule in self.out_rules, "Rule went missing: %s" % rule)

        instance_chain = None
        for rule in self.out_rules:
            # This is pretty crude, but it'll do for now
            if "-d 10.11.12.13 -j" in rule:
                instance_chain = rule.split(" ")[-1]
                break
        self.assertTrue(instance_chain, "The instance chain wasn't added")

        security_group_chain = None
        for rule in self.out_rules:
            # This is pretty crude, but it'll do for now
            if "-A %s -j" % instance_chain in rule:
                security_group_chain = rule.split(" ")[-1]
                break
        self.assertTrue(security_group_chain, "The security group chain wasn't added")

        self.assertTrue(
            "-A %s -p icmp -s 192.168.11.0/24 -j ACCEPT" % security_group_chain in self.out_rules,
            "ICMP acceptance rule wasn't added",
        )

        self.assertTrue(
            "-A %s -p icmp -s 192.168.11.0/24 -m icmp --icmp-type "
#.........这里部分代码省略.........
开发者ID:yosh,项目名称:nova,代码行数:101,代码来源:test_virt.py


示例17: test_creates_base_rule_first

    def test_creates_base_rule_first(self):
        """Every filter must be defined before another filter references it.

        nwfilterDefineXML on the fake libvirt connection is replaced by a
        mock that records each defined filter with its transitive
        dependencies, and fails if a <filterref> names a filter that has not
        been defined yet. After the firewall driver has run, the instance
        filter must depend on the security group filter and on the four
        pre-defined filters.
        """
        # These come pre-defined by libvirt
        self.defined_filters = ["no-mac-spoofing", "no-ip-spoofing", "no-arp-spoofing", "allow-dhcp-server"]

        # Each pre-defined filter starts with its own empty dependency list.
        self.recursive_depends = dict((known, []) for known in self.defined_filters)

        def _filterDefineXMLMock(xml):
            dom = xml_to_dom(xml)
            name = dom.firstChild.getAttribute("name")
            self.recursive_depends[name] = []
            for filterref in dom.getElementsByTagName("filterref"):
                ref = filterref.getAttribute("filter")
                self.assertTrue(
                    ref in self.defined_filters, "%s referenced filter that does not yet exist: %s" % (name, ref)
                )
                # A filter inherits the referenced filter and all that it depends on.
                inherited = [ref] + self.recursive_depends[ref]
                self.recursive_depends[name].extend(inherited)

            self.defined_filters.append(name)
            return True

        self.fake_libvirt_connection.nwfilterDefineXML = _filterDefineXMLMock

        instance_ref = db.instance_create(self.context, {"user_id": "fake", "project_id": "fake"})
        inst_id = instance_ref["id"]

        ip = "10.11.12.13"

        network_ref = db.project_get_network(self.context, "fake")

        admin_ctxt = context.get_admin_context()
        db.fixed_ip_create(admin_ctxt, {"address": ip, "network_id": network_ref["id"]})
        db.fixed_ip_update(admin_ctxt, ip, {"allocated": True, "instance_id": instance_ref["id"]})

        def _ensure_all_called():
            instance_filter = "nova-instance-%s" % instance_ref["name"]
            secgroup_filter = "nova-secgroup-%s" % self.security_group["id"]
            expected_filters = [
                secgroup_filter,
                "allow-dhcp-server",
                "no-arp-spoofing",
                "no-ip-spoofing",
                "no-mac-spoofing",
            ]
            for required in expected_filters:
                self.assertTrue(
                    required in self.recursive_depends[instance_filter],
                    "Instance's filter does not include %s" % required,
                )

        self.security_group = self.setup_and_return_security_group()

        db.instance_add_security_group(self.context, inst_id, self.security_group.id)
        instance = db.instance_get(self.context, inst_id)

        self.fw.setup_basic_filtering(instance)
        self.fw.prepare_instance_filter(instance)
        self.fw.apply_instance_filter(instance)
        _ensure_all_called()
        self.teardown_security_group()
        db.instance_destroy(admin_ctxt, instance_ref["id"])
开发者ID:yosh,项目名称:nova,代码行数:64,代码来源:test_virt.py


示例18: _check_xml_and_uri

    def _check_xml_and_uri(self, instance, expect_ramdisk, expect_kernel, rescue=False):
        user_context = context.RequestContext(project=self.project, user=self.user)
        instance_ref = db.instance_create(user_context, instance)
        host = self.network.get_network_host(user_context.elevated())
        network_ref = db.project_get_network(context.get_admin_context(), self.project.id)

        fixed_ip = {"address": self.test_ip, "network_id": network_ref["id"]}

        ctxt = context.get_admin_context()
        fixed_ip_ref = db.fixed_ip_create(ctxt, fixed_ip)
        db.fixed_ip_update(ctxt, self.test_ip, {"allocated": True, "instance_id": instance_ref["id"]})

        type_uri_map = {
            "qemu": (
                "qemu:///system",
                [
                    (lambda t: t.find(".").get("type"), "qemu"),
                    (lambda t: t.find("./os/type").text, "hvm"),
                    (lambda t: t.find("./devices/emulator"), None),
                ],
            ),
            "kvm": (
                "qemu:///system",
                [
                    (lambda t: t.find(".").get("type"), "kvm"),
                    (lambda t: t.find("./os/type").text, "hvm"),
                    (lambda t: t.find("./devices/emulator"), None),
                ],
            ),
            "uml": (
                "uml:///system",
                [(lambda t: t.find(".").get("type"), "uml"), (lambda t: t.find("./os/type").text, "uml")],
            ),
            "xen": (
                "xen:///",
                [(lambda t: t.find(".").get("type"), "xen"), (lambda t: t.find("./os/type").text, "linux")],
            ),
        }

        for hypervisor_type in ["qemu", "kvm", "xen"]:
            check_list = type_uri_map[hypervisor_type][1]

            if rescue:
                check = (lambda t: t.find("./os/kernel").text.split("/")[1], "kernel.rescue")
                check_list.append(check)
                check = (lambda t: t.find("./os/initrd").text.split("/")[1], "ramdisk.rescue")
                check_list.append(check)
            else:
                if expect_kernel:
                    check = (lambda t: t.find("./os/kernel").text.split("/")[1], "kernel")
                else:
                    check = (lambda t: t.find("./os/kernel"), None)
                check_list.append(check)

                if expect_ramdisk:
                    check = (lambda t: t.find("./os/initrd").text.split("/")[1], "ramdisk")
                else:
                    check = (lambda t: t.find("./os/initrd"), None)
                check_list.append(check)

        common_checks = [
            (lambda t: t.find(".").tag, "domain"),
            (lambda t: t.find("./devices/interface/filterref/parameter").get("name"), "IP"),
            (lambda t: t.find("./devices/interface/filterref/parameter").get("value"), "10.11.12.13"),
            (lambda t: t.findall("./devices/interface/filterref/parameter")[1].get("name"), "DHCPSERVER"),
            (lambda t: t.findall("./devices/interface/filterref/parameter")[1].get("value"), "10.0.0.1"),
            (lambda t: t.find("./devices/serial/source").get("path").split("/")[1], "console.log"),
            (lambda t: t.find("./memory").text, "2097152"),
        ]
        if rescue:
            common_checks += [
                (lambda t: t.findall("./devices/disk/source")[0].get("file").split("/")[1], "disk.rescue"),
                (lambda t: t.findall("./devices/disk/source")[1].get("file").split("/")[1], "disk"),
            ]
        else:
            common_checks += [(lambda t: t.findall("./devices/disk/source")[0].get("file").split("/")[1], "disk")]
            common_checks += [(lambda t: t.findall("./devices/disk/source")[1].get("file").split("/")[1], "disk.local")]

        for (libvirt_type, (expected_uri, checks)) in type_uri_map.iteritems():
            FLAGS.libvirt_type = libvirt_type
            conn = libvirt_conn.LibvirtConnection(True)

            uri = conn.get_uri()
            self.assertEquals(uri, expected_uri)

            xml = conn.to_xml(instance_ref, rescue)
            tree = xml_to_tree(xml)
            for i, (check, expected_result) in enumerate(checks):
                self.assertEqual(check(tree), expected_result, "%s failed check %d" % (xml, i))

            for i, (check, expected_result) in enumerate(common_checks):
                self.assertEqual(check(tree), expected_result, "%s failed common check %d" % (xml, i))

        # This test is supposed to make sure we don't override a specifically
        # set uri
        #
        # Deliberately not just assigning this string to FLAGS.libvirt_uri and
        # checking against that later on. This way we make sure the
        # implementation doesn't fiddle around with the FLAGS.
        testuri = "something completely different"
#.........这里部分代码省略.........
开发者ID:yosh,项目名称:nova,代码行数:101,代码来源:test_virt.py


示例19: test_creates_base_rule_first

    def test_creates_base_rule_first(self):
        """Verify nwfilters are defined only after the filters they reference.

        A stand-in for ``nwfilterDefineXML`` records every filter that gets
        defined together with its accumulated ``filterref`` dependencies, and
        fails as soon as a filter references one that is not defined yet.
        Once the firewall driver has set up, prepared and applied filtering
        for an instance, the instance's filter must depend (directly or
        through other filters) on the security group filter and on each of
        the base filters.
        """
        # Filters that libvirt provides out of the box.
        self.defined_filters = ['no-mac-spoofing', 'no-ip-spoofing',
                                'no-arp-spoofing', 'allow-dhcp-server']
        # The predefined filters start out with no recorded dependencies.
        self.recursive_depends = dict((known, [])
                                      for known in self.defined_filters)

        def _fake_nwfilter_define_xml(xml):
            document = xml_to_dom(xml)
            filter_name = document.firstChild.getAttribute('name')
            collected = []
            self.recursive_depends[filter_name] = collected
            for node in document.getElementsByTagName('filterref'):
                referenced = node.getAttribute('filter')
                message = ('%s referenced filter that does '
                           'not yet exist: %s') % (filter_name, referenced)
                self.assertTrue(referenced in self.defined_filters, message)
                # The direct reference plus everything it already pulls in.
                inherited = [referenced] + self.recursive_depends[referenced]
                collected.extend(inherited)

            self.defined_filters.append(filter_name)
            return True

        self.fake_libvirt_connection.nwfilterDefineXML = \
            _fake_nwfilter_define_xml

        instance_ref = self._create_instance()
        inst_id = instance_ref['id']

        # Give the instance an allocated fixed IP on the project's network.
        ip = '10.11.12.13'
        network_ref = db.project_get_network(self.context, 'fake')
        fixed_ip = {'address': ip, 'network_id': network_ref['id']}
        admin_ctxt = context.get_admin_context()
        db.fixed_ip_create(admin_ctxt, fixed_ip)
        allocation = {'allocated': True, 'instance_id': inst_id}
        db.fixed_ip_update(admin_ctxt, ip, allocation)

        self.security_group = self.setup_and_return_security_group()
        db.instance_add_security_group(self.context, inst_id,
                                       self.security_group.id)
        instance = db.instance_get(self.context, inst_id)

        self.fw.setup_basic_filtering(instance)
        self.fw.prepare_instance_filter(instance)
        self.fw.apply_instance_filter(instance)

        # Every required filter must show up among the instance filter's
        # recorded dependencies.
        instance_filter = 'nova-instance-%s-%s' % (instance_ref['name'],
                                                   '00A0C914C829')
        secgroup_filter = 'nova-secgroup-%s' % self.security_group['id']
        required_filters = [secgroup_filter, 'allow-dhcp-server',
                            'no-arp-spoofing', 'no-ip-spoofing',
                            'no-mac-spoofing']
        for required in required_filters:
            self.assertTrue(
                required in self.recursive_depends[instance_filter],
                "Instance's filter does not include %s" % required)

        self.teardown_security_group()
        db.instance_destroy(admin_ctxt, instance_ref['id'])
开发者ID:superstack,项目名称:nova,代码行数:65,代码来源:test_virt.py


示例20: test_static_filters

    def test_static_filters(self):
        instance_ref = self._create_instance_ref()
        ip = '10.11.12.13'

        network_ref = db.project_get_network(self.context,
                                             'fake')

        fixed_ip = {'address': ip,
                    'network_id': network_ref['id']}

        admin_ctxt = context.get_admin_context()
        db.fixed_ip_create(admin_ctxt, fixed_ip)
        db.fixed_ip_update(admin_ctxt, ip, {'allocated': True,
                                            'instance_id': instance_ref['id']})

        secgroup = db.security_group_create(admin_ctxt,
                                            {'user_id': 'fake',
                                             'project_id': 'fake',
                                             'name': 'testgroup',
                                             'description': 'test group'})

        db.security_group_rule_create(admin_ctxt,
                                      {'parent_group_id': secgroup['id'],
                                       'protocol': 'icmp',
                                       'from_port': -1,
                              

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python db.fixed_ip_disassociate函数代码示例发布时间:2022-05-27
下一篇:
Python db.fixed_ip_associate_pool函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap