• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sql.true函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.sql.true函数的典型用法代码示例。如果您正苦于以下问题:Python true函数的具体用法?Python true怎么用?Python true使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了true函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_six_pt_five

    def test_six_pt_five(self):
        x = column("x")
        self.assert_compile(select([x]).where(or_(x == 7, true())),
                "SELECT x WHERE true")

        self.assert_compile(select([x]).where(or_(x == 7, true())),
                "SELECT x WHERE 1 = 1",
                dialect=default.DefaultDialect(supports_native_boolean=False))
开发者ID:chundi,项目名称:sqlalchemy,代码行数:8,代码来源:test_operators.py


示例2: has_property

    def has_property(self, prop):
        property_granted_select = select(
            [null()],
            from_obj=[
                Property.__table__,
                PropertyGroup.__table__,
                Membership.__table__
            ]
        ).where(
            and_(
                Property.name == prop,
                Property.property_group_id == PropertyGroup.id,
                PropertyGroup.id == Membership.group_id,
                Membership.user_id == self.id,
                Membership.active
            )
        )
        #.cte("property_granted_select")
        return and_(
            not_(exists(
                property_granted_select.where(
                    Property.granted == false())

            )),
            exists(
                property_granted_select.where(
                    Property.granted == true()
                )
            )
        )
开发者ID:lukasjuhrich,项目名称:pycroft,代码行数:30,代码来源:user.py


示例3: get_network_ports

def get_network_ports(context, network_id):
    try:
        return (context.session.query(models_v2.Port).
                filter(models_v2.Port.network_id == network_id,
                       models_v2.Port.admin_state_up == sql.true()).all())
    except exc.NoResultFound:
        return
开发者ID:nlewo,项目名称:networking-bgpvpn,代码行数:7,代码来源:bagpipe.py


示例4: get_v3_catalog

    def get_v3_catalog(self, user_id, tenant_id, metadata=None):
        d = dict(six.iteritems(CONF))
        d.update({'tenant_id': tenant_id,
                  'user_id': user_id})

        session = sql.get_session()
        services = (session.query(Service).filter(Service.enabled == true()).
                    options(sql.joinedload(Service.endpoints)).
                    all())

        def make_v3_endpoints(endpoints):
            for endpoint in (ep.to_dict() for ep in endpoints if ep.enabled):
                del endpoint['service_id']
                del endpoint['legacy_endpoint_id']
                del endpoint['enabled']
                endpoint['region'] = endpoint['region_id']
                try:
                    endpoint['url'] = core.format_url(endpoint['url'], d)
                except exception.MalformedEndpoint:
                    continue  # this failure is already logged in format_url()

                yield endpoint

        def make_v3_service(svc):
            eps = list(make_v3_endpoints(svc.endpoints))
            service = {'endpoints': eps, 'id': svc.id, 'type': svc.type}
            service['name'] = svc.extra.get('name', '')
            return service

        return [make_v3_service(svc) for svc in services]
开发者ID:k4veri,项目名称:keystone,代码行数:30,代码来源:sql.py


示例5: get_catalog

    def get_catalog(self, user_id, tenant_id, metadata=None):
        substitutions = dict(six.iteritems(CONF))
        substitutions.update({'tenant_id': tenant_id, 'user_id': user_id})

        session = sql.get_session()
        endpoints = (session.query(Endpoint).
                     options(sql.joinedload(Endpoint.service)).
                     filter(Endpoint.enabled == true()).all())

        catalog = {}

        for endpoint in endpoints:
            if not endpoint.service['enabled']:
                continue
            try:
                url = core.format_url(endpoint['url'], substitutions)
            except exception.MalformedEndpoint:
                continue  # this failure is already logged in format_url()

            region = endpoint['region_id']
            service_type = endpoint.service['type']
            default_service = {
                'id': endpoint['id'],
                'name': endpoint.service.extra.get('name', ''),
                'publicURL': ''
            }
            catalog.setdefault(region, {})
            catalog[region].setdefault(service_type, default_service)
            interface_url = '%sURL' % endpoint['interface']
            catalog[region][service_type][interface_url] = url

        return catalog
开发者ID:k4veri,项目名称:keystone,代码行数:32,代码来源:sql.py


示例6: query_with_hooks

def query_with_hooks(context, model):
    query = context.session.query(model)
    # define basic filter condition for model query
    query_filter = None
    if db_utils.model_query_scope_is_project(context, model):
        if hasattr(model, 'rbac_entries'):
            query = query.outerjoin(model.rbac_entries)
            rbac_model = model.rbac_entries.property.mapper.class_
            query_filter = (
                (model.tenant_id == context.tenant_id) |
                ((rbac_model.action == 'access_as_shared') &
                 ((rbac_model.target_tenant == context.tenant_id) |
                  (rbac_model.target_tenant == '*'))))
        elif hasattr(model, 'shared'):
            query_filter = ((model.tenant_id == context.tenant_id) |
                            (model.shared == sql.true()))
        else:
            query_filter = (model.tenant_id == context.tenant_id)
    # Execute query hooks registered from mixins and plugins
    for hook in get_hooks(model):
        query_hook = helpers.resolve_ref(hook.get('query'))
        if query_hook:
            query = query_hook(context, model, query)

        filter_hook = helpers.resolve_ref(hook.get('filter'))
        if filter_hook:
            query_filter = filter_hook(context, model, query_filter)

    # NOTE(salvatore-orlando): 'if query_filter' will try to evaluate the
    # condition, raising an exception
    if query_filter is not None:
        query = query.filter(query_filter)
    return query
开发者ID:noironetworks,项目名称:neutron,代码行数:33,代码来源:_model_query.py


示例7: has_property

    def has_property(cls, prop, when=None):
        # TODO Use joins
        property_granted_select = select(
            [null()],
            from_obj=[
                Property.__table__,
                PropertyGroup.__table__,
                Membership.__table__
            ]
        ).where(
            and_(
                Property.name == prop,
                Property.property_group_id == PropertyGroup.id,
                PropertyGroup.id == Membership.group_id,
                Membership.user_id == cls.id,
                Membership.active(when)
            )
        )
        #.cte("property_granted_select")
        return and_(
            not_(exists(
                property_granted_select.where(
                    Property.granted == false())

            )),
            exists(
                property_granted_select.where(
                    Property.granted == true()
                )
            )
        ).self_group().label("has_property_" + prop)
开发者ID:agdsn,项目名称:pycroft,代码行数:31,代码来源:user.py


示例8: upgrade

def upgrade():
    op.create_table('trunks',
        sa.Column('admin_state_up', sa.Boolean(),
                  nullable=False, server_default=sql.true()),
        sa.Column('tenant_id', sa.String(length=255), nullable=True,
                  index=True),
        sa.Column('id', sa.String(length=36), nullable=False),
        sa.Column('name', sa.String(length=255), nullable=True),
        sa.Column('port_id', sa.String(length=36), nullable=False),
        sa.Column('status', sa.String(length=16),
                  nullable=False, server_default='ACTIVE'),
        sa.Column('standard_attr_id', sa.BigInteger(), nullable=False),
        sa.ForeignKeyConstraint(['port_id'], ['ports.id'],
                                ondelete='CASCADE'),
        sa.ForeignKeyConstraint(['standard_attr_id'],
                                ['standardattributes.id'],
                                ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('port_id'),
        sa.UniqueConstraint('standard_attr_id')
    )
    op.create_table('subports',
        sa.Column('port_id', sa.String(length=36)),
        sa.Column('trunk_id', sa.String(length=36), nullable=False),
        sa.Column('segmentation_type', sa.String(length=32), nullable=False),
        sa.Column('segmentation_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['port_id'], ['ports.id'], ondelete='CASCADE'),
        sa.ForeignKeyConstraint(['trunk_id'], ['trunks.id'],
                                ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('port_id'),
        sa.UniqueConstraint('trunk_id', 'segmentation_type', 'segmentation_id',
            name='uniq_subport0trunk_id0segmentation_type0segmentation_id')
    )
开发者ID:cubeek,项目名称:neutron,代码行数:33,代码来源:5abc0278ca73_add_support_for_vlan_trunking.py


示例9: get_v3_catalog

    def get_v3_catalog(self, user_id, tenant_id, metadata=None):
        d = dict(itertools.chain(six.iteritems(CONF), six.iteritems(CONF.eventlet_server)))
        d.update({"tenant_id": tenant_id, "user_id": user_id})

        session = sql.get_session()
        services = (
            session.query(Service).filter(Service.enabled == true()).options(sql.joinedload(Service.endpoints)).all()
        )

        def make_v3_endpoints(endpoints):
            for endpoint in (ep.to_dict() for ep in endpoints if ep.enabled):
                del endpoint["service_id"]
                del endpoint["legacy_endpoint_id"]
                del endpoint["enabled"]
                endpoint["region"] = endpoint["region_id"]
                try:
                    endpoint["url"] = core.format_url(endpoint["url"], d)
                except exception.MalformedEndpoint:
                    continue  # this failure is already logged in format_url()

                yield endpoint

        def make_v3_service(svc):
            eps = list(make_v3_endpoints(svc.endpoints))
            service = {"endpoints": eps, "id": svc.id, "type": svc.type}
            service["name"] = svc.extra.get("name", "")
            return service

        return [make_v3_service(svc) for svc in services]
开发者ID:UTSA-ICS,项目名称:keystone-kerberos,代码行数:29,代码来源:sql.py


示例10: _get_default_external_network

    def _get_default_external_network(self, context):
        """Get the default external network for the deployment."""
        with context.session.begin(subtransactions=True):
            default_external_networks = (
                context.session.query(ext_net_models.ExternalNetwork)
                .filter_by(is_default=sql.true())
                .join(models_v2.Network)
                .join(standard_attr.StandardAttribute)
                .order_by(standard_attr.StandardAttribute.id)
                .all()
            )

        if not default_external_networks:
            LOG.error(
                _LE(
                    "Unable to find default external network "
                    "for deployment, please create/assign one to "
                    "allow auto-allocation to work correctly."
                )
            )
            raise exceptions.AutoAllocationFailure(reason=_("No default router:external network"))
        if len(default_external_networks) > 1:
            LOG.error(
                _LE("Multiple external default networks detected. " "Network %s is true 'default'."),
                default_external_networks[0]["network_id"],
            )
        return default_external_networks[0].network_id
开发者ID:openstack,项目名称:neutron,代码行数:27,代码来源:db.py


示例11: get_ha_routers_l3_agents_count

    def get_ha_routers_l3_agents_count(self, context):
        """Return a map between HA routers and how many agents every
        router is scheduled to.
        """

        # Postgres requires every column in the select to be present in
        # the group by statement when using an aggregate function.
        # One solution is to generate a subquery and join it with the desired
        # columns.
        binding_model = l3_sch_db.RouterL3AgentBinding
        sub_query = (context.session.query(
            binding_model.router_id,
            func.count(binding_model.router_id).label('count')).
            join(l3_attrs_db.RouterExtraAttributes,
                 binding_model.router_id ==
                 l3_attrs_db.RouterExtraAttributes.router_id).
            join(l3_db.Router).
            filter(l3_attrs_db.RouterExtraAttributes.ha == sql.true()).
            group_by(binding_model.router_id).subquery())

        query = (context.session.query(l3_db.Router, sub_query.c.count).
                 join(sub_query))

        return [(self._make_router_dict(router), agent_count)
                for router, agent_count in query]
开发者ID:coreycb,项目名称:neutron,代码行数:25,代码来源:l3_hascheduler_db.py


示例12: test_eleven

 def test_eleven(self):
     c = column('x', Boolean)
     self.assert_compile(
         c.is_(true()),
         "x IS true",
         dialect=self._dialect(True)
     )
开发者ID:chundi,项目名称:sqlalchemy,代码行数:7,代码来源:test_operators.py


示例13: _model_query

    def _model_query(self, context, model):
        query = context.session.query(model)
        # define basic filter condition for model query
        # NOTE(jkoelker) non-admin queries are scoped to their tenant_id
        # NOTE(salvatore-orlando): unless the model allows for shared objects
        query_filter = None
        if not context.is_admin and hasattr(model, 'tenant_id'):
            if hasattr(model, 'shared'):
                query_filter = ((model.tenant_id == context.tenant_id) |
                                (model.shared == sql.true()))
            else:
                query_filter = (model.tenant_id == context.tenant_id)
        # Execute query hooks registered from mixins and plugins
        for _name, hooks in self._model_query_hooks.get(model,
                                                        {}).iteritems():
            query_hook = hooks.get('query')
            if isinstance(query_hook, basestring):
                query_hook = getattr(self, query_hook, None)
            if query_hook:
                query = query_hook(context, model, query)

            filter_hook = hooks.get('filter')
            if isinstance(filter_hook, basestring):
                filter_hook = getattr(self, filter_hook, None)
            if filter_hook:
                query_filter = filter_hook(context, model, query_filter)

        # NOTE(salvatore-orlando): 'if query_filter' will try to evaluate the
        # condition, raising an exception
        if query_filter is not None:
            query = query.filter(query_filter)
        return query
开发者ID:liujyg,项目名称:tacker,代码行数:32,代码来源:db_base.py


示例14: schedule_unscheduled_bgp_speakers

    def schedule_unscheduled_bgp_speakers(self, context, host):
        """Schedule unscheduled BgpSpeaker to a BgpDrAgent.
        """

        LOG.debug('Started auto-scheduling on host %s', host)
        with context.session.begin(subtransactions=True):
            query = context.session.query(agents_db.Agent)
            query = query.filter_by(
                agent_type=bgp_consts.AGENT_TYPE_BGP_ROUTING,
                host=host,
                admin_state_up=sql.true())
            try:
                bgp_dragent = query.one()
            except (exc.NoResultFound):
                LOG.debug('No enabled BgpDrAgent on host %s', host)
                return False

            if agents_db.AgentDbMixin.is_agent_down(
                    bgp_dragent.heartbeat_timestamp):
                LOG.warning(_LW('BgpDrAgent %s is down'), bgp_dragent.id)
                return False

            if self._is_bgp_speaker_hosted(context, bgp_dragent['id']):
                # One BgpDrAgent can only host one BGP speaker
                LOG.debug('BgpDrAgent already hosting a speaker on host %s. '
                          'Cannot schedule an another one', host)
                return False

            unscheduled_speakers = self._get_unscheduled_bgp_speakers(context)
            if not unscheduled_speakers:
                LOG.debug('Nothing to auto-schedule on host %s', host)
                return False

            self.bind(context, [bgp_dragent], unscheduled_speakers[0])
        return True
开发者ID:ruansteve,项目名称:neutron-dynamic-routing,代码行数:35,代码来源:bgp_dragent_scheduler.py


示例15: _model_query

    def _model_query(self, context, model):
        query = context.session.query(model)
        # define basic filter condition for model query
        query_filter = None
        if self.model_query_scope(context, model):
            if hasattr(model, 'shared'):
                query_filter = ((model.tenant_id == context.tenant_id) |
                                (model.shared == sql.true()))
            else:
                query_filter = (model.tenant_id == context.tenant_id)
        # Execute query hooks registered from mixins and plugins
        for _name, hooks in self._model_query_hooks.get(model,
                                                        {}).iteritems():
            query_hook = hooks.get('query')
            if isinstance(query_hook, six.string_types):
                query_hook = getattr(self, query_hook, None)
            if query_hook:
                query = query_hook(context, model, query)

            filter_hook = hooks.get('filter')
            if isinstance(filter_hook, six.string_types):
                filter_hook = getattr(self, filter_hook, None)
            if filter_hook:
                query_filter = filter_hook(context, model, query_filter)

        # NOTE(salvatore-orlando): 'if query_filter' will try to evaluate the
        # condition, raising an exception
        if query_filter is not None:
            query = query.filter(query_filter)
        return query
开发者ID:Intellifora,项目名称:neutron,代码行数:30,代码来源:common_db_mixin.py


示例16: _get_by_disabled_from_db

 def _get_by_disabled_from_db(context, disabled):
     if disabled:
         return context.session.query(api_models.CellMapping).filter_by(
             disabled=true()).order_by(asc(api_models.CellMapping.id)).all()
     else:
         return context.session.query(api_models.CellMapping).filter_by(
             disabled=false()).order_by(asc(
                 api_models.CellMapping.id)).all()
开发者ID:arbrandes,项目名称:nova,代码行数:8,代码来源:cell_mapping.py


示例17: test_true_false

 def test_true_false(self):
     self.assert_compile(
         sql.false(), "0"
     )
     self.assert_compile(
         sql.true(),
         "1"
     )
开发者ID:Callek,项目名称:sqlalchemy,代码行数:8,代码来源:test_sqlite.py


示例18: get_v3_catalog

    def get_v3_catalog(self, user_id, tenant_id):
        """Retrieve and format the current V3 service catalog.

        :param user_id: The id of the user who has been authenticated for
            creating service catalog.
        :param tenant_id: The id of the project. 'tenant_id' will be None in
            the case this being called to create a catalog to go in a domain
            scoped token. In this case, any endpoint that requires a
            tenant_id as part of their URL will be skipped.

        :returns: A list representing the service catalog or an empty list

        """
        d = dict(
            itertools.chain(CONF.items(), CONF.eventlet_server.items()))
        d.update({'user_id': user_id})
        silent_keyerror_failures = []
        if tenant_id:
            d.update({
                'tenant_id': tenant_id,
                'project_id': tenant_id,
            })
        else:
            silent_keyerror_failures = ['tenant_id', 'project_id', ]

        session = sql.get_session()
        services = (session.query(Service).filter(Service.enabled == true()).
                    options(sql.joinedload(Service.endpoints)).
                    all())

        def make_v3_endpoints(endpoints):
            for endpoint in (ep.to_dict() for ep in endpoints if ep.enabled):
                del endpoint['service_id']
                del endpoint['legacy_endpoint_id']
                del endpoint['enabled']
                endpoint['region'] = endpoint['region_id']
                try:
                    formatted_url = core.format_url(
                        endpoint['url'], d,
                        silent_keyerror_failures=silent_keyerror_failures)
                    if formatted_url:
                        endpoint['url'] = formatted_url
                    else:
                        continue
                except exception.MalformedEndpoint:
                    continue  # this failure is already logged in format_url()

                yield endpoint

        # TODO(davechen): If there is service with no endpoints, we should skip
        # the service instead of keeping it in the catalog, see bug #1436704.
        def make_v3_service(svc):
            eps = list(make_v3_endpoints(svc.endpoints))
            service = {'endpoints': eps, 'id': svc.id, 'type': svc.type}
            service['name'] = svc.extra.get('name', '')
            return service

        return [make_v3_service(svc) for svc in services]
开发者ID:dims,项目名称:keystone,代码行数:58,代码来源:sql.py


示例19: get_catalog

    def get_catalog(self, user_id, tenant_id):
        """Retrieve and format the V2 service catalog.

        :param user_id: The id of the user who has been authenticated for
            creating service catalog.
        :param tenant_id: The id of the project. 'tenant_id' will be None
            in the case this being called to create a catalog to go in a
            domain scoped token. In this case, any endpoint that requires
            a tenant_id as part of their URL will be skipped (as would a whole
            service if, as a consequence, it has no valid endpoints).

        :returns: A nested dict representing the service catalog or an
                  empty dict.

        """
        substitutions = dict(
            itertools.chain(CONF.items(), CONF.eventlet_server.items()))
        substitutions.update({'user_id': user_id})
        silent_keyerror_failures = []
        if tenant_id:
            substitutions.update({'tenant_id': tenant_id})
        else:
            silent_keyerror_failures = ['tenant_id']

        session = sql.get_session()
        endpoints = (session.query(Endpoint).
                     options(sql.joinedload(Endpoint.service)).
                     filter(Endpoint.enabled == true()).all())

        catalog = {}

        for endpoint in endpoints:
            if not endpoint.service['enabled']:
                continue
            try:
                formatted_url = core.format_url(
                    endpoint['url'], substitutions,
                    silent_keyerror_failures=silent_keyerror_failures)
                if formatted_url is not None:
                    url = formatted_url
                else:
                    continue
            except exception.MalformedEndpoint:
                continue  # this failure is already logged in format_url()

            region = endpoint['region_id']
            service_type = endpoint.service['type']
            default_service = {
                'id': endpoint['id'],
                'name': endpoint.service.extra.get('name', ''),
                'publicURL': ''
            }
            catalog.setdefault(region, {})
            catalog[region].setdefault(service_type, default_service)
            interface_url = '%sURL' % endpoint['interface']
            catalog[region][service_type][interface_url] = url

        return catalog
开发者ID:eros-lige,项目名称:UPCloud-keystone,代码行数:58,代码来源:sql.py


示例20: remove_expired_reservations

def remove_expired_reservations(context, tenant_id=None):
    now = utcnow()
    resv_query = context.session.query(quota_models.Reservation)
    if tenant_id:
        tenant_expr = (quota_models.Reservation.tenant_id == tenant_id)
    else:
        tenant_expr = sql.true()
    resv_query = resv_query.filter(sa.and_(
        tenant_expr, quota_models.Reservation.expiration < now))
    return resv_query.delete()
开发者ID:smokony,项目名称:neutron,代码行数:10,代码来源:api.py



注:本文中的sqlalchemy.sql.true函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sql.union_all函数代码示例发布时间:2022-05-27
下一篇:
Python sql.text函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap