• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python context.get_admin_context函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中quantum.context.get_admin_context函数的典型用法代码示例。如果您正苦于以下问题：Python get_admin_context函数的具体用法？Python get_admin_context怎么用？Python get_admin_context使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_admin_context函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _setup_core_resources

    def _setup_core_resources(self):
        """Create the network and subnet used by the tests via the core plugin.

        The created network is stored on ``self._network``, the created
        subnet on ``self._subnet`` and the subnet's id on ``self._subnet_id``.
        """
        plugin = quantum.manager.QuantumManager.get_plugin()

        network_body = {
            'tenant_id': self._tenant_id,
            'name': 'test net',
            'admin_state_up': True,
            'shared': False,
        }
        # Each plugin call gets its own freshly created admin context.
        self._network = plugin.create_network(
            q_context.get_admin_context(), {'network': network_body})

        # Optional attributes are passed as the "not specified" sentinel.
        unset = attributes.ATTR_NOT_SPECIFIED
        subnet_body = {
            'network_id': self._network['id'],
            'name': 'test subnet',
            'cidr': '192.168.1.0/24',
            'ip_version': 4,
            'gateway_ip': '192.168.1.1',
            'allocation_pools': unset,
            'dns_nameservers': unset,
            'host_routes': unset,
            'enable_dhcp': True,
        }
        self._subnet = plugin.create_subnet(
            q_context.get_admin_context(), {'subnet': subnet_body})

        self._subnet_id = self._subnet['id']
开发者ID:Frostman,项目名称:quantum,代码行数:35,代码来源:test_routerserviceinsertion.py


示例2: _setup_core_resources

    def _setup_core_resources(self):
        """Build the test network and subnet through the core plugin.

        Populates ``self._network``, ``self._subnet`` and
        ``self._subnet_id`` from the plugin's return values.
        """
        core = quantum.manager.QuantumManager.get_plugin()

        network_request = {
            "network": {
                "tenant_id": self._tenant_id,
                "name": "test net",
                "admin_state_up": True,
                "shared": False,
            }
        }
        self._network = core.create_network(
            q_context.get_admin_context(), network_request)

        subnet_request = {
            "subnet": {
                "network_id": self._network["id"],
                "name": "test subnet",
                "cidr": "192.168.1.0/24",
                "ip_version": 4,
                "gateway_ip": "192.168.1.1",
                "allocation_pools": attributes.ATTR_NOT_SPECIFIED,
                "dns_nameservers": attributes.ATTR_NOT_SPECIFIED,
                "host_routes": attributes.ATTR_NOT_SPECIFIED,
                "enable_dhcp": True,
            }
        }
        self._subnet = core.create_subnet(
            q_context.get_admin_context(), subnet_request)

        self._subnet_id = self._subnet["id"]
开发者ID:vglafirov,项目名称:quantum,代码行数:26,代码来源:test_routerserviceinsertion.py


示例3: test_get_ready_devices_inactive_pool

    def test_get_ready_devices_inactive_pool(self):
        """No ready devices are reported once the vip's pool is INACTIVE."""
        with self.vip() as vip:
            pool_id = vip["vip"]["pool_id"]
            inactive_status = {"pool": {"status": constants.INACTIVE}}

            # set the pool inactive need to use plugin directly since
            # status is not tenant mutable
            self.plugin_instance.update_pool(
                context.get_admin_context(), pool_id, inactive_status)

            ready_devices = self.callbacks.get_ready_devices(
                context.get_admin_context())
            self.assertFalse(ready_devices)
开发者ID:NCU-PDCLAB,项目名称:quantum,代码行数:11,代码来源:test_plugin.py


示例4: test_get_ready_devices_inactive_vip

    def test_get_ready_devices_inactive_vip(self):
        """No ready devices are reported once the vip itself is INACTIVE."""
        with self.vip() as vip:
            vip_id = vip['vip']['id']
            inactive_status = {'vip': {'status': constants.INACTIVE}}

            # set the vip inactive need to use plugin directly since
            # status is not tenant mutable
            self.plugin_instance.update_vip(
                context.get_admin_context(), vip_id, inactive_status)

            ready_devices = self.callbacks.get_ready_devices(
                context.get_admin_context())
            self.assertFalse(ready_devices)
开发者ID:Apsu,项目名称:quantum,代码行数:15,代码来源:test_plugin.py


示例5: setUp

    def setUp(self):
        """Prepare test config, a mocked NVP API helper and cleanups.

        The parent ``setUp`` is invoked last, after the configuration
        overrides and the API mock are in place.
        """
        self.adminContext = context.get_admin_context()
        test_config['config_files'] = [NVP_INI_CONFIG_PATH]
        test_config['plugin_name_v2'] = (
            'quantum.plugins.nicira.QuantumPlugin.NvpPluginV2')
        cfg.CONF.set_override('api_extensions_path', NVP_EXTENSIONS_PATH)
        # Save a per-resource copy of the original RESOURCE_ATTRIBUTE_MAP
        self.saved_attr_map = dict(
            (name, attr_spec.copy())
            for name, attr_spec
            in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems())
        test_config['extension_manager'] = MacLearningExtensionManager()
        # mock nvp api client
        self.fc = fake_nvpapiclient.FakeClient(NVP_FAKE_RESPS_PATH)
        helper_target = '%s.NvpApiClient.NVPApiHelper' % NVP_MODULE_PATH
        self.mock_nvpapi = mock.patch(helper_target, autospec=True)
        api_helper = self.mock_nvpapi.start().return_value

        # Emulate tests against NVP 2.x
        api_helper.get_nvp_version.return_value = "2.999"
        # Requests are forwarded to the fake client; self.fc is looked up
        # at call time, not captured here.
        api_helper.request.side_effect = (
            lambda *args, **kwargs: self.fc.fake_request(*args, **kwargs))
        cfg.CONF.set_override('metadata_mode', None, 'NVP')
        for cleanup in (self.fc.reset_all,
                        self.mock_nvpapi.stop,
                        self.restore_resource_attribute_map,
                        cfg.CONF.reset):
            self.addCleanup(cleanup)
        super(MacLearningDBTestCase, self).setUp()
开发者ID:Apsu,项目名称:quantum,代码行数:31,代码来源:test_maclearning.py


示例6: test_single_get_tenant

 def test_single_get_tenant(self):
     """A network fetched by id with an admin context has that same id."""
     db_plugin = quantum.db.db_base_plugin_v2.QuantumDbPluginV2()
     with self.network() as created:
         expected_id = created['network']['id']
         fetched = db_plugin._get_network(context.get_admin_context(),
                                          expected_id)
         self.assertEqual(expected_id, fetched.id)
开发者ID:john5223,项目名称:quantum,代码行数:7,代码来源:test_db_plugin.py


示例7: setUp

    def setUp(self):
        """Reset DB state, stub the quantum client and build the meta plugin."""
        super(MetaQuantumPluginV2Test, self).setUp()
        # Clear the module-level engine and session maker before
        # configure_db() is called below.
        db._ENGINE = None
        db._MAKER = None
        self.fake_tenant_id = uuidutils.generate_uuid()
        self.context = context.get_admin_context()

        db.configure_db()

        setup_metaplugin_conf()

        self.mox = mox.Mox()
        self.stubs = stubout.StubOutForTesting()
        # NOTE(review): args is not referenced again in this method.
        args = ['--config-file', etcdir('quantum.conf.test')]
        self.client_cls_p = mock.patch('quantumclient.v2_0.client.Client')
        patched_client_cls = self.client_cls_p.start()
        self.client_inst = mock.Mock()
        patched_client_cls.return_value = self.client_inst

        resources = ('network', 'port', 'subnet')
        # create_*/update_* calls answer with a fake resource dict ...
        for verb in ('create', 'update'):
            for resource in resources:
                method = getattr(self.client_inst,
                                 '%s_%s' % (verb, resource))
                method.return_value = {'id': 'fake_id'}
        # ... while delete_* calls simply report success.
        for resource in resources:
            method = getattr(self.client_inst, 'delete_%s' % resource)
            method.return_value = True
        self.plugin = MetaPluginV2(configfile=None)
开发者ID:abhiraut,项目名称:quantum,代码行数:34,代码来源:test_metaplugin.py


示例8: test_update_vip_change_pool

 def test_update_vip_change_pool(self):
     """Re-pointing a vip at a second pool clears it from the first pool."""
     with self.subnet() as subnet:
         with contextlib.nested(
             self.pool(name="pool1"),
             self.pool(name="pool2")
         ) as (source_pool, target_pool):
             with self.vip(name='vip1', subnet=subnet,
                           pool=source_pool) as vip:
                 vip_id = vip['vip']['id']
                 # change vip from pool1 to pool2
                 new_vip_body = {
                     'id': vip_id,
                     'name': 'vip1',
                     'pool_id': target_pool['pool']['id'],
                 }
                 admin_ctx = context.get_admin_context()

                 def load_pool(pool):
                     # Read the pool row back through the context session.
                     query = admin_ctx.session.query(ldb.Pool)
                     return query.filter_by(id=pool['pool']['id']).one()

                 self.plugin.update_vip(admin_ctx,
                                        vip_id,
                                        {'vip': new_vip_body})
                 target_row = load_pool(target_pool)
                 source_row = load_pool(source_pool)
                 # check that pool1.vip became None
                 self.assertIsNone(source_row.vip)
                 # and pool2 got vip
                 self.assertEqual(target_row.vip.id, vip_id)
开发者ID:CiscoAS,项目名称:quantum,代码行数:25,代码来源:test_db_loadbalancer.py


示例9: test_delete_healthmonitor_cascade_deletion_of_associations

    def test_delete_healthmonitor_cascade_deletion_of_associations(self):
        """Deleting a health monitor also removes its pool associations."""
        with self.health_monitor(type='HTTP', no_delete=True) as monitor:
            with self.pool() as pool:
                monitor_id = monitor['health_monitor']['id']
                association_body = {
                    'health_monitor': {
                        'id': monitor_id,
                        'tenant_id': self._tenant_id
                    }
                }
                create_req = self.new_create_request(
                    'pools',
                    association_body,
                    fmt=self.fmt,
                    id=pool['pool']['id'],
                    subresource='health_monitors')
                create_res = create_req.get_response(self.ext_api)
                self.assertEqual(create_res.status_int, 201)

                admin_ctx = context.get_admin_context()

                def associations():
                    # Pool associations currently stored for this monitor.
                    query = admin_ctx.session.query(
                        ldb.PoolMonitorAssociation)
                    return query.filter_by(monitor_id=monitor_id).all()

                # check if we actually have corresponding Pool associations
                self.assertTrue(associations())
                # delete the HealthMonitor instance
                delete_req = self.new_delete_request('health_monitors',
                                                     monitor_id)
                delete_res = delete_req.get_response(self.ext_api)
                self.assertEqual(delete_res.status_int, 204)
                # check if all corresponding Pool associations are deleted
                self.assertFalse(associations())
开发者ID:alanmeadows,项目名称:quantum,代码行数:35,代码来源:test_db_loadbalancer.py


示例10: test_model_update_port_rollback

    def test_model_update_port_rollback(self):
        """Check that a failed port update leaves the device ID unchanged.

        ``_invoke_nexus_for_net_create`` on the Cisco model layer is patched
        to raise ValueError. An update-port request carrying a new device ID
        is then sent, and the port read back from the database is expected
        to still hold the original device ID.

        """
        with self.port(fmt=self.fmt) as orig_port:

            inserted_exc = ValueError
            with mock.patch.object(
                virt_phy_sw_v2.VirtualPhysicalSwitchModelV2,
                '_invoke_nexus_for_net_create',
                side_effect=inserted_exc):

                original = orig_port['port']
                # Pick a device ID that is guaranteed to differ from the
                # one the port currently has.
                preferred_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
                alternate_id = "600df00d-e4a8-4a3a-8906-feed600df00d"
                new_device_id = (alternate_id
                                 if original['device_id'] == preferred_id
                                 else preferred_id)

                # Send an update port request with the new device ID
                port_id = original['id']
                update_req = self.new_update_request(
                    'ports', {'port': {'device_id': new_device_id}}, port_id)
                update_res = update_req.get_response(self.api)

                # Sanity check failure result code
                self._assertExpectedHTTP(update_res.status_int, inserted_exc)

                # Check that the port still has the original device ID
                db_plugin = base_plugin.QuantumDbPluginV2()
                stored_port = db_plugin._get_port(
                    context.get_admin_context(), port_id)
                self.assertEqual(stored_port['device_id'],
                                 orig_port['port']['device_id'])
开发者ID:DestinyOneSystems,项目名称:quantum,代码行数:34,代码来源:test_network_plugin.py


示例11: set_default_svctype_id

def set_default_svctype_id(original_id):
    """Return *original_id*, falling back to the default service type's id.

    A truthy *original_id* is returned unchanged. Otherwise the service
    types flagged as default are queried with an admin context and the id
    of the first result is returned.
    """
    if original_id:
        return original_id
    manager = servicetype_db.ServiceTypeManager.get_instance()
    # Fetch default service type - it must exist
    defaults = manager.get_service_types(
        context.get_admin_context(), filters={"default": [True]})
    return defaults[0]["id"]
开发者ID:kaiweifan,项目名称:vse-lbaas-plugin-poc,代码行数:7,代码来源:servicetype.py


示例12: test_ports_vif_host

 def test_ports_vif_host(self):
     """Host binding is listed for admins and checked as absent otherwise."""
     cfg.CONF.set_default('allow_overlapping_ips', True)
     bound_port_kwargs = {portbindings.HOST_ID: self.hostname}
     with contextlib.nested(
         self.port(name='name1',
                   arg_list=(portbindings.HOST_ID,),
                   **bound_port_kwargs),
         self.port(name='name2')):
         admin_ctx = context.get_admin_context()
         admin_ports = self._list('ports', quantum_context=admin_ctx)['ports']
         self.assertEqual(2, len(admin_ports))
         for listed in admin_ports:
             if listed['name'] != 'name1':
                 # Only 'name1' was created with a host binding.
                 self.assertFalse(listed[portbindings.HOST_ID])
             else:
                 self._check_response_portbindings_host(listed)
         # By default user is admin - now test non admin user
         tenant_ctx = context.Context(user_id=None,
                                      tenant_id=self._tenant_id,
                                      is_admin=False,
                                      read_deleted="no")
         tenant_ports = self._list('ports',
                                   quantum_context=tenant_ctx)['ports']
         self.assertEqual(2, len(tenant_ports))
         for listed in tenant_ports:
             self._check_response_no_portbindings_host(listed)
开发者ID:Apsu,项目名称:quantum,代码行数:25,代码来源:_test_extension_portbindings.py


示例13: test_create_pool_healthmon_invalid_pool_id

 def test_create_pool_healthmon_invalid_pool_id(self):
     """Attaching a health monitor to an unknown pool raises PoolNotFound."""
     unknown_pool_id = "123-456-789"
     with self.health_monitor() as monitor:
         self.assertRaises(
             loadbalancer.PoolNotFound,
             self.plugin.create_pool_health_monitor,
             context.get_admin_context(),
             monitor,
             unknown_pool_id)
开发者ID:alanmeadows,项目名称:quantum,代码行数:8,代码来源:test_db_loadbalancer.py


示例14: setUp

 def setUp(self):
     """Select the stub OFC driver, initialise the DB and build the manager."""
     super(OFCManagerTestBase, self).setUp()
     config.CONF.set_override(
         'driver',
         "quantum.tests.unit.nec.stub_ofc_driver.StubOFCDriver",
         'OFC')
     ndb.initialize()
     # Registered right after initialisation so the DB is cleared on teardown.
     self.addCleanup(ndb.clear_db)
     self.ofc = ofc_manager.OFCManager()
     self.ctx = context.get_admin_context()
开发者ID:NCU-PDCLAB,项目名称:quantum,代码行数:8,代码来源:test_ofc_manager.py


示例15: test_network_update_with_provider_attrs

 def test_network_update_with_provider_attrs(self):
     """An admin update with provider attributes reaches the plugin as-is."""
     admin_ctx = context.get_admin_context()
     admin_ctx.tenant_id = "an_admin"
     response, body, network_id = (
         self._put_network_with_provider_attrs(admin_ctx))
     plugin_mock = self.plugin.return_value
     plugin_mock.update_network.assert_called_with(
         mock.ANY, network_id, network={"network": body})
     self.assertEqual(response.status_int, web_exc.HTTPOk.code)
开发者ID:soheilhy,项目名称:quantum,代码行数:8,代码来源:test_extension_pnet.py


示例16: setUp

 def setUp(self):
     """Patch the OFC manager, then load the plugin under test."""
     self.addCleanup(mock.patch.stopall)
     # The patched manager's driver reports packet-filter support according
     # to the class-level PACKET_FILTER_ENABLE setting.
     driver_mock = mock.patch(OFC_MANAGER).start().return_value.driver
     driver_mock.filter_supported.return_value = self.PACKET_FILTER_ENABLE
     super(NecPluginV2TestCase, self).setUp(self._plugin_name)
     self.context = q_context.get_admin_context()
     self.plugin = manager.QuantumManager.get_plugin()
开发者ID:DestinyOneSystems,项目名称:quantum,代码行数:8,代码来源:test_nec_plugin.py


示例17: testQuantumContextAdminToDict

 def testQuantumContextAdminToDict(self):
     """The admin context dict has no user/tenant id and omits the session."""
     self.db_api_session.return_value = 'fakesession'
     admin_ctx = context.get_admin_context()
     as_dict = admin_ctx.to_dict()
     for key in ('user_id', 'tenant_id'):
         self.assertIsNone(as_dict[key])
     self.assertIsNotNone(admin_ctx.session)
     self.assertFalse('session' in as_dict)
开发者ID:StackOps,项目名称:quantum,代码行数:8,代码来源:test_quantum_context.py


示例18: _validate_servicetype_ref

def _validate_servicetype_ref(data, valid_values=None):
    """Verify the service type id exists.

    *data* is the service type id to look up with an admin context;
    *valid_values* is accepted but not used here. Returns a translated
    error message when the lookup raises ServiceTypeNotFound, and None
    (implicitly) otherwise.
    """
    manager = servicetype_db.ServiceTypeManager.get_instance()
    try:
        manager.get_service_type(context.get_admin_context(), data)
    except servicetype_db.ServiceTypeNotFound:
        return _("The service type '%s' does not exist") % data
开发者ID:kaiweifan,项目名称:vse-lbaas-plugin-poc,代码行数:8,代码来源:servicetype.py


示例19: test_delete_vip

 def test_delete_vip(self):
     """Deleting a vip triggers exactly one destroy_pool for its pool."""
     with self.subnet() as subnet:
         with self.pool(subnet=subnet) as pool:
             with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip:
                 vip_body = vip["vip"]
                 # Forget calls recorded while the fixtures were created.
                 self.mock_api.reset_mock()
                 self.plugin_instance.delete_vip(
                     context.get_admin_context(), vip_body["id"])
                 self.mock_api.destroy_pool.assert_called_once_with(
                     mock.ANY, vip_body["pool_id"])
开发者ID:NCU-PDCLAB,项目名称:quantum,代码行数:8,代码来源:test_plugin.py


示例20: _send_all_data

    def _send_all_data(self):
        """Push all networks and their ports to the network controller.

        Collects every network (plus its gateway, when one of its subnets
        defines one) and the ports on that network, then PUTs the result to
        ``/topology`` to give the controller an option to re-sync its
        persistent store with quantum's current view of that data.

        Returns the value returned by ``self.servers.put``.

        Raises RemoteRestError (after logging it) when the servers report
        that the request did not succeed.
        """
        admin_context = qcontext.get_admin_context()
        networks = {}

        all_networks = super(QuantumRestProxyV2,
                             self).get_networks(admin_context) or []
        for net in all_networks:
            net_id = net.get('id')
            net_details = {
                'id': net_id,
                'name': net.get('name'),
                'op-status': net.get('admin_state_up'),
            }
            networks[net_id] = net_details

            # If several subnets define a gateway, the last one wins.
            for subnet_id in net.get('subnets', []):
                subnet = self.get_subnet(admin_context, subnet_id)
                gateway_ip = subnet.get('gateway_ip')
                if gateway_ip:
                    # FIX: For backward compatibility with wire protocol
                    net_details['gateway'] = gateway_ip

            net_filter = {'network_id': [net_id]}
            net_ports = super(QuantumRestProxyV2,
                              self).get_ports(admin_context,
                                              filters=net_filter) or []
            net_details['ports'] = [
                {
                    'id': port.get('id'),
                    'attachment': {
                        # The attachment id is the port id suffixed by '00'.
                        'id': port.get('id') + '00',
                        'mac': port.get('mac_address'),
                    },
                    'state': port.get('status'),
                    'op-status': port.get('admin_state_up'),
                    'mac': None
                }
                for port in net_ports
            ]
        try:
            resource = '/topology'
            data = {
                'networks': networks,
            }
            ret = self.servers.put(resource, data)
            if not self.servers.action_success(ret):
                # Raised inside the try block on purpose so that the
                # handler below logs it before it propagates.
                raise RemoteRestError(ret[2])
            return ret
        except RemoteRestError as e:
            LOG.error(
                'QuantumRestProxy: Unable to update remote network: %s' %
                e.message)
            raise
开发者ID:brosenberg,项目名称:quantum-pub,代码行数:58,代码来源:plugin.py



注：本文中的quantum.context.get_admin_context函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python context.get_admin_context_without_session函数代码示例发布时间:2022-05-26
下一篇:
Python utils.str_uuid函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap