本文整理汇总了Python中sqlalchemy.null函数的典型用法代码示例。如果您正苦于以下问题：Python null函数的具体用法？Python null怎么用？Python null使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了null函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: delete_userless_histories
def delete_userless_histories(app, cutoff_time, info_only=False, force_retry=False):
    """Flag stale histories that have no owning user as deleted.

    Selects ``History`` rows whose ``user_id`` is NULL and whose
    ``update_time`` is older than ``cutoff_time``.  Unless ``force_retry`` is
    set, rows already flagged as deleted are excluded.  With ``info_only`` the
    matching rows are only counted; otherwise each one is flagged and flushed.

    The purge history script will handle marking DatasetInstances as deleted.
    Nothing is removed from disk yet.
    """
    history_count = 0
    start = time.time()

    history_columns = app.model.History.table.c
    conditions = [history_columns.user_id == null()]
    if not force_retry:
        # A normal run skips histories that were already flagged.
        conditions.append(history_columns.deleted == false())
    conditions.append(history_columns.update_time < cutoff_time)
    histories = app.sa_session.query(app.model.History).filter(and_(*conditions))

    for history in histories:
        history_count += 1
        if info_only:
            continue
        print("Deleting history id ", history.id)
        history.deleted = True
        app.sa_session.add(history)
        app.sa_session.flush()

    stop = time.time()
    print("Deleted %d histories" % history_count)
    print("Elapsed time: ", stop - start)
    print("##########################################")
开发者ID:osallou,项目名称:galaxy,代码行数:26,代码来源:cleanup_datasets.py
示例2: glottologmeta
def glottologmeta(request):
    """Assemble the summary figures for the metadata view.

    Returns a dict holding the most recent ``Language.updated`` value, the
    number of top-level families and isolates, per-category language counts
    and the languoid objects of the special families.
    """
    accepted_statuses = (LanguoidStatus.established, LanguoidStatus.unattested)
    languoids = DBSession.query(Languoid)\
        .filter(Language.active == true())\
        .filter(Languoid.status.in_(accepted_statuses))
    # Languoids without a father are the roots of the classification.
    top_level = languoids.filter(Languoid.father_pk == null())

    latest_update = DBSession.query(Language.updated)\
        .order_by(Language.updated.desc()).first()[0]
    res = {
        'last_update': latest_update,
        'number_of_families': top_level.filter(
            Languoid.level == LanguoidLevel.family).count(),
        'number_of_isolates': top_level.filter(
            Languoid.level == LanguoidLevel.language).count(),
    }

    with_hid = languoids.filter(Languoid.hid != null())
    language_counts = {'all': with_hid.count()}
    res['number_of_languages'] = language_counts
    special_families = OrderedDict()
    res['special_families'] = special_families

    for name in SPECIAL_FAMILIES:
        family = top_level.filter(Language.name == name).one()
        special_families[name] = family
        language_counts[name] = family.child_language_count

    # 'l1' is everything except the three special pseudo-families.
    remaining = language_counts['all']
    for excluded in ('Pidgin', 'Artificial Language', 'Sign Language'):
        remaining = remaining - language_counts[excluded]
    language_counts['l1'] = remaining
    return res
开发者ID:pombredanne,项目名称:glottolog3,代码行数:25,代码来源:views.py
示例3: base_query
def base_query(self, query):
    """Extend ``query`` with the eager loads and outer joins this table needs.

    When ``self.languages`` is non-empty, one alias from ``self._langs`` is
    outer-joined per language, rows matching none of the aliases are dropped,
    and categories are restricted to the selected languages.  Otherwise only
    categories whose ``language_pk`` is NULL are joined.  The returned query
    is always DISTINCT.
    """
    query = query.options(joinedload(Species.categories), joinedload(Parameter._files))
    species_link = SpeciesCategory.species_pk == Parameter.pk

    if not self.languages:
        unassigned_category = and_(
            SpeciesCategory.category_pk == Category.pk,
            Category.language_pk == null())
        query = query\
            .outerjoin(SpeciesCategory, species_link) \
            .outerjoin(Category, unassigned_category)
        return query.distinct()

    for i, lang in enumerate(self.languages):
        alias = self._langs[i]
        query = query.outerjoin(
            alias,
            and_(alias.language_pk == lang.pk,
                 alias.parameter_pk == Parameter.pk))

    # Keep a row as soon as any of the joined aliases produced a match.
    matched_any = or_(*[alias.pk != null() for alias in self._langs])
    selected_category = and_(
        SpeciesCategory.category_pk == Category.pk,
        Category.language_pk.in_([l.pk for l in self.languages]))
    query = query \
        .filter(matched_any)\
        .outerjoin(SpeciesCategory, species_link) \
        .outerjoin(Category, selected_category)\
        .options(joinedload_all(Parameter.valuesets, ValueSet.values))
    return query.distinct()
开发者ID:kristinnts11,项目名称:tsammalex,代码行数:26,代码来源:datatables.py
示例4: group
def group(self, group, current=None):
    """Restrict ``self.query`` to patients that have a membership in ``group``.

    ``current`` selects on the membership dates: ``True`` keeps memberships
    that are active now, ``False`` keeps memberships that are not, and
    ``None`` applies no date condition.  Returns ``self`` to allow chaining.
    """
    # Correlated sub-query: memberships of this patient in the given group.
    sub_query = db.session.query(GroupPatient).filter(
        GroupPatient.group == group,
        GroupPatient.patient_id == Patient.id,
    )

    if current is None:
        date_criteria = ()
    elif current:
        # Started already and either open-ended or not finished yet.
        date_criteria = (
            GroupPatient.from_date <= func.now(),
            or_(
                GroupPatient.to_date == null(),
                GroupPatient.to_date >= func.now(),
            ),
        )
    else:
        # Not started yet, or has an end date that already passed.
        date_criteria = (
            or_(
                GroupPatient.from_date > func.now(),
                and_(
                    GroupPatient.to_date != null(),
                    GroupPatient.to_date < func.now(),
                ),
            ),
        )

    if date_criteria:
        sub_query = sub_query.filter(*date_criteria)

    self.query = self.query.filter(sub_query.exists())
    return self
开发者ID:renalreg,项目名称:radar,代码行数:28,代码来源:patient_search.py
示例5: test_isnull_notnull
def test_isnull_notnull(self):
    """Check ``isnull``/``notnull`` against ``IS NULL``/``IS NOT NULL`` expressions."""
    sa_column = self.sa_alltypes.c.double_col
    expr_column = self.alltypes.double_col

    null_case = (expr_column.isnull(), sa_column.is_(sa.null()))
    notnull_case = (expr_column.notnull(), sa_column.isnot(sa.null()))
    self._check_expr_cases([null_case, notnull_case])
开发者ID:thekingofhero,项目名称:ibis,代码行数:7,代码来源:test_sqlalchemy.py
示例6: build_partial_day_ips
def build_partial_day_ips(query, period_start, period_end):
    """Method to build an IP list for the case 2

    when the IP was allocated after the period start and
    is still allocated after the period end.
    This method only looks at public IPv4 addresses.

    :param query: query over ``models.IPAddress`` to refine.
    :param period_start: inclusive lower bound for ``allocated_at``.
    :param period_end: exclusive upper bound for ``allocated_at``; also the
        lower bound for ``deallocated_at``.
    :returns: the list produced by ``.all()`` on the filtered query.
    """
    ip = models.IPAddress

    # allocated_at must be set and fall inside [period_start, period_end).
    allocated_in_period = and_(
        ip.allocated_at != null(),
        ip.allocated_at >= period_start,
        ip.allocated_at < period_end,
    )
    # Not deallocated at all, or deallocated only after the period ended.
    # NOTE: the Python ``is`` operator cannot be used on columns -- it is
    # evaluated immediately and yields a constant instead of a SQL clause.
    still_allocated = or_(
        ip._deallocated.is_(False),
        ip.deallocated_at == null(),
        ip.deallocated_at >= period_end,
    )

    ip_list = (
        query.filter(ip.version == 4)
        .filter(ip.network_id == PUBLIC_NETWORK_ID)
        .filter(ip.used_by_tenant_id != null())
        .filter(allocated_in_period)
        .filter(still_allocated)
        .all()
    )
    return ip_list
开发者ID:roaet,项目名称:quark,代码行数:32,代码来源:billing.py
示例7: test_draft_creation
def test_draft_creation(self):
    """Drafting a published recipe yields a second, linked DRAFT recipe."""
    recipe_fields = dict(
        type='MASH',
        name='Rocky Mountain River IPA',
        gallons=5,
        boil_minutes=60,
        notes=u'This is my favorite recipe.',
        state=u'PUBLISHED'
    )
    model.Recipe(**recipe_fields)
    model.commit()

    model.Recipe.query.first().draft()
    model.commit()

    assert model.Recipe.query.count() == 2

    # The source has no published version; the draft points back at one.
    no_published_version = model.Recipe.published_version == null()  # noqa
    has_published_version = model.Recipe.published_version != null()  # noqa
    source = model.Recipe.query.filter(no_published_version).first()
    draft = model.Recipe.query.filter(has_published_version).first()

    assert source != draft
    assert source.type == draft.type == 'MASH'
    assert source.name == draft.name == 'Rocky Mountain River IPA'
    assert source.state != draft.state
    assert draft.state == 'DRAFT'

    assert draft.published_version == source
    assert source.current_draft == draft
开发者ID:ryanpetrello,项目名称:draughtcraft,代码行数:30,代码来源:test_recipes.py
示例8: upgrade
def upgrade():
    """Adjust nullability on ``writeup_post_versions`` and add a check constraint.

    ``threadpost_id`` becomes NOT NULL, ``writeuppost_id`` and ``html`` become
    nullable, and a check constraint over those two columns is created.
    """
    table_name = 'writeup_post_versions'

    op.alter_column(table_name, 'threadpost_id', nullable=False)
    for column_name in ('writeuppost_id', 'html'):
        op.alter_column(table_name, column_name, nullable=True)

    both_present = sa.and_(
        sa.column('writeuppost_id') != sa.null(),
        sa.column('html') != sa.null())
    op.create_check_constraint(
        'writeup_post_versions_check_html', table_name, both_present)
开发者ID:inklesspen,项目名称:mimir,代码行数:13,代码来源:2b489140be5_wpv_nulls.py
示例9: comments_for_video
def comments_for_video(cls, video_id, comment_ids=None):
    """Return comments for a video with commenter name & email.

    Three queries are combined with UNION ALL and ordered by
    ``VideoComment.date_added``: comments by account users, comments by
    collaborators linked to an account user, and comments by collaborators
    without one (avatar and account id are NULL for those).
    """
    def account_user_entities():
        # Column list shared by both queries that resolve to an AccountUser.
        return (
            VideoComment,
            AccountUser.display_name.label('name'),
            AccountUser.username.label('email'),
            AccountUser.avatar_filename,
            AccountUser.account_id,
        )

    def written_by_collaborator():
        return (
            (VideoComment.user_type == 'collaborator') &
            (VideoComment.user_id == VideoCollaborator.id)
        )

    video_comments = cls.query.filter_by(video_id=video_id)
    if comment_ids:
        video_comments = video_comments.filter(cls.id.in_(comment_ids))

    written_by_account_user = (
        (VideoComment.user_type == 'account_user') &
        (VideoComment.user_id == AccountUser.id)
    )
    account_comments = video_comments \
        .join(AccountUser, written_by_account_user) \
        .with_entities(*account_user_entities())

    collab_comments_with_user = video_comments \
        .join(VideoCollaborator, written_by_collaborator()) \
        .join(AccountUser, AccountUser.id == VideoCollaborator.account_user_id) \
        .with_entities(*account_user_entities())

    unlinked_collaborator = (
        written_by_collaborator() &
        (VideoCollaborator.account_user_id.is_(None))
    )
    collab_comments = video_comments \
        .join(VideoCollaborator, unlinked_collaborator) \
        .with_entities(
            VideoComment,
            VideoCollaborator.name,
            VideoCollaborator.email,
            null().label('avatar_filename'),
            null().label('account_id'),
        )

    combined = account_comments.union_all(collab_comments_with_user, collab_comments)
    return combined.order_by(VideoComment.date_added)
开发者ID:philpill,项目名称:romeo,代码行数:48,代码来源:models.py
示例10: _get_choices
def _get_choices(db_session, schema_name, attribute_name):
    """
    Returns the most recent version of every choice ever used for the attribute

    Choices are ranked per ``Choice.name`` by schema publish date (newest
    first, NULLs last) and only the top-ranked row of each name is returned.
    Schemata without a publish date are ignored.
    """
    # Rank within each choice name; rank 1 is the most recently published.
    rank = sa.func.row_number().over(
        partition_by=datastore.Choice.name,
        order_by=datastore.Schema.publish_date.desc().nullslast()
    ).label('row_number')

    belongs_to_attribute = (
        (datastore.Schema.name == schema_name) &
        (datastore.Attribute.name == attribute_name) &
        (datastore.Schema.publish_date != sa.null())
    )

    ranked_choices = db_session.query(datastore.Choice)
    ranked_choices = ranked_choices.join(datastore.Choice.attribute)
    ranked_choices = ranked_choices.join(datastore.Attribute.schema)
    ranked_choices = ranked_choices.add_column(rank)
    ranked_choices = ranked_choices.filter(belongs_to_attribute)
    recent_choices_query = ranked_choices.subquery()

    newest_only = (
        db_session.query(datastore.Choice)
        .select_entity_from(recent_choices_query)
        .filter(recent_choices_query.c.row_number == 1)
    )
    return newest_only.all()
开发者ID:jkrooskos,项目名称:occams_imports,代码行数:32,代码来源:direct.py
示例11: upgrade
def upgrade():
    """Remove question and answer for startDate from briefs.data for draft dos2 briefs."""
    connection = op.get_bind()

    # SELECT id, data
    # FROM briefs JOIN frameworks ON briefs.framework_id = frameworks.id
    # WHERE frameworks.slug = 'digital-outcomes-and-specialists-2' AND briefs.published_at IS null;
    briefs_with_framework = briefs_table.join(
        frameworks_table,
        briefs_table.c.framework_id == frameworks_table.c.id
    )
    is_draft_dos2_brief = sa.and_(
        frameworks_table.c.slug == 'digital-outcomes-and-specialists-2',
        briefs_table.c.published_at == sa.null()
    )
    wanted_columns = (
        briefs_table.c.id,
        briefs_table.c.data
    )
    select_drafts = briefs_with_framework \
        .select(is_draft_dos2_brief) \
        .with_only_columns(wanted_columns)

    for brief_id, brief_data in connection.execute(select_drafts).fetchall():
        # Only write back rows that actually carried a non-null startDate.
        if brief_data.pop('startDate', None) is None:
            continue
        # UPDATE briefs SET data = _brief_data WHERE id = _brief_id;
        update_brief = briefs_table.update() \
            .where(briefs_table.c.id == brief_id) \
            .values(data=brief_data)
        connection.execute(update_brief)
开发者ID:alphagov,项目名称:digitalmarketplace-api,代码行数:28,代码来源:870_remove_invalid_draft_dos2_brief_dates.py
示例12: get_scheduled_jobs_to_start
def get_scheduled_jobs_to_start(time, batch_size=None, session=None):
    """Return scheduled jobs that are due, ordered by ``execute_at``.

    A job qualifies when its ``execute_at`` is earlier than ``time`` minus the
    configured pickup interval, and it is either not captured or was captured
    no later than "now minus the captured-job timeout".  At most
    ``batch_size`` rows are returned.
    """
    job = models.ScheduledJob

    # Execution-time threshold, shifted by the configured pickup interval.
    pickup_delay = datetime.timedelta(seconds=CONF.scheduler.pickup_job_after)
    due = job.execute_at < time - pickup_delay

    # Capture-time threshold, shifted by the configured capture timeout.
    capture_timeout = datetime.timedelta(
        seconds=CONF.scheduler.captured_job_timeout
    )
    min_captured_at = utils.utc_now_sec() - capture_timeout
    available = sa.or_(
        job.captured_at == sa.null(),
        job.captured_at <= min_captured_at
    )

    return (
        b.model_query(job)
        .filter(due)
        .filter(available)
        .order_by(job.execute_at)
        .limit(batch_size)
        .all()
    )
开发者ID:openstack,项目名称:mistral,代码行数:29,代码来源:api.py
示例13: upgrade
def upgrade(migrate_engine):
    """Split the ``trait`` table into one table per entry of ``tables``.

    For every entry a new table keyed on ``(event_id, key)`` is created and
    filled from the non-NULL values of the matching ``trait`` column.  The
    ``trait`` and ``trait_type`` tables are dropped afterwards.
    """
    meta = sa.MetaData(bind=migrate_engine)
    trait, event, trait_type = [
        sa.Table(name, meta, autoload=True)
        for name in ('trait', 'event', 'trait_type')
    ]
    trait_with_type = trait.join(
        trait_type, trait.c.trait_type_id == trait_type.c.id)

    for t_name, t_type, t_nullable, col_name, __ in tables:
        new_table = sa.Table(
            t_name, meta,
            sa.Column('event_id', sa.Integer,
                      sa.ForeignKey(event.c.id), primary_key=True),
            sa.Column('key', sa.String(255), primary_key=True),
            sa.Column('value', t_type, nullable=t_nullable),
            sa.Index('ix_%s_event_id_key' % t_name,
                     'event_id', 'key'),
            mysql_engine='InnoDB',
            mysql_charset='utf8',
        )
        new_table.create()

        value_column = trait.c[col_name]
        source_rows = sa.select(
            [trait.c.event_id, trait_type.c.desc, value_column]
        ).select_from(trait_with_type).where(value_column != sa.null())

        # Skip the INSERT when the scalar probe of the select yields nothing.
        if source_rows.alias().select().scalar() is None:
            continue
        new_table.insert().from_select(
            ['event_id', 'key', 'value'], source_rows).execute()

    trait.drop()
    trait_type.drop()
开发者ID:AVerma87,项目名称:ceilometer,代码行数:30,代码来源:041_expand_event_traits.py
示例14: has_property
def has_property(self, prop):
    """Build a SQL condition telling whether this user holds property ``prop``.

    The condition is true when some membership row grants the property and no
    membership row denies it.
    """
    involved_tables = [
        Property.__table__,
        PropertyGroup.__table__,
        Membership.__table__
    ]
    applies_to_user = and_(
        Property.name == prop,
        Property.property_group_id == PropertyGroup.id,
        PropertyGroup.id == Membership.group_id,
        Membership.user_id == self.id,
        Membership.active
    )
    # Only existence matters, so the select list is a bare NULL.
    property_granted_select = select(
        [null()], from_obj=involved_tables).where(applies_to_user)
    # NOTE: a .cte("property_granted_select") variant was left disabled upstream.

    denied = exists(property_granted_select.where(Property.granted == false()))
    granted = exists(property_granted_select.where(Property.granted == true()))
    return and_(not_(denied), granted)
开发者ID:lukasjuhrich,项目名称:pycroft,代码行数:30,代码来源:user.py
示例15: where
def where(self, clauses):
    '''
    Build a SQL condition expression from a nested clause description.

    Each simple condition consists of a field name, an operator and a value.

    clauses: one SQL condition group, formatted as
        ["and"/"or", [(field name, operator, value), ...]]
    The inner list may also contain nested groups of the same format or
    ready-made binary expressions.  The string value 'null' is replaced by
    SQL NULL.

    Returns None when ``clauses`` is empty, otherwise the combined
    expression.  Raises Exception when the group does not start with a known
    connective or when an item is not a recognised condition.
    '''
    if not clauses:
        return None
    if clauses[0] not in _sql_rl:
        raise Exception("sql where clause list must start with 'and' or 'or'")
    rl = _sql_rl[clauses[0]]

    wl = []
    for c in clauses[1]:
        # Ready-made expressions must be detected before list() conversion;
        # once converted to a list the isinstance check can never match.
        if isinstance(c, _BinaryExpression):
            wl.append(c)
            continue
        c = list(c)
        if c[0] in _sql_rl:
            # Nested group: recurse.
            wl.append(self.where(c))
        elif len(c) == 3 and c[1] in _exp_where_op:
            if c[2] == 'null':
                c[2] = null()
            wl.append(self.clause(c))
        else:
            # Wrap in a 1-tuple so that a tuple-valued ``clauses`` is
            # formatted as a whole instead of breaking the % operator.
            raise Exception("sql clause error,%s" % (clauses,))
    return rl(*wl)
开发者ID:mu2019,项目名称:heysqlware,代码行数:26,代码来源:heysqlware.py
示例16: has_property
def has_property(cls, prop, when=None):
    """Build a labelled SQL expression: does the user hold property ``prop``?

    True when some membership that satisfies ``Membership.active(when)``
    grants the property and none denies it.  The expression is labelled
    ``"has_property_" + prop``.
    """
    # TODO Use joins
    involved_tables = [
        Property.__table__,
        PropertyGroup.__table__,
        Membership.__table__
    ]
    applies_to_user = and_(
        Property.name == prop,
        Property.property_group_id == PropertyGroup.id,
        PropertyGroup.id == Membership.group_id,
        Membership.user_id == cls.id,
        Membership.active(when)
    )
    # Only existence matters, so the select list is a bare NULL.
    property_granted_select = select(
        [null()], from_obj=involved_tables).where(applies_to_user)
    # NOTE: a .cte("property_granted_select") variant was left disabled upstream.

    denied = exists(property_granted_select.where(Property.granted == false()))
    granted = exists(property_granted_select.where(Property.granted == true()))
    condition = and_(not_(denied), granted)
    return condition.self_group().label("has_property_" + prop)
开发者ID:agdsn,项目名称:pycroft,代码行数:31,代码来源:user.py
示例17: _send_password_resets
def _send_password_resets(self):
    """Send an e-mail for every password reset request not sent yet.

    Requests whose ``sent_on`` is NULL are loaded; for each one the message
    template is rendered, the mail is sent, ``sent_on`` is stamped and the
    session is committed.  A failure for one request is logged with its
    traceback and does not stop the remaining requests.
    """
    db = request.environ['sqlalchemy.session']
    model = request.environ['sqlalchemy.model']

    resets = db.query(model.PasswordResetRequest).filter_by(sent_on=null()).all()

    # ``email.mime.text`` is the module path that exists on both Python 2.5+
    # and Python 3; the legacy ``email.MIMEText`` alias is Python 2 only.
    from email.mime.text import MIMEText
    from fivecents.lib.mail import EmailSender

    for reset in resets:
        try:
            c.password_reset_link = h.site_url() + h.url_for(controller='password', action='reset', token=reset.token)
            c.password_reset_code = reset.token

            body = render_jinja('messages/password_reset.jinja')
            message = MIMEText(body.encode('utf-8'), 'plain', 'utf-8')
            message['Subject'] = _("Password reset instruction from %s") % h.site_name()
            message['To'] = reset.user.email
            message['From'] = config['from_address']

            log.info(_("Sending password reset e-mail to %s") % message['To'])
            ms = EmailSender(to_addresses = message['To'])
            ms.send_mime(message)

            reset.sent_on = datetime.utcnow()
            db.commit()
        except Exception:
            # Best effort per request, but let KeyboardInterrupt/SystemExit
            # propagate instead of being swallowed by a bare except.
            log.error("Can't send password reset e-mail", exc_info=1)
开发者ID:pawelniewie,项目名称:5groszy.pl,代码行数:26,代码来源:cron.py
示例18: _send_password_resets
def _send_password_resets(self):
    """Send pending password reset e-mails and return a textual report.

    Requests whose ``sent_on`` is NULL are loaded; for each one the message
    template is rendered, the mail is sent, ``sent_on`` is stamped and the
    session is committed.  One line per request is appended to the report:
    either the recipient, or the traceback of the failure.

    Returns the accumulated report as a string.
    """
    db = request.environ['sqlalchemy.session']
    model = request.environ['sqlalchemy.model']
    output = StringIO()

    resets = db.query(model.PasswordResetRequest).filter_by(sent_on=null()).all()
    for reset in resets:
        try:
            c.password_reset_link = h.site_url() + h.url_for(controller='password', action='reset', token=reset.token)
            c.password_reset_code = reset.token

            body = render_jinja('messages/password_reset.jinja')
            message = MIMEText(body.encode('utf-8'), 'plain', 'utf-8')
            message['Subject'] = _("Password reset instruction from %s") % h.site_name()
            message['To'] = reset.user.email
            message['From'] = config['from_address']

            ms = EmailSender(to_addresses = message['To'])
            ms.send_mime(message)

            reset.sent_on = datetime.utcnow()
            db.commit()

            output.write(_("Sending password reset e-mail to %s\n") % message['To'])
        except Exception:
            # Best effort per request, but let KeyboardInterrupt/SystemExit
            # propagate instead of being swallowed by a bare except.
            output.write(_("Can't send password reset e-mail: %s\n") % format_exc())
    return output.getvalue()
开发者ID:pawelniewie,项目名称:5groszy.pl,代码行数:27,代码来源:cron.py
示例19: migrate_claims
def migrate_claims(migrate_engine, metadata, buildrequests, objects,
                   buildrequest_claims):
    """Copy claim information out of ``buildrequests`` into the new tables.

    First one ``objects`` row of class 'BuildMaster' is inserted per distinct
    ``claimed_by_name``; then one ``buildrequest_claims`` row is inserted per
    claimed build request, linked to the matching object.
    """
    claimed_by = buildrequests.c.claimed_by_name
    is_claimed = claimed_by != NULL

    # First, ensure there is an object row for each master
    id_placeholder = sa.null().label('id')
    if migrate_engine.dialect.name == 'postgresql':
        # postgres needs NULL cast to an integer:
        id_placeholder = sa.cast(id_placeholder, sa.INTEGER)
    master_columns = [
        id_placeholder,
        claimed_by.label("name"),
        sa.literal_column("'BuildMaster'").label("class_name"),
    ]
    new_objects = sa.select(master_columns, whereclause=is_claimed, distinct=True)

    # this doesn't seem to work without str() -- verified in sqla 0.6.0 - 0.7.1
    insert_objects = sautils.InsertFromSelect(objects, new_objects)
    migrate_engine.execute(str(insert_objects))

    # now make a buildrequest_claims row for each claimed build request
    # (have to use sa.text because str, below, doesn't work
    # with placeholders)
    same_master = ((claimed_by == objects.c.name)
                   & (objects.c.class_name == sa.text("'BuildMaster'")))
    join = buildrequests.join(objects, same_master)
    claim_columns = [
        buildrequests.c.id.label('brid'),
        objects.c.id.label('objectid'),
        buildrequests.c.claimed_at,
    ]
    claims = sa.select(claim_columns, from_obj=[join], whereclause=is_claimed)
    insert_claims = sautils.InsertFromSelect(buildrequest_claims, claims)
    migrate_engine.execute(str(insert_claims))
开发者ID:ABI-Software,项目名称:buildbot,代码行数:34,代码来源:011_add_buildrequest_claims.py
示例20: migrate_claims
def migrate_claims(migrate_engine, metadata, buildrequests, objects, buildrequest_claims):
    """Copy claim information out of ``buildrequests`` into the new tables.

    First one ``objects`` row of class 'BuildMaster' is inserted per distinct
    ``claimed_by_name``; then one ``buildrequest_claims`` row is inserted per
    claimed build request, linked to the matching object.

    :param migrate_engine: engine the generated INSERT statements run on.
    :param metadata: not used by this function.
    :param buildrequests: source table carrying ``claimed_by_name`` and
        ``claimed_at``.
    :param objects: target table for the per-master rows.
    :param buildrequest_claims: target table for the per-request claim rows.
    """
    # First, ensure there is an object row for each master
    new_objects = sa.select(
        [
            sa.null().label("id"),
            buildrequests.c.claimed_by_name.label("name"),
            sa.literal_column("'BuildMaster'").label("class_name"),
        ],
        # Explicit sa.null() instead of ``!= None``; both render IS NOT NULL.
        whereclause=buildrequests.c.claimed_by_name != sa.null(),
        distinct=True,
    )

    # this doesn't seem to work without str() -- verified in sqla 0.6.0 - 0.7.1
    migrate_engine.execute(str(sautils.InsertFromSelect(objects, new_objects)))

    # now make a buildrequest_claims row for each claimed build request
    join = buildrequests.join(
        objects,
        (buildrequests.c.claimed_by_name == objects.c.name)
        # (have to use sa.text because str, below, doesn't work
        # with placeholders)
        & (objects.c.class_name == sa.text("'BuildMaster'")),
    )
    claims = sa.select(
        [buildrequests.c.id.label("brid"), objects.c.id.label("objectid"), buildrequests.c.claimed_at],
        from_obj=[join],
        whereclause=buildrequests.c.claimed_by_name != sa.null(),
    )
    migrate_engine.execute(str(sautils.InsertFromSelect(buildrequest_claims, claims)))
开发者ID:nlmills,项目名称:buildbot,代码行数:30,代码来源:011_add_buildrequest_claims.py
注：本文中的sqlalchemy.null函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论