• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python api.get_session函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neutron.db.api.get_session函数的典型用法代码示例。如果您正苦于以下问题：Python get_session函数的具体用法？Python get_session怎么用？Python get_session使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_session函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: get_ip_availability

def get_ip_availability(**kwargs):
    """Report the used and unused IPs matching the given filters.

    Every keyword argument is forwarded unchanged to ``get_used_ips`` and
    ``get_unused_ips``; each helper receives its own session obtained from
    ``neutron_db_api.get_session()``.

    :returns: dict with the keys ``used`` and ``unused``.
    """
    # Pass kwargs as a logger argument so the message is only rendered when
    # debug logging is actually enabled.
    LOG.debug("Begin querying %s", kwargs)
    used_ips = get_used_ips(neutron_db_api.get_session(), **kwargs)
    unused_ips = get_unused_ips(neutron_db_api.get_session(), used_ips,
                                **kwargs)
    LOG.debug("End querying")
    return dict(used=used_ips, unused=unused_ips)
开发者ID:evanscottgray,项目名称:quark,代码行数:7,代码来源:ip_availability.py


示例2: _get_agent_fdb

    def _get_agent_fdb(self, segment, port, agent_host):
        """Build the fdb entries that other agents should remove for ``port``.

        Returns None when ``agent_host`` is empty or when
        ``_validate_segment`` rejects the segment/agent pair; otherwise
        returns the structure produced by ``_get_fdb_entries_template`` with
        this agent's entries filled in.
        """
        if not agent_host:
            return

        network_id = port['network_id']

        count_session = db_api.get_session()
        active_port_count = l2pop_db.get_agent_network_active_port_count(
            count_session, agent_host, network_id)

        agent = l2pop_db.get_agent_by_host(db_api.get_session(), agent_host)
        segment_ok = self._validate_segment(segment, port['id'], agent)
        if not segment_ok:
            return

        agent_ip = l2pop_db.get_agent_ip(agent)
        fdb_for_others = self._get_fdb_entries_template(
            segment, agent_ip, port['network_id'])

        if active_port_count == 0:
            # The agent is removing its last active port in this network,
            # so the other agents need to be told to delete their flooding
            # entry.
            fdb_for_others[network_id]['ports'][agent_ip].append(
                const.FLOODING_ENTRY)

        # Tell the other agents to remove the fdb rules of this port.
        if port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE:
            port_entries = self._get_port_fdb_entries(port)
            fdb_for_others[network_id]['ports'][agent_ip] += port_entries

        return fdb_for_others
开发者ID:21atlas,项目名称:neutron,代码行数:28,代码来源:mech_driver.py


示例3: update_port_state_with_notifier

    def update_port_state_with_notifier(self, rpc_context, **kwargs):
        """Store the segmentation id, activate the port and notify agents.

        Reads ``port_id``, ``network_id``, ``network_type``,
        ``segmentation_id`` and ``physical_network`` from ``kwargs``.

        :returns: always an empty dict.
        """
        port_id = kwargs.get('port_id')
        network_id = kwargs.get('network_id')
        network_type = kwargs.get('network_type')
        segmentation_id = kwargs.get('segmentation_id')
        physical_network = kwargs.get('physical_network')

        # Step 1: write the segmentation id onto the dynamic segment of this
        # network / physical network, when such a segment exists.
        segment_session = db_api.get_session()
        with segment_session.begin(subtransactions=True):
            segment_query = (
                segment_session.query(models_ml2.NetworkSegment)
                .filter_by(network_id=network_id)
                .filter_by(physical_network=physical_network)
                .filter_by(is_dynamic=True))
            try:
                segment = segment_query.one()
                segment.segmentation_id = segmentation_id
            except sa_exc.NoResultFound:
                # No matching dynamic segment: nothing to update.
                pass

        # Step 2: mark the port ACTIVE through the core plugin.
        plugin = manager.NeutronManager.get_plugin()
        plugin.update_port_status(rpc_context, port_id,
                                  constants.PORT_STATUS_ACTIVE)

        # Step 3: search the DB for the port whose id starts with port_id.
        port = None
        port_session = db_api.get_session()
        with port_session.begin(subtransactions=True):
            try:
                port_row = (port_session.query(models_v2.Port)
                            .enable_eagerloads(False)
                            .filter(models_v2.Port.id.startswith(port_id))
                            .one())
                port = plugin._make_port_dict(port_row)
            except sa_exc.NoResultFound:
                LOG.error(_LE("Can't find port with port_id %s"),
                          port_id)
            except sa_exc.MultipleResultsFound:
                LOG.error(_LE("Multiple ports have port_id starting with %s"),
                          port_id)

        # Step 4: send the notification, but only if the port was resolved.
        if port is None:
            return {}

        LOG.debug("notifier port_update %(net_type)s, %(seg_id)s, "
                  "%(physnet)s",
                  {'net_type': network_type,
                   'seg_id': segmentation_id,
                   'physnet': physical_network})
        plugin.notifier.port_update(rpc_context, port, network_type,
                                    segmentation_id, physical_network)
        return {}
开发者ID:nec-openstack,项目名称:networking-nec-nwa,代码行数:59,代码来源:nwa_l2_server_callback.py


示例4: _test_and_create_object

def _test_and_create_object(id):
    """Make sure a DFLockedObjects row exists for the object ``id``.

    The row is looked up in one transaction; when it is missing it is
    created through ``_create_db_row`` in a fresh session and transaction.
    """
    try:
        lookup_session = db_api.get_session()
        with lookup_session.begin():
            locked_objects = lookup_session.query(models.DFLockedObjects)
            locked_objects.filter_by(object_uuid=id).one()
    except orm_exc.NoResultFound:
        try:
            create_session = db_api.get_session()
            with create_session.begin():
                _create_db_row(create_session, oid=id)
        except db_exc.DBDuplicateEntry:
            # The lock row was created concurrently, so it exists now and
            # there is nothing left to do.
            pass
开发者ID:almightyyeh,项目名称:dragonflow,代码行数:14,代码来源:lockedobjects_db.py


示例5: _verify_get_nsx_switch_and_port_id

 def _verify_get_nsx_switch_and_port_id(self, exp_ls_uuid, exp_lp_uuid):
     """Assert get_nsx_switch_and_port_id returns the expected uuid pair."""
     # nvplib and the db calls are mocked, so None is good enough for both
     # the cluster and the neutron_port_id arguments.
     session = db_api.get_session()
     result = nsx_utils.get_nsx_switch_and_port_id(session, None, None)
     ls_uuid, lp_uuid = result
     self.assertEqual(exp_ls_uuid, ls_uuid)
     self.assertEqual(exp_lp_uuid, lp_uuid)
开发者ID:Taejun,项目名称:neutron,代码行数:7,代码来源:test_nsx_utils.py


示例6: test_populate_policy_profile_delete

 def test_populate_policy_profile_delete(self):
     # Replace Client with TestClient and take control of the value that
     # TestClient._get_total_profiles() reports.
     client_patch = mock.patch(n1kv_client.__name__ + ".Client",
                               new=fake_client.TestClient)
     total_patch = mock.patch(fake_client.__name__ +
                              '.TestClient._get_total_profiles')
     present_profiles = (
         ('00000000-0000-0000-0000-000000000001', 'pp-1'),
         ('00000000-0000-0000-0000-000000000002', 'pp-2'),
         ('00000000-0000-0000-0000-000000000003', 'pp-3'),
     )
     with client_patch, total_patch as obj_inst:
         # First pass: the fake client reports 3 policy profiles.
         obj_inst.return_value = 3
         plugin = manager.NeutronManager.get_plugin()
         plugin._populate_policy_profiles()
         db_session = db.get_session()
         # The DB must hold exactly those 3 profiles: each one resolves by
         # id and a fourth id is unknown.
         for profile_id, profile_name in present_profiles:
             profile = n1kv_db_v2.get_policy_profile(db_session, profile_id)
             self.assertEqual(profile_name, profile['name'])
         self.assertRaises(c_exc.PolicyProfileIdNotFound,
                           n1kv_db_v2.get_policy_profile,
                           db_session,
                           '00000000-0000-0000-0000-000000000004')
         # Second pass: the fake client reports only 2 policy profiles.
         obj_inst.return_value = 2
         plugin._populate_policy_profiles()
         # The third policy profile must have been deleted.
         self.assertRaises(c_exc.PolicyProfileIdNotFound,
                           n1kv_db_v2.get_policy_profile,
                           db_session,
                           '00000000-0000-0000-0000-000000000003')
开发者ID:aignatov,项目名称:neutron,代码行数:34,代码来源:test_n1kv_plugin.py


示例7: update_ip_owner

 def update_ip_owner(self, ip_owner_info):
     """Record ``port`` as the owner of the reported HA IP addresses.

     Reads ``port``, ``ip_address_v4``, ``ip_address_v6`` and ``network_id``
     from ``ip_owner_info``.

     :returns: set of port ids affected by the change: the new owner plus
         any previous owner that was replaced. Empty when the info is
         incomplete or the port does not exist.
     """
     ports_to_update = set()
     port_id = ip_owner_info.get('port')
     ipv4 = ip_owner_info.get('ip_address_v4')
     ipv6 = ip_owner_info.get('ip_address_v6')
     network_id = ip_owner_info.get('network_id')
     if not (port_id and (ipv4 or ipv6)):
         return ports_to_update
     LOG.debug("Got IP owner update: %s", ip_owner_info)
     core_plugin = self._get_plugin()
     # REVISIT: just use SQLAlchemy session and models_v2.Port?
     port = core_plugin.get_port(nctx.get_admin_context(), port_id)
     if not port:
         LOG.debug("Ignoring update for non-existent port: %s", port_id)
         return ports_to_update
     ports_to_update.add(port_id)
     # Only the address families that were actually reported are processed.
     for ipa in [address for address in (ipv4, ipv6) if address]:
         try:
             session = db_api.get_session()
             with session.begin(subtransactions=True):
                 handler = self.ha_ip_handler
                 old_owner = handler.get_port_for_ha_ipaddress(
                     ipa, network_id or port['network_id'], session=session)
                 handler.set_port_id_for_ha_ipaddress(port_id, ipa, session)
                 if old_owner:
                     previous_port_id = old_owner['port_id']
                     if previous_port_id != port_id:
                         # Ownership moved: drop the stale binding and
                         # report the previous owner as affected too.
                         handler.delete_port_id_for_ha_ipaddress(
                             previous_port_id, ipa, session=session)
                         ports_to_update.add(previous_port_id)
         except db_exc.DBReferenceError as dbe:
             LOG.debug("Ignoring FK error for port %s: %s", port_id, dbe)
     return ports_to_update
开发者ID:noironetworks,项目名称:apic-ml2-driver,代码行数:34,代码来源:port_ha_ipaddress_binding.py


示例8: get_port_and_sgs

def get_port_and_sgs(port_id):
    """Get port from database with security group info.

    :returns: the port dict built by the core plugin, extended with its
        security group ids, or None when no port id starts with ``port_id``.
    """
    LOG.debug(_("get_port_and_sgs() called for port_id %s"), port_id)
    session = db_api.get_session()
    binding_cls = sg_db.SecurityGroupPortBinding
    binding_port_id = binding_cls.port_id

    with session.begin(subtransactions=True):
        # One row per (port, security group id); the outer join keeps ports
        # that have no security group binding at all.
        rows = (session.query(models_v2.Port, binding_cls.security_group_id)
                .outerjoin(binding_cls, models_v2.Port.id == binding_port_id)
                .filter(models_v2.Port.id.startswith(port_id))
                .all())
        if not rows:
            return

        port = rows[0][0]
        plugin = manager.NeutronManager.get_plugin()
        port_dict = plugin._make_port_dict(port)

        security_groups = []
        for _port, sg_id in rows:
            if sg_id:
                security_groups.append(sg_id)
        port_dict['security_groups'] = security_groups
        port_dict['security_group_rules'] = []
        port_dict['security_group_source_groups'] = []
        port_dict['fixed_ips'] = [fixed_ip['ip_address']
                                  for fixed_ip in port['fixed_ips']]
        return port_dict
开发者ID:50infivedays,项目名称:neutron,代码行数:26,代码来源:db.py


示例9: sanitize_policy_profile_table

 def sanitize_policy_profile_table(self):
     """Clear policy profiles from stale VSM."""
     db_session = db.get_session()
     hosts = config.get_vsm_hosts()
     vsm_info = db_session.query(
         n1kv_models.PolicyProfile.vsm_ip).distinct()
     if vsm_info is None or hosts is None:
         return
     # A VSM ip is stale when it appears in the table but not in the
     # configured hosts.
     stale_vsm_ips = [row[0] for row in vsm_info if row[0] not in hosts]
     for stale_ip in stale_vsm_ips:
         profiles = n1kv_db.get_policy_profiles_by_host(stale_ip, db_session)
         for pprofile in profiles:
             pp_in_use = n1kv_db.policy_profile_in_use(pprofile['id'],
                                                       db_session)
             num_vsm_using_pp = (
                 db_session.query(n1kv_models.PolicyProfile)
                 .filter_by(id=pprofile['id'])
                 .count())
             # Keep the profile when it is in use and this is the only VSM
             # that has it configured.
             if pp_in_use and num_vsm_using_pp <= 1:
                 LOG.warning(_LW('Cannot delete policy profile %s '
                                 'as it is in use.'), pprofile['id'])
                 continue
             db_session.delete(pprofile)
             db_session.flush()
开发者ID:JoelCapitao,项目名称:networking-cisco,代码行数:25,代码来源:policy_profile_service.py


示例10: remove_reserved_binding

def remove_reserved_binding(vlan_id, switch_ip, instance_id,
                            port_id):
    """Removes reserved binding.

    Port bindings are overloaded to carry a reserved switch binding, which
    keeps the state of a switch visible to all other neutron processes, and
    a reserved port binding, which keeps switch information for a given
    interface.
    The values of these arguments are as follows:
    :param vlan_id: 0
    :param switch_ip: ip address of the switch
    :param instance_id: fixed string RESERVED_NEXUS_SWITCH_DEVICE_ID_R1
    :                   or RESERVED_NEXUS_PORT_DEVICE_ID_R1
    :param port_id: switch-state of ACTIVE, RESTORE_S1, RESTORE_S2, INACTIVE
    :               port-expected port_id
    :returns: the bindings that were deleted, or None when ``port_id`` is
        empty.
    """
    if not port_id:
        LOG.warning(_LW("remove_reserved_binding called with no state"))
        return
    LOG.debug("remove_reserved_binding called")
    session = db.get_session()
    lookup = dict(session=session,
                  vlan_id=vlan_id,
                  switch_ip=switch_ip,
                  instance_id=instance_id,
                  port_id=port_id)
    bindings = _lookup_one_nexus_binding(**lookup)
    for row in bindings:
        session.delete(row)
    session.flush()
    return bindings
开发者ID:JoelCapitao,项目名称:networking-cisco,代码行数:31,代码来源:nexus_db_v2.py


示例11: sync_tunnel_allocations

def sync_tunnel_allocations(tunnel_id_ranges):
    """Synchronize tunnel_allocations table with configured tunnel ranges.

    :param tunnel_id_ranges: iterable of (tun_min, tun_max) pairs, both ends
        inclusive. Ranges spanning more than 1000000 ids are skipped with an
        error log.
    """
    # Collect every tunnel id that the configuration makes allocatable.
    tunnel_ids = set()
    for tun_min, tun_max in tunnel_id_ranges:
        if tun_max + 1 - tun_min > 1000000:
            LOG.error(
                _("Skipping unreasonable tunnel ID range %(tun_min)s:%(tun_max)s"),
                {"tun_min": tun_min, "tun_max": tun_max},
            )
            continue
        tunnel_ids.update(xrange(tun_min, tun_max + 1))

    session = db.get_session()
    with session.begin():
        # Walk the existing rows. Ids already in the table are taken out of
        # tunnel_ids, so afterwards only the missing ids remain in the set.
        for alloc in session.query(ovs_models_v2.TunnelAllocation).all():
            try:
                tunnel_ids.remove(alloc.tunnel_id)
            except KeyError:
                # Not allocatable any more; rows still allocated are kept.
                if alloc.allocated:
                    continue
                LOG.debug(_("Removing tunnel %s from pool"), alloc.tunnel_id)
                session.delete(alloc)

        # Add the allocatable tunnels that have no row yet.
        for missing_id in sorted(tunnel_ids):
            session.add(ovs_models_v2.TunnelAllocation(missing_id))
开发者ID:kobtea,项目名称:neutron,代码行数:34,代码来源:ovs_db_v2.py


示例12: _get_mock_port_operation_context

 def _get_mock_port_operation_context():
     """Build a mock.Mock that stands in for a port operation context."""
     port = {
         'status': 'DOWN',
         'binding:host_id': '',
         'allowed_address_pairs': [],
         'device_owner': 'fake_owner',
         'binding:profile': {},
         'fixed_ips': [
             {'subnet_id': '72c56c48-e9b8-4dcf-b3a7-0813bb3bd839'},
         ],
         'id': '83d56c48-e9b8-4dcf-b3a7-0813bb3bd940',
         'security_groups': [SECURITY_GROUP],
         'device_id': 'fake_device',
         'name': '',
         'admin_state_up': True,
         'network_id': 'd897e21a-dfd6-4331-a5dd-7524fa421c3e',
         'tenant_id': 'test-tenant',
         'binding:vif_details': {},
         'binding:vnic_type': 'normal',
         'binding:vif_type': 'unbound',
         'mac_address': '12:34:56:78:21:b6',
     }
     context = mock.Mock(current=port)
     # The plugin lookups answer with the fixed security group and the port
     # defined above.
     plugin = context._plugin
     plugin.get_security_group = mock.Mock(return_value=SECURITY_GROUP)
     plugin.get_port = mock.Mock(return_value=port)
     context._plugin_context.session = neutron_db_api.get_session()
     network_context = (OpenDaylightMechanismDriverTestCase.
                        _get_mock_network_operation_context())
     context._network_context = mock.Mock(_network=network_context.current)
     return context
开发者ID:heekof,项目名称:networking-odl,代码行数:28,代码来源:test_mechanism_odl_v2.py


示例13: _sync_create_ports

    def _sync_create_ports(self, combined_res_info, vsm_ip):
        """
        Create on the VSM every neutron port that the VSM does not know yet.

        :param combined_res_info: tuple containing VSM and neutron information
        :param vsm_ip: string representing the IP address of the VSM
        """
        vsm_vmn_dict, neutron_ports = combined_res_info

        # Each VM network lists its port ids as one comma separated string.
        vsm_port_uuids = set()
        for vm_network in vsm_vmn_dict.values():
            properties = vm_network['properties']
            vsm_port_uuids.update(properties['portId'].split(','))

        for port in neutron_ports:
            if port['id'] in vsm_port_uuids:
                continue
            # This port is missing on the VSM: create it there.
            network_uuid = port['network_id']
            binding = n1kv_db.get_policy_binding(port['id'])
            policy_profile_id = binding.profile_id
            policy_profile = n1kv_db.get_policy_profile_by_uuid(
                db.get_session(), policy_profile_id)
            name_parts = (n1kv_const.VM_NETWORK_PREFIX,
                          policy_profile_id,
                          network_uuid)
            vmnetwork_name = "%s%s_%s" % name_parts
            try:
                self.n1kvclient.create_n1kv_port(port, vmnetwork_name,
                                                 policy_profile,
                                                 vsm_ip=vsm_ip)
            except n1kv_exc.VSMError as e:
                LOG.warning(_LW('Sync Exception: Port creation on VSM '
                            'failed: %s'), e.message)
开发者ID:cisco-openstack,项目名称:networking-cisco,代码行数:31,代码来源:n1kv_sync.py


示例14: update_port_ext

def update_port_ext(port_id, commit=None, hardware_id=None,
                    trunked=None, session=None):
    """Update selected fields of the PortExt row identified by ``port_id``.

    Only the fields whose argument is not None are written.

    :param port_id: primary key of the PortExt row.
    :param commit: new ``commit`` value, or None to leave it untouched.
    :param hardware_id: new ``hardware_id`` value, or None to leave it
        untouched.
    :param trunked: new ``trunked`` value, or None to leave it untouched.
    :param session: session to use; a new one is obtained from
        ``db_api.get_session()`` when omitted.
    :returns: the PortExt object, or None when no row has that id.
    """
    if not session:
        session = db_api.get_session()

    updated = False

    with session.begin(subtransactions=True):
        port = (session.query(models.PortExt).
                get(port_id))

        if port is None:
            # Query.get() returns None for an unknown primary key. Report
            # that to the caller instead of failing with AttributeError on
            # the assignments below.
            return None

        if commit is not None:
            port.commit = commit
            updated = True

        if hardware_id is not None:
            port.hardware_id = hardware_id
            updated = True

        if trunked is not None:
            port.trunked = trunked
            updated = True

        if updated:
            session.add(port)
            session.flush()

        return port
开发者ID:kumarom,项目名称:ironic-neutron-plugin,代码行数:28,代码来源:db.py


示例15: filter_port_ext

def filter_port_ext(session=None, **kwargs):
    """Return a PortExt query filtered by the given column values.

    :param session: session to use; a new one is obtained from
        ``db_api.get_session()`` when omitted.
    :param kwargs: passed straight to ``filter_by``.
    :returns: the query object itself, not a list of rows.
    """
    session = session or db_api.get_session()

    with session.begin(subtransactions=True):
        port_ext_query = session.query(models.PortExt)
        return port_ext_query.filter_by(**kwargs)
开发者ID:kumarom,项目名称:ironic-neutron-plugin,代码行数:7,代码来源:db.py


示例16: get_device_details

 def get_device_details(self, rpc_context, **kwargs):
     """Agent requests device details.

     Reads ``agent_id`` and ``device`` from ``kwargs``.

     :returns: dict describing the port and its network binding, or a dict
         holding only ``device`` when no port matches the device.
     """
     agent_id = kwargs.get('agent_id')
     device = kwargs.get('device')
     LOG.debug("Device %(device)s details requested from %(agent_id)s",
               {'device': device, 'agent_id': agent_id})
     plugin = manager.NeutronManager.get_plugin()
     port = plugin.get_port_from_device(device)
     if not port:
         entry = {'device': device}
         LOG.debug("%s can not be found in database", device)
         return entry

     session = db_api.get_session()
     binding = db.get_network_binding(session, port['network_id'])
     entry = {
         'device': device,
         'physical_network': binding.physical_network,
         'network_type': binding.network_type,
         'segmentation_id': binding.segmentation_id,
         'network_id': port['network_id'],
         'port_mac': port['mac_address'],
         'port_id': port['id'],
         'admin_state_up': port['admin_state_up'],
     }
     if cfg.CONF.AGENT.rpc_support_old_agents:
         entry['vlan_id'] = binding.segmentation_id

     # The stored status follows admin_state_up; write it only on change.
     if port['admin_state_up']:
         new_status = q_const.PORT_STATUS_ACTIVE
     else:
         new_status = q_const.PORT_STATUS_DOWN
     if port['status'] != new_status:
         db.set_port_status(port['id'], new_status)
     return entry
开发者ID:gabriel-samfira,项目名称:neutron,代码行数:29,代码来源:rpc_callbacks.py


示例17: setUp

 def setUp(self):
     # Configure the DB, make sure the test network profile exists and sync
     # its VLAN allocations before each test.
     super(VlanAllocationsTest, self).setUp()
     db.configure_db()
     session = db.get_session()
     self.session = session
     profile = _create_test_network_profile_if_not_there(session)
     self.net_p = profile
     n1kv_db_v2.sync_vlan_allocations(session, profile)
     # db.clear_db is registered as a cleanup for this test.
     self.addCleanup(db.clear_db)
开发者ID:PFZheng,项目名称:neutron,代码行数:7,代码来源:test_n1kv_db.py


示例18: get_port

def get_port(port_id):
    """Return the Port whose id equals ``port_id``, or None if there is none."""
    port_query = db.get_session().query(models_v2.Port).filter_by(id=port_id)
    try:
        return port_query.one()
    except exc.NoResultFound:
        return None
开发者ID:kobtea,项目名称:neutron,代码行数:7,代码来源:ovs_db_v2.py


示例19: sync_network_states

def sync_network_states(network_vlan_ranges):
    """Synchronize network_states table with current configured VLAN ranges.

    :param network_vlan_ranges: mapping of physical network to an iterable
        of VLAN ranges; element 0 of a range is its first id and element 1
        its last id, both inclusive.
    """

    session = db.get_session()
    with session.begin():
        # Group the existing allocations by physical network.
        allocations = {}
        entries = session.query(mlnx_models_v2.SegmentationIdAllocation).all()
        for entry in entries:
            allocations.setdefault(entry.physical_network, set()).add(entry)

        # Process the VLAN ranges of each configured physical network.
        # items() is used instead of iteritems() because it exists on both
        # Python 2 and Python 3, in line with the moves.xrange call below.
        for physical_network, vlan_ranges in network_vlan_ranges.items():
            # Determine the VLANs currently configured as allocatable for
            # this physical network.
            vlan_ids = set()
            for vlan_range in vlan_ranges:
                vlan_ids |= set(moves.xrange(vlan_range[0], vlan_range[1] + 1))

            # Remove from the table unallocated VLANs that are not
            # currently allocatable.
            _remove_non_allocatable_vlans(session, allocations,
                                          physical_network, vlan_ids)

            # Add the missing allocatable VLANs to the table.
            _add_missing_allocatable_vlans(session, physical_network, vlan_ids)

        # Remove from the table unallocated VLANs of any unconfigured
        # physical networks.
        _remove_unconfigured_vlans(session, allocations)
开发者ID:armando-migliaccio,项目名称:neutron-1,代码行数:28,代码来源:mlnx_db_v2.py


示例20: add_tunnel_endpoint

def add_tunnel_endpoint(ip, max_retries=10):
    """Return the endpoint of the given IP address or generate a new one.

    :param ip: IP address of the tunnel endpoint.
    :param max_retries: number of attempts made before giving up.
    :returns: the existing or newly added TunnelEndpoint.
    :raises q_exc.NeutronException: when every attempt ended in a
        DBDuplicateEntry.
    """

    # NOTE(rpodolyaka): generation of a new tunnel endpoint must be put into a
    #                   repeatedly executed transactional block to ensure it
    #                   doesn't conflict with any other concurrently executed
    #                   DB transactions in spite of the specified transactions
    #                   isolation level value
    for i in xrange(max_retries):
        LOG.debug(_("Adding a tunnel endpoint for %s"), ip)
        try:
            session = db.get_session()
            with session.begin(subtransactions=True):
                tunnel = (
                    session.query(ovs_models_v2.TunnelEndpoint)
                    .filter_by(ip_address=ip)
                    .with_lockmode("update")
                    .first()
                )

                if tunnel is None:
                    tunnel_id = _generate_tunnel_id(session)
                    tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id)
                    session.add(tunnel)

                return tunnel
        except db_exc.DBDuplicateEntry:
            # A concurrent transaction has been committed, try again.
            # The first literal ends with a space so the two adjacent
            # literals do not run together as "concurrenttransaction".
            LOG.debug(
                _(
                    "Adding a tunnel endpoint failed due to a concurrent "
                    "transaction had been commited (%s attempts left)"
                ),
                max_retries - (i + 1),
            )

    raise q_exc.NeutronException(message="Unable to generate a new tunnel id")
开发者ID:kobtea,项目名称:neutron,代码行数:34,代码来源:ovs_db_v2.py



注:本文中的neutron.db.api.get_session函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python api.register_models函数代码示例发布时间:2022-05-27
下一篇:
Python api.get_engine函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap