• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sqlalchemy.null函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.null函数的典型用法代码示例。如果您正苦于以下问题:Python null函数的具体用法?Python null怎么用?Python null使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了null函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: delete_userless_histories

def delete_userless_histories(app, cutoff_time, info_only=False, force_retry=False):
    """Flag old histories that have no user as deleted and print a summary.

    Selects histories whose ``user_id`` is NULL and whose ``update_time`` is
    older than ``cutoff_time``.  The purge history script will handle marking
    DatasetInstances as deleted; nothing is removed from disk yet.

    :param app: object providing ``sa_session`` and ``model.History``.
    :param cutoff_time: upper bound (exclusive) for ``update_time``.
    :param info_only: when true, matching histories are only counted.
    :param force_retry: when true, histories already flagged as deleted are
        selected again as well.
    """
    history_count = 0
    start = time.time()
    columns = app.model.History.table.c
    # Criteria stay in the order user_id / deleted / update_time.
    criteria = [columns.user_id == null()]
    if not force_retry:
        criteria.append(columns.deleted == false())
    criteria.append(columns.update_time < cutoff_time)
    matching = app.sa_session.query(app.model.History).filter(and_(*criteria))
    for history in matching:
        if not info_only:
            print("Deleting history id ", history.id)
            history.deleted = True
            app.sa_session.add(history)
            # Flushed once per history, right after it is flagged.
            app.sa_session.flush()
        history_count += 1
    elapsed = time.time() - start
    print("Deleted %d histories" % history_count)
    print("Elapsed time: ", elapsed)
    print("##########################################")
开发者ID:osallou,项目名称:galaxy,代码行数:26,代码来源:cleanup_datasets.py


示例2: glottologmeta

def glottologmeta(request):
    """Collect summary statistics about active languoids.

    Returns a dict with the most recent ``Language.updated`` value, the
    number of top-level families and isolates, the top-level languoid for
    every name in ``SPECIAL_FAMILIES`` and per-group language counts,
    including the derived ``'l1'`` count.
    """
    accepted_status = (LanguoidStatus.established, LanguoidStatus.unattested)
    active_languoids = DBSession.query(Languoid)\
        .filter(Language.active == true())\
        .filter(Languoid.status.in_(accepted_status))
    # Top-level languoids are those without a father.
    top_level = active_languoids.filter(Languoid.father_pk == null())

    newest_update = DBSession.query(Language.updated)\
        .order_by(Language.updated.desc()).first()[0]
    family_total = top_level.filter(Languoid.level == LanguoidLevel.family).count()
    isolate_total = top_level.filter(Languoid.level == LanguoidLevel.language).count()
    res = {
        'last_update': newest_update,
        'number_of_families': family_total,
        'number_of_isolates': isolate_total,
    }

    with_hid = active_languoids.filter(Languoid.hid != null())
    language_counts = {'all': with_hid.count()}
    special_families = OrderedDict()
    res['number_of_languages'] = language_counts
    res['special_families'] = special_families
    for name in SPECIAL_FAMILIES:
        family = top_level.filter(Language.name == name).one()
        special_families[name] = family
        language_counts[name] = family.child_language_count

    # 'l1' is everything that is not a pidgin, artificial or sign language.
    language_counts['l1'] = language_counts['all'] \
        - language_counts['Pidgin'] \
        - language_counts['Artificial Language'] \
        - language_counts['Sign Language']
    return res
开发者ID:pombredanne,项目名称:glottolog3,代码行数:25,代码来源:views.py


示例3: base_query

    def base_query(self, query):
        """Extend ``query`` with the joins and eager loads used by this table.

        Species categories and parameter files are always eager-loaded.
        Without selected languages, categories are outer-joined where their
        ``language_pk`` is NULL.  With selected languages, one entry of
        ``self._langs`` is outer-joined per language, rows are kept when at
        least one of those entries matched, categories are restricted to the
        selected languages and value sets with their values are eager-loaded.
        The resulting query is returned with DISTINCT applied.
        """
        query = query.options(joinedload(Species.categories), joinedload(Parameter._files))

        if not self.languages:
            unspecific_category = and_(
                SpeciesCategory.category_pk == Category.pk,
                Category.language_pk == null())
            query = query\
                .outerjoin(SpeciesCategory, SpeciesCategory.species_pk == Parameter.pk) \
                .outerjoin(Category, unspecific_category)
            return query.distinct()

        for index, language in enumerate(self.languages):
            lang_alias = self._langs[index]
            query = query.outerjoin(
                lang_alias,
                and_(lang_alias.language_pk == language.pk,
                     lang_alias.parameter_pk == Parameter.pk))

        # Keep a row as soon as any of the per-language joins produced a match.
        matched_any = or_(*[lang_alias.pk != null() for lang_alias in self._langs])
        query = query.filter(matched_any)
        query = query.outerjoin(SpeciesCategory, SpeciesCategory.species_pk == Parameter.pk)
        selected_pks = [language.pk for language in self.languages]
        query = query.outerjoin(
            Category,
            and_(
                SpeciesCategory.category_pk == Category.pk,
                Category.language_pk.in_(selected_pks)))
        query = query.options(joinedload_all(Parameter.valuesets, ValueSet.values))
        return query.distinct()
开发者ID:kristinnts11,项目名称:tsammalex,代码行数:26,代码来源:datatables.py


示例4: group

    def group(self, group, current=None):
        """Restrict ``self.query`` to patients that have a membership in ``group``.

        :param group: group the ``GroupPatient`` rows must reference.
        :param current: ``None`` applies no date condition; a true value
            requires ``from_date`` to have passed and ``to_date`` to be NULL
            or not yet passed; a false value requires ``from_date`` to lie in
            the future or a non-NULL ``to_date`` to lie in the past.
        :returns: ``self``.
        """
        membership = db.session.query(GroupPatient).filter(
            GroupPatient.group == group,
            GroupPatient.patient_id == Patient.id,
        )

        if current is None:
            pass
        elif current:
            # Membership window contains "now".
            still_open = or_(
                GroupPatient.to_date == null(),
                GroupPatient.to_date >= func.now(),
            )
            membership = membership.filter(GroupPatient.from_date <= func.now(), still_open)
        else:
            # "now" lies before the start or after the end of the window.
            already_closed = and_(
                GroupPatient.to_date != null(),
                GroupPatient.to_date < func.now()
            )
            membership = membership.filter(
                or_(GroupPatient.from_date > func.now(), already_closed)
            )

        self.query = self.query.filter(membership.exists())
        return self
开发者ID:renalreg,项目名称:radar,代码行数:28,代码来源:patient_search.py


示例5: test_isnull_notnull

    def test_isnull_notnull(self):
        """Pair ``isnull``/``notnull`` on double_col with ``IS``/``IS NOT`` NULL."""
        sa_double = self.sa_alltypes.c.double_col
        double_col = self.alltypes.double_col

        null_case = (double_col.isnull(), sa_double.is_(sa.null()))
        not_null_case = (double_col.notnull(), sa_double.isnot(sa.null()))
        self._check_expr_cases([null_case, not_null_case])
开发者ID:thekingofhero,项目名称:ibis,代码行数:7,代码来源:test_sqlalchemy.py


示例6: build_partial_day_ips

def build_partial_day_ips(query, period_start, period_end):
    """Build the IP list for case 2 (allocated during the period).

    Covers IPs that were allocated after the period start and are still
    allocated after the period end.  Only public IPv4 addresses are
    considered.

    :param query: query over ``models.IPAddress`` to narrow down.
    :param period_start: inclusive lower bound for ``allocated_at``.
    :param period_end: exclusive upper bound for ``allocated_at`` and the
        earliest accepted ``deallocated_at``.
    :returns: list of the matching rows (result of ``.all()``).
    """
    # Filter out only IPv4 that were allocated after the period start
    # and have not been deallocated before the period end.
    # allocated_at will be set to a date
    ip_list = (
        # Plain ``4`` instead of the Python-2-only long literal ``4L``.
        query.filter(models.IPAddress.version == 4)
        .filter(models.IPAddress.network_id == PUBLIC_NETWORK_ID)
        # A Python ``is not None`` test on the column attribute is evaluated
        # immediately and yields a constant; ``!= null()`` builds the SQL
        # "IS NOT NULL" predicate instead.
        .filter(models.IPAddress.used_by_tenant_id != null())
        .filter(
            and_(
                models.IPAddress.allocated_at != null(),
                models.IPAddress.allocated_at >= period_start,
                models.IPAddress.allocated_at < period_end,
            )
        )
        .filter(
            or_(
                # ``is False`` would likewise collapse to a Python constant;
                # ``.is_(False)`` emits a SQL comparison on the column.
                models.IPAddress._deallocated.is_(False),
                models.IPAddress.deallocated_at == null(),
                models.IPAddress.deallocated_at >= period_end,
            )
        )
        .all()
    )

    return ip_list
开发者ID:roaet,项目名称:quark,代码行数:32,代码来源:billing.py


示例7: test_draft_creation

    def test_draft_creation(self):
        """Drafting a published recipe yields a second, linked DRAFT recipe."""
        recipe_fields = dict(
            type='MASH',
            name='Rocky Mountain River IPA',
            gallons=5,
            boil_minutes=60,
            notes=u'This is my favorite recipe.',
            state=u'PUBLISHED',
        )
        model.Recipe(**recipe_fields)
        model.commit()

        model.Recipe.query.first().draft()
        model.commit()

        assert model.Recipe.query.count() == 2
        # The source recipe has no published version; the draft points to one.
        has_no_published = model.Recipe.published_version == null()
        source = model.Recipe.query.filter(has_no_published).first()
        has_published = model.Recipe.published_version != null()
        draft = model.Recipe.query.filter(has_published).first()

        assert source != draft
        assert source.type == draft.type
        assert draft.type == 'MASH'
        assert source.name == draft.name
        assert draft.name == 'Rocky Mountain River IPA'
        assert source.state != draft.state
        assert draft.state == 'DRAFT'

        assert draft.published_version == source
        assert source.current_draft == draft
开发者ID:ryanpetrello,项目名称:draughtcraft,代码行数:30,代码来源:test_recipes.py


示例8: upgrade

def upgrade():
    """Adjust nullability on writeup_post_versions and add a check constraint.

    ``threadpost_id`` becomes NOT NULL, ``writeuppost_id`` and ``html``
    become nullable, and a check constraint over those two columns is created.
    """
    table_name = 'writeup_post_versions'
    nullability = (
        ('threadpost_id', False),
        ('writeuppost_id', True),
        ('html', True),
    )
    for column_name, allow_null in nullability:
        op.alter_column(table_name, column_name, nullable=allow_null)

    both_present = sa.and_(
        sa.column('writeuppost_id') != sa.null(),
        sa.column('html') != sa.null(),
    )
    op.create_check_constraint(
        'writeup_post_versions_check_html', table_name, both_present)
开发者ID:inklesspen,项目名称:mimir,代码行数:13,代码来源:2b489140be5_wpv_nulls.py


示例9: comments_for_video

    def comments_for_video(cls, video_id, comment_ids=None):
        """Return comments for a video with commenter name & email.

        Three queries are combined with UNION ALL and ordered by
        ``VideoComment.date_added``: comments by account users, comments by
        collaborators linked to an account user, and comments by collaborators
        without one (avatar and account id are NULL for those).

        :param video_id: id of the video whose comments are selected.
        :param comment_ids: optional collection of comment ids to restrict to.
        """
        base = cls.query.filter_by(video_id=video_id)
        if comment_ids:
            base = base.filter(cls.id.in_(comment_ids))

        def with_account_columns(joined):
            # Projection shared by both queries that end on an AccountUser row.
            return joined.with_entities(
                VideoComment,
                AccountUser.display_name.label('name'),
                AccountUser.username.label('email'),
                AccountUser.avatar_filename,
                AccountUser.account_id,
            )

        by_account_user = (VideoComment.user_type == 'account_user') & \
            (VideoComment.user_id == AccountUser.id)
        account_comments = with_account_columns(base.join(AccountUser, by_account_user))

        by_collaborator = (VideoComment.user_type == 'collaborator') & \
            (VideoComment.user_id == VideoCollaborator.id)
        linked_collaborators = base.join(VideoCollaborator, by_collaborator).join(
            AccountUser,
            AccountUser.id == VideoCollaborator.account_user_id
        )
        collab_comments_with_user = with_account_columns(linked_collaborators)

        by_unlinked_collaborator = (VideoComment.user_type == 'collaborator') & \
            (VideoComment.user_id == VideoCollaborator.id) & \
            (VideoCollaborator.account_user_id.is_(None))
        collab_comments = base.join(
            VideoCollaborator, by_unlinked_collaborator
        ).with_entities(
            VideoComment,
            VideoCollaborator.name,
            VideoCollaborator.email,
            null().label('avatar_filename'),
            null().label('account_id'),
        )

        combined = account_comments.union_all(collab_comments_with_user, collab_comments)
        return combined.order_by(VideoComment.date_added)
开发者ID:philpill,项目名称:romeo,代码行数:48,代码来源:models.py


示例10: _get_choices

def _get_choices(db_session, schema_name, attribute_name):
    """
    Returns the most recent version of every choice ever used for the attribute

    Choices are ranked per choice name by descending schema publish date
    (schemata without a publish date are excluded) and only the first-ranked
    row of each name is returned.
    """
    # Rank 1 is the newest published schema for each choice name.
    rank = sa.func.row_number().over(
        partition_by=datastore.Choice.name,
        order_by=datastore.Schema.publish_date.desc().nullslast()
    ).label('row_number')

    belongs_to_attribute = (
        (datastore.Schema.name == schema_name) &
        (datastore.Attribute.name == attribute_name) &
        (datastore.Schema.publish_date != sa.null())
    )

    ranked = db_session.query(datastore.Choice)
    ranked = ranked.join(datastore.Choice.attribute)
    ranked = ranked.join(datastore.Attribute.schema)
    ranked = ranked.add_column(rank)
    ranked = ranked.filter(belongs_to_attribute).subquery()

    newest = db_session.query(datastore.Choice).select_entity_from(ranked)
    return newest.filter(ranked.c.row_number == 1).all()
开发者ID:jkrooskos,项目名称:occams_imports,代码行数:32,代码来源:direct.py


示例11: upgrade

def upgrade():
    """Remove question and answer for startDate from briefs.data for draft dos2 briefs."""
    conn = op.get_bind()

    # SELECT id, data
    # FROM briefs JOIN frameworks ON briefs.framework_id = frameworks.id
    # WHERE frameworks.slug = 'digital-outcomes-and-specialists-2' AND briefs.published_at IS null;
    briefs_with_framework = briefs_table.join(
        frameworks_table,
        briefs_table.c.framework_id == frameworks_table.c.id
    )
    is_draft_dos2 = sa.and_(
        frameworks_table.c.slug == 'digital-outcomes-and-specialists-2',
        briefs_table.c.published_at == sa.null()
    )
    select_drafts = briefs_with_framework.select(is_draft_dos2).with_only_columns(
        (briefs_table.c.id, briefs_table.c.data)
    )

    for brief_id, brief_data in conn.execute(select_drafts).fetchall():
        # Rows whose data had no (non-None) startDate are not written back.
        if brief_data.pop('startDate', None) is None:
            continue
        # UPDATE briefs SET data = _brief_data WHERE id = _brief_id;
        update_brief = briefs_table.update().where(
            briefs_table.c.id == brief_id
        ).values(data=brief_data)
        conn.execute(update_brief)
开发者ID:alphagov,项目名称:digitalmarketplace-api,代码行数:28,代码来源:870_remove_invalid_draft_dos2_brief_dates.py


示例12: get_scheduled_jobs_to_start

def get_scheduled_jobs_to_start(time, batch_size=None, session=None):
    """Return scheduled jobs that are due and not currently captured.

    A job qualifies when its ``execute_at`` lies more than
    ``CONF.scheduler.pickup_job_after`` seconds before ``time`` and its
    ``captured_at`` is either NULL or at least
    ``CONF.scheduler.captured_job_timeout`` seconds old.

    :param time: reference time for the ``execute_at`` comparison.
    :param batch_size: value passed to ``limit()``.
    :param session: not used inside this function body.
    :returns: matching jobs ordered by ``execute_at``.
    """
    job_model = models.ScheduledJob
    query = b.model_query(job_model)

    # Filter by execution time accounting for a configured job pickup interval.
    due_before = time - datetime.timedelta(seconds=CONF.scheduler.pickup_job_after)
    query = query.filter(job_model.execute_at < due_before)

    # Filter by captured time accounting for a configured captured job timeout.
    min_captured_at = utils.utc_now_sec() - datetime.timedelta(
        seconds=CONF.scheduler.captured_job_timeout
    )
    not_captured = sa.or_(
        job_model.captured_at == sa.null(),
        job_model.captured_at <= min_captured_at
    )

    return (
        query.filter(not_captured)
        .order_by(job_model.execute_at)
        .limit(batch_size)
        .all()
    )
开发者ID:openstack,项目名称:mistral,代码行数:29,代码来源:api.py


示例13: upgrade

def upgrade(migrate_engine):
    """Split the ``trait`` table into one table per entry of ``tables``.

    For every entry a new table with ``event_id``, ``key`` and ``value``
    columns is created and filled from the non-NULL values of the entry's
    source column in ``trait`` (joined with ``trait_type`` for the key).
    Afterwards ``trait`` and ``trait_type`` are dropped.
    """
    meta = sa.MetaData(bind=migrate_engine)
    trait = sa.Table('trait', meta, autoload=True)
    event = sa.Table('event', meta, autoload=True)
    trait_type = sa.Table('trait_type', meta, autoload=True)

    for t_name, t_type, t_nullable, col_name, __ in tables:
        schema_items = [
            sa.Column('event_id', sa.Integer,
                      sa.ForeignKey(event.c.id), primary_key=True),
            sa.Column('key', sa.String(255), primary_key=True),
            sa.Column('value', t_type, nullable=t_nullable),
            sa.Index('ix_%s_event_id_key' % t_name, 'event_id', 'key'),
        ]
        t_table = sa.Table(
            t_name, meta, *schema_items,
            mysql_engine='InnoDB',
            mysql_charset='utf8'
        )
        t_table.create()

        source_column = trait.c[col_name]
        trait_with_type = trait.join(
            trait_type, trait.c.trait_type_id == trait_type.c.id)
        query = sa.select([trait.c.event_id, trait_type.c.desc, source_column])
        query = query.select_from(trait_with_type)
        query = query.where(source_column != sa.null())

        # Skip the INSERT ... SELECT when the probe query returns no value.
        if query.alias().select().scalar() is None:
            continue
        t_table.insert().from_select(
            ['event_id', 'key', 'value'], query).execute()

    trait.drop()
    trait_type.drop()
开发者ID:AVerma87,项目名称:ceilometer,代码行数:30,代码来源:041_expand_event_traits.py


示例14: has_property

    def has_property(self, prop):
        """Build a SQL expression that is true when ``prop`` is effectively granted.

        The expression requires that an active membership of this user yields
        at least one granting ``Property`` row named ``prop`` and no denying
        one.

        :param prop: property name to look up.
        """
        membership_matches = and_(
            Property.name == prop,
            Property.property_group_id == PropertyGroup.id,
            PropertyGroup.id == Membership.group_id,
            Membership.user_id == self.id,
            Membership.active
        )
        source_tables = [
            Property.__table__,
            PropertyGroup.__table__,
            Membership.__table__
        ]
        # Only row existence matters, so the select list is a single NULL.
        property_granted_select = select(
            [null()], from_obj=source_tables
        ).where(membership_matches)

        def exists_with_granted(flag):
            return exists(property_granted_select.where(Property.granted == flag))

        never_denied = not_(exists_with_granted(false()))
        return and_(never_denied, exists_with_granted(true()))
开发者ID:lukasjuhrich,项目名称:pycroft,代码行数:30,代码来源:user.py


示例15: where

 def where(self,clauses):
     '''
     Build a SQL condition expression.

     Each condition consists of a field name, an operator and a value.
     clauses: one SQL condition expression, formatted as
     ["and"/"or", [(field name, operator, value), ...]].  An entry of the
     inner list may also be a nested clause list of the same format or an
     already built binary expression.  The string value 'null' is replaced
     by a SQL NULL.

     Returns None when clauses is empty; raises Exception when the list does
     not start with a known connective or an entry cannot be interpreted.
     '''
     if not clauses:
         return None
     if clauses[0] not in _sql_rl:
         raise Exception("sql where clause list  must start with 'and' or 'or'")
     rl=_sql_rl[clauses[0]]
     wl=[]
     for c in clauses[1]:
         # Ready-made expressions must be recognised before the list()
         # conversion below; once converted, the isinstance test can no
         # longer match.
         if isinstance(c,_BinaryExpression):
             wl.append(c)
             continue
         c=list(c)
         if c[0] in _sql_rl:
             wl.append(self.where(c))
         elif len(c)==3 and c[1] in _exp_where_op:
             if c[2]=='null':
                 c[2]=null()
             wl.append(self.clause(c))
         else:
             # One-element tuple so that a tuple-valued ``clauses`` is
             # formatted as a whole instead of being unpacked by ``%``.
             raise Exception("sql clause error,%s" % (clauses,))
     return rl(*wl)
开发者ID:mu2019,项目名称:heysqlware,代码行数:26,代码来源:heysqlware.py


示例16: has_property

    def has_property(cls, prop, when=None):
        """Build a labelled SQL expression telling whether ``prop`` is granted.

        The expression is true when a membership that is active for ``when``
        yields at least one granting ``Property`` row named ``prop`` and no
        denying one.  It is labelled ``"has_property_" + prop``.

        :param prop: property name to look up.
        :param when: value passed through to ``Membership.active``.
        """
        # TODO Use joins
        membership_matches = and_(
            Property.name == prop,
            Property.property_group_id == PropertyGroup.id,
            PropertyGroup.id == Membership.group_id,
            Membership.user_id == cls.id,
            Membership.active(when)
        )
        source_tables = [
            Property.__table__,
            PropertyGroup.__table__,
            Membership.__table__
        ]
        # Only row existence matters, so the select list is a single NULL.
        property_granted_select = select(
            [null()], from_obj=source_tables
        ).where(membership_matches)

        def exists_with_granted(flag):
            return exists(property_granted_select.where(Property.granted == flag))

        never_denied = not_(exists_with_granted(false()))
        granted = and_(never_denied, exists_with_granted(true()))
        return granted.self_group().label("has_property_" + prop)
开发者ID:agdsn,项目名称:pycroft,代码行数:31,代码来源:user.py


示例17: _send_password_resets

    def _send_password_resets(self):
        """Send an e-mail for every password reset request not yet sent.

        Requests whose ``sent_on`` is NULL are loaded; for each one the
        message is rendered and sent, then ``sent_on`` is set and committed.
        A failure for one request is logged and the loop continues with the
        next request.
        """
        db = request.environ['sqlalchemy.session']
        model = request.environ['sqlalchemy.model']

        resets = db.query(model.PasswordResetRequest).filter_by(sent_on=null()).all()
        # ``email.mime.text`` is the module path that exists on both
        # Python 2.5+ and Python 3; ``email.MIMEText`` is Python 2 only.
        from email.mime.text import MIMEText
        from fivecents.lib.mail import EmailSender

        for reset in resets:
            try:
                c.password_reset_link = h.site_url() + h.url_for(controller='password', action='reset', token=reset.token)
                c.password_reset_code = reset.token
                body = render_jinja('messages/password_reset.jinja')
                message = MIMEText(body.encode('utf-8'), 'plain', 'utf-8')
                message['Subject'] = _("Password reset instruction from %s") % h.site_name()
                message['To'] = reset.user.email
                message['From'] = config['from_address']

                log.info(_("Sending password reset e-mail to %s") % message['To'])
                ms = EmailSender(to_addresses = message['To'])
                ms.send_mime(message)

                # Mark as sent only after the message went out.
                reset.sent_on = datetime.utcnow()
                db.commit()
            except Exception:
                # Best effort per request, but SystemExit / KeyboardInterrupt
                # are no longer swallowed as they were by a bare ``except``.
                log.error("Can't send password reset e-mail", exc_info=1)
开发者ID:pawelniewie,项目名称:5groszy.pl,代码行数:26,代码来源:cron.py


示例18: _send_password_resets

    def _send_password_resets(self):
        """Send pending password reset e-mails and return a textual report.

        Requests whose ``sent_on`` is NULL are loaded; for each one the
        message is rendered and sent, then ``sent_on`` is set and committed.
        One line per request is appended to the report: a success line, or
        the formatted traceback when handling that request failed.

        :returns: the accumulated report text.
        """
        db = request.environ['sqlalchemy.session']
        model = request.environ['sqlalchemy.model']
        output = StringIO()

        resets = db.query(model.PasswordResetRequest).filter_by(sent_on=null()).all()

        for reset in resets:
            try:
                c.password_reset_link = h.site_url() + h.url_for(controller='password', action='reset', token=reset.token)
                c.password_reset_code = reset.token
                body = render_jinja('messages/password_reset.jinja')
                message = MIMEText(body.encode('utf-8'), 'plain', 'utf-8')
                message['Subject'] = _("Password reset instruction from %s") % h.site_name()
                message['To'] = reset.user.email
                message['From'] = config['from_address']

                ms = EmailSender(to_addresses = message['To'])
                ms.send_mime(message)

                # Mark as sent only after the message went out.
                reset.sent_on = datetime.utcnow()
                db.commit()

                output.write(_("Sending password reset e-mail to %s\n") % message['To'])
            except Exception:
                # Best effort per request, but SystemExit / KeyboardInterrupt
                # are no longer swallowed as they were by a bare ``except``.
                output.write(_("Can't send password reset e-mail: %s\n") % format_exc())
        return output.getvalue()
开发者ID:pawelniewie,项目名称:5groszy.pl,代码行数:27,代码来源:cron.py


示例19: migrate_claims

def migrate_claims(migrate_engine, metadata, buildrequests, objects,
                   buildrequest_claims):
    """Copy claim information out of ``buildrequests``.

    First an ``objects`` row of class 'BuildMaster' is inserted for every
    distinct non-NULL ``claimed_by_name``; then one ``buildrequest_claims``
    row is inserted per claimed build request, linking it to that object.
    """
    claimed_by = buildrequests.c.claimed_by_name

    # First, ensure there is an object row for each master
    id_placeholder = sa.null().label('id')
    if migrate_engine.dialect.name == 'postgresql':
        # postgres needs NULL cast to an integer:
        id_placeholder = sa.cast(id_placeholder, sa.INTEGER)
    master_columns = [
        id_placeholder,
        claimed_by.label("name"),
        sa.literal_column("'BuildMaster'").label("class_name"),
    ]
    new_objects = sa.select(
        master_columns,
        whereclause=claimed_by != NULL,
        distinct=True)

    # this doesn't seem to work without str() -- verified in sqla 0.6.0 - 0.7.1
    insert_objects = sautils.InsertFromSelect(objects, new_objects)
    migrate_engine.execute(str(insert_objects))

    # now make a buildrequest_claims row for each claimed build request
    # (have to use sa.text because str, below, doesn't work
    # with placeholders)
    same_master = (claimed_by == objects.c.name) \
        & (objects.c.class_name == sa.text("'BuildMaster'"))
    requests_with_master = buildrequests.join(objects, same_master)
    claim_columns = [
        buildrequests.c.id.label('brid'),
        objects.c.id.label('objectid'),
        buildrequests.c.claimed_at,
    ]
    claims = sa.select(
        claim_columns,
        from_obj=[requests_with_master],
        whereclause=claimed_by != NULL)
    insert_claims = sautils.InsertFromSelect(buildrequest_claims, claims)
    migrate_engine.execute(str(insert_claims))
开发者ID:ABI-Software,项目名称:buildbot,代码行数:34,代码来源:011_add_buildrequest_claims.py


示例20: migrate_claims

def migrate_claims(migrate_engine, metadata, buildrequests, objects, buildrequest_claims):
    """Copy claim information out of ``buildrequests``.

    First an ``objects`` row of class 'BuildMaster' is inserted for every
    distinct non-NULL ``claimed_by_name``; then one ``buildrequest_claims``
    row is inserted per claimed build request, linking it to that object.
    """
    claimed_by = buildrequests.c.claimed_by_name

    # First, ensure there is an object row for each master
    master_columns = [
        sa.null().label("id"),
        claimed_by.label("name"),
        sa.literal_column("'BuildMaster'").label("class_name"),
    ]
    # Comparing against sa.null() renders the same IS NOT NULL test.
    new_objects = sa.select(
        master_columns,
        whereclause=claimed_by != sa.null(),
        distinct=True,
    )

    # this doesn't seem to work without str() -- verified in sqla 0.6.0 - 0.7.1
    insert_objects = sautils.InsertFromSelect(objects, new_objects)
    migrate_engine.execute(str(insert_objects))

    # now make a buildrequest_claims row for each claimed build request
    # (have to use sa.text because str, below, doesn't work
    # with placeholders)
    same_master = (claimed_by == objects.c.name) & (
        objects.c.class_name == sa.text("'BuildMaster'")
    )
    requests_with_master = buildrequests.join(objects, same_master)
    claim_columns = [
        buildrequests.c.id.label("brid"),
        objects.c.id.label("objectid"),
        buildrequests.c.claimed_at,
    ]
    claims = sa.select(
        claim_columns,
        from_obj=[requests_with_master],
        whereclause=claimed_by != sa.null(),
    )
    insert_claims = sautils.InsertFromSelect(buildrequest_claims, claims)
    migrate_engine.execute(str(insert_claims))
开发者ID:nlmills,项目名称:buildbot,代码行数:30,代码来源:011_add_buildrequest_claims.py



注:本文中的sqlalchemy.null函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sqlalchemy.or_函数代码示例发布时间:2022-05-27
下一篇:
Python sqlalchemy.literal_column函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap