本文整理汇总了 Python 中 sqlalchemy.sql.expression.true 函数的典型用法代码示例。如果您正苦于以下问题:Python true 函数的具体用法?Python true 怎么用?Python true 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 true 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: get_all_users
def get_all_users(when):
    """Return a query for users created at the same hour of day as ``when``.

    Only users with both ``send_passwords_periodically`` and
    ``email_verified`` set are matched; the query is ordered by
    ``User.creation``.
    """
    target_hour = when.hour
    criteria = (
        User.send_passwords_periodically == true(),
        User.email_verified == true(),
        extract('hour', User.creation) == target_hour,
    )
    matching_users = Session.query(User).filter(*criteria)
    return matching_users.order_by(User.creation)
开发者ID:ablanco,项目名称:yith-library-server,代码行数:7,代码来源:backups.py
示例2: filter_private_stories
def filter_private_stories(query, current_user, story_model=models.Story):
    """Restrict a story query to the stories the requester may see.

    Two variants of the query are built, one joined through per-user
    permissions and one additionally joined through team memberships, and
    their union is returned.

    :param query: The query to be filtered.
    :param current_user: The ID of the user requesting the result. A falsy
        value keeps only non-private rows (and rows whose story id is NULL).
    :param story_model: The database model used for stories in the query.
    :return: The union of the user-based and team-based filtered queries.
    """

    def visibility_clause(user_id_column):
        # Non-private stories and rows without a story id are always kept.
        always_visible = (
            story_model.private == false(),
            story_model.id.is_(None),
        )
        if not current_user:
            return or_(*always_visible)
        # Private stories are kept only on rows tied to the requesting user.
        granted_private = and_(
            user_id_column == current_user,
            story_model.private == true(),
        )
        return or_(granted_private, *always_visible)

    # First filter based on users with permissions set directly
    query = query.outerjoin(
        models.story_permissions,
        models.Permission,
        models.user_permissions,
        models.User,
    )
    visible_to_users = query.filter(visibility_clause(models.User.id))

    # Now filter based on membership of teams with permissions
    users = aliased(models.User, name="story_users")
    team_member_join = (users, users.id == models.team_membership.c.user_id)
    query = query.outerjoin(
        models.team_permissions,
        models.Team,
        models.team_membership,
        team_member_join,
    )
    visible_to_teams = query.filter(visibility_clause(users.id))

    return visible_to_users.union(visible_to_teams)
开发者ID:openstack-infra,项目名称:storyboard,代码行数:59,代码来源:base.py
示例3: get_all_parent_categories
def get_all_parent_categories(self):
    """
    ---- Returns a list of the Categories from the DB that are active and
    flagged as having children, sorted by name.
    """
    parent_categories = self.dsession.query(Categories).filter(
        Categories.hasChildren == true(),
        Categories.isActive == true(),
    )
    return parent_categories.order_by(Categories.name).all()
开发者ID:v2saumb,项目名称:catalog,代码行数:8,代码来源:db_interface.py
示例4: _get_default_service_type
def _get_default_service_type(self, context):
    """Return the service type flagged as default.

    :param context: request context; its ``session`` is used for the query.
    :returns: the single ServiceType whose ``default`` column is true,
        ``None`` when no row matches, or the first matching row (after
        logging a warning) when several rows match.
    """
    default_types = context.session.query(ServiceType).filter(
        ServiceType.default == expr.true())
    try:
        return default_types.one()
    except orm_exc.NoResultFound:
        return None
    except orm_exc.MultipleResultsFound:
        # This should never happen. If it does, take the first instance
        results = default_types.all()
        # The two literals are concatenated by the parser, so the first one
        # needs the trailing space to keep the sentences apart.
        LOG.warning(_("Multiple default service type instances found. "
                      "Will use instance '%s'"), results[0]["id"])
        return results[0]
开发者ID:ykaneko,项目名称:quantum,代码行数:12,代码来源:servicetype_db.py
示例5: test_string_range
def test_string_range(self):
    """Range queries on ``fqdn`` translate to >= / <= comparisons.

    An open bound (``*``) is expected to appear as ``true()`` in the
    resulting conjunction.
    """
    def parse(lucene_query):
        return lucene_to_sqlalchemy(
            lucene_query, {'fqdn': System.fqdn}, [System.fqdn])

    # Both bounds given.
    closed = parse(u'fqdn:[aaa TO zzz]')
    self.assert_clause_equals(
        closed, and_(System.fqdn >= u'aaa', System.fqdn <= u'zzz'))

    # Upper bound open.
    lower_only = parse(u'fqdn:[aaa TO *]')
    self.assert_clause_equals(
        lower_only, and_(System.fqdn >= u'aaa', true()))

    # Lower bound open.
    upper_only = parse(u'fqdn:[* TO zzz]')
    self.assert_clause_equals(
        upper_only, and_(true(), System.fqdn <= u'zzz'))

    # Both bounds open.
    unbounded = parse(u'fqdn:[* TO *]')
    self.assert_clause_equals(unbounded, and_(true(), true()))
开发者ID:beaker-project,项目名称:beaker,代码行数:14,代码来源:test_search_utility.py
示例6: _delete_idle_service_vm_hosting_devices
def _delete_idle_service_vm_hosting_devices(self, context, num, template):
    """Delete up to ``num`` unused service VM instances based on ``template``.

    Candidates are hosting devices of the template that are
    administratively up, not tenant bound, flagged for auto delete and
    have no slot allocations.

    The number of deleted service vm instances is returned.
    """
    template_id = template['id']
    plugging_drv = self.get_hosting_device_plugging_driver(context,
                                                           template_id)
    hosting_device_drv = self.get_hosting_device_driver(context,
                                                        template_id)
    if plugging_drv is None or hosting_device_drv is None or num <= 0:
        return 0

    device = hd_models.HostingDevice
    allocation = hd_models.SlotAllocation
    allocation_count = func.count(allocation.logical_resource_id)

    # Delete the "youngest" hosting devices since they are more likely
    # not to have finished booting
    idle_devices = (
        context.session.query(device)
        .outerjoin(allocation, device.id == allocation.hosting_device_id)
        .filter(device.template_id == template_id,
                device.admin_state_up == expr.true(),
                device.tenant_bound == expr.null(),
                device.auto_delete == expr.true())
        .group_by(device.id)
        .having(allocation_count == 0)
        .order_by(device.created_at.desc(), allocation_count)
        .all()
    )

    num_deleted = 0
    # num > 0 is guaranteed by the guard above, so the slice takes
    # min(len(idle_devices), num) candidates.
    for candidate in idle_devices[:num]:
        res = plugging_drv.get_hosting_device_resources(
            context, candidate['id'], candidate['complementary_id'],
            self.l3_tenant_id(), self.mgmt_nw_id())
        if not self.svc_vm_mgr.delete_service_vm(context, candidate['id']):
            continue
        # NOTE(review): only the DB delete is placed inside the transaction;
        # the original nesting was lost in extraction -- confirm.
        with context.session.begin(subtransactions=True):
            context.session.delete(candidate)
        plugging_drv.delete_hosting_device_resources(
            context, self.l3_tenant_id(), **res)
        num_deleted += 1

    LOG.info(_LI('Deleted %(num)d hosting devices based on template '
                 '%(t_id)s'), {'num': num_deleted, 't_id': template['id']})
    return num_deleted
开发者ID:JoelCapitao,项目名称:networking-cisco,代码行数:49,代码来源:hosting_device_manager_db.py
示例7: filter
def filter( self, trans, user, query, column_filter ):
    """ Narrow ``query`` to histories with the requested sharing status.

    "All", an empty filter, or an unrecognised value return the query
    unchanged.
    """
    if column_filter == "All" or not column_filter:
        return query

    model = self.model_class
    if column_filter == "private":
        # Private: not shared with anyone and not importable.
        not_shared = query.filter( model.users_shared_with == null() )
        return not_shared.filter( model.importable == false() )
    if column_filter == "shared":
        return query.filter( model.users_shared_with != null() )
    if column_filter == "accessible":
        return query.filter( model.importable == true() )
    if column_filter == "published":
        return query.filter( model.published == true() )
    return query
开发者ID:AbhishekKumarSingh,项目名称:galaxy,代码行数:15,代码来源:grids.py
示例8: __init__
def __init__(self, user, reportid, filters=None):
    """Build the base query for the report identified by ``reportid``.

    :param user: the requesting user; ``is_superadmin`` is read here.
    :param reportid: key into ``REPORTS``; compared against string ids
        such as '3' or '10'.
    :param filters: optional filters; when truthy the totals-table
        shortcut below is not taken.
    """
    self.dbsession = Session
    self.user = user
    self.reportid = reportid
    self.model = None
    self.isaggr = False
    self.filters = filters
    # Message attribute the report is keyed on, and the sort key that is
    # later handed to desc().
    queryfield = getattr(Message, REPORTS[self.reportid]['address'])
    orderby = REPORTS[reportid]['sort']
    if (self.reportid in ['3', '4', '7', '8']
            and self.user.is_superadmin
            and not self.filters):
        # domains: unfiltered superadmin reports read the *MessageTotals
        # models instead of aggregating over Message.
        self.isaggr = True
        if self.reportid in ['3', '4']:
            # src
            self.model = SrcMessageTotals
            self.query = self.dbsession\
                .query(SrcMessageTotals.id.label('address'),
                       SrcMessageTotals.total.label('count'),
                       SrcMessageTotals.volume.label('size'))\
                .order_by(desc(orderby))
        else:
            # dst
            self.model = DstMessageTotals
            self.query = self.dbsession\
                .query(DstMessageTotals.id.label('address'),
                       DstMessageTotals.total.label('count'),
                       DstMessageTotals.volume.label('size'))\
                .order_by(desc(orderby))
    else:
        # emails & relays: count and sum sizes per distinct queryfield value
        self.query = self.dbsession.query(queryfield.label('address'),
                                          func.count(queryfield).label('count'),
                                          func.sum(Message.size).label('size'))
        if self.reportid != '10':
            # skip rows with an empty address
            self.query = self.query.filter(queryfield != u'')\
                .group_by(queryfield).order_by(desc(orderby))
        else:
            # report '10' excludes the loopback address instead
            self.query = self.query.filter(queryfield != u'127.0.0.1')\
                .group_by(queryfield).order_by(desc(orderby))
    # Wrap the query in the filter helper matching the query kind.
    if self.isaggr:
        uquery = AggrFilter(self.query)
    else:
        uquery = UserFilter(self.dbsession,
                            self.user,
                            self.query)
    if self.reportid not in ['5', '6', '7', '8']:
        self.query = uquery()
    if self.reportid in ['5', '6', '7', '8']:
        if not self.user.is_superadmin:
            # non-superadmins: apply the filter helper in the 'in' direction
            uquery.setdirection('in')
            self.query = uquery()
        else:
            # superadmins: keep only rows whose domain is in the set of
            # Domain names with status true
            flf = self.model.id if self.isaggr else Message.to_domain
            self.query = self.query.filter(flf
                                           .in_(self.dbsession.query(Domain.name)
                                                .filter(Domain.status == true())))
开发者ID:baruwaproject,项目名称:baruwa2,代码行数:60,代码来源:query.py
示例9: filter
def filter(self):
    """Apply the per-user restrictions to ``self.query`` and return it.

    Domain admins are limited to messages to and/or from the active
    domains (and their active aliases) of the organizations they belong
    to, depending on ``self.direction``. For users with ``is_peleb`` set,
    ``_build_user_filter`` is applied as well.
    """
    if self.user.is_domain_admin:
        admin_domains = self.dbsession.query(Domain).join(
            downs,
            (oa, downs.c.organization_id == oa.c.organization_id))\
            .filter(Domain.status == true())\
            .filter(oa.c.user_id == self.user.id).all()

        names = []
        for domain in admin_domains:
            names.append(domain.name)
            names.extend(alias.name for alias in domain.aliases
                         if alias.status)
        if not names:
            # presumably a placeholder so the IN list is never empty
            names.append('xx')

        direction = self.direction
        if direction == 'in':
            clause = self.model.to_domain.in_(names)
        elif direction == 'out':
            clause = self.model.from_domain.in_(names)
        else:
            clause = func._(or_(self.model.to_domain.in_(names),
                                self.model.from_domain.in_(names)))
        self.query = self.query.filter(clause)

    if self.user.is_peleb:
        self._build_user_filter()
    return self.query
开发者ID:baruwaproject,项目名称:baruwa2,代码行数:28,代码来源:query.py
示例10: apply_rules_blueprints
def apply_rules_blueprints(user, args=None):
    """Return the Blueprint query restricted to what ``user`` may see.

    Admins get the unrestricted query. Other users see enabled blueprints
    of the groups they belong to as non-managers (minus banned groups),
    plus every blueprint of the groups they manage. When ``args`` contains
    ``blueprint_id`` the query is narrowed to that id.
    """
    blueprints = Blueprint.query
    if not user.is_admin:
        memberships = GroupUserAssociation.query.filter_by(
            user_id=user.id, manager=False).all()
        member_group_ids = set(m.group.id for m in memberships)
        banned_group_ids = set(g.id for g in user.banned_groups.all())
        # do not allow the banned users
        allowed_group_ids = member_group_ids - banned_group_ids

        # Start building query expressions based on the condition that :
        # a group manager can see all of his blueprints and only enabled
        # ones of other groups
        enabled = Blueprint.is_enabled == true()
        # NOTE(review): when an id collection is empty, None is handed to
        # and_()/or_() unchanged, exactly as before; how that renders
        # depends on the SQLAlchemy version -- confirm.
        in_allowed_groups = (Blueprint.group_id.in_(allowed_group_ids)
                             if allowed_group_ids else None)
        visible = and_(in_allowed_groups, enabled)

        manager_group_ids = get_manager_group_ids(user)
        in_managed_groups = (Blueprint.group_id.in_(manager_group_ids)
                             if manager_group_ids else None)
        visible = or_(visible, in_managed_groups)

        blueprints = blueprints.filter(visible)

    if args is not None and 'blueprint_id' in args:
        blueprints = blueprints.filter_by(id=args.get('blueprint_id'))
    return blueprints
开发者ID:CSC-IT-Center-for-Science,项目名称:pouta-blueprints,代码行数:27,代码来源:rules.py
示例11: modify_cycle_task_overdue_notification
def modify_cycle_task_overdue_notification(task):
    """Create or reschedule the overdue notification of a cycle task.

    The notification is due one day after the task's end date. An existing
    unsent or repeating notification only has its ``send_on`` adjusted;
    otherwise a new repeating notification is added for eligible tasks.

    Args:
      task: The CycleTaskGroupObjectTask instance for which to update the
          overdue notifications.
    """
    notif_type = get_notification_type(u"cycle_task_overdue")
    end_of_task_day = datetime.combine(task.end_date, datetime.min.time())
    send_on = end_of_task_day + timedelta(1)

    existing = Notification.query.filter(
        Notification.object_id == task.id,
        Notification.object_type == task.type,
        (Notification.sent_at.is_(None) | (Notification.repeating == true())),
        Notification.notification_type == notif_type,
    ).first()

    if existing:
        if existing.send_on != send_on:
            existing.send_on = send_on
            db.session.add(existing)
        return

    # NOTE: The "task.id" check is to assure a notification is created for
    # existing task instances only, avoiding DB errors. Overdue notifications
    # for new tasks are handled and added elsewhere.
    # All three conditions are evaluated up front, as before.
    is_persisted = task.id
    is_active = task.status in task.active_states
    in_current_cycle = task.cycle.is_current
    if is_persisted and is_active and in_current_cycle:
        add_notif(task, notif_type, send_on, repeating=True)
开发者ID:zidarsk8,项目名称:ggrc-core,代码行数:31,代码来源:notification_handler.py
示例12: modify_cycle_task_notification
def modify_cycle_task_notification(obj, notification_name):
    """Sync the pending notification of ``obj`` with its end date.

    The send date is the end date minus the notification type's
    ``advance_notice`` days. A send date in the past removes the pending
    notification; otherwise the pending notification is rescheduled, or a
    new one is created when none exists.
    """
    notif_type = get_notification_type(notification_name)
    notif = Notification.query.filter(
        Notification.object_id == obj.id,
        Notification.object_type == obj.type,
        Notification.notification_type == notif_type,
        or_(Notification.sent_at.is_(None), Notification.repeating == true()),
    ).first()

    advance = timedelta(notif_type.advance_notice)
    send_on = datetime.combine(obj.end_date, datetime.min.time()) - advance
    today = datetime.combine(date.today(), datetime.min.time())
    already_due = send_on < today

    if notif and already_due:
        # this should not be allowed, but if a cycle task is changed to a
        # past date, we remove the current pending notification
        db.session.delete(notif)
    elif notif:
        # cycle date moved into the future and a pending notification
        # exists: reschedule it
        notif.send_on = send_on
        db.session.add(notif)
    elif not already_due:
        # cycle date moved into the future and nothing is pending: create
        # a new notification
        add_notif(obj, notif_type, send_on)
开发者ID:zidarsk8,项目名称:ggrc-core,代码行数:26,代码来源:notification_handler.py
示例13: get_pending_notifications
def get_pending_notifications():
    """Get notification data for all future notifications.

    The data dict that gets returned here contains notification data grouped
    by dates on which the notifications should be received. Notifications
    whose send date lies before today are grouped under today's date.

    Returns:
      A (notifications, data) tuple: the list of Notification rows that are
      unsent or repeating, and the merged notification data keyed by date.
    """
    notifications = db.session.query(Notification).filter(
        (Notification.sent_at.is_(None)) | (Notification.repeating == true())
    ).all()

    notif_by_day = defaultdict(list)
    for notification in notifications:
        notif_by_day[notification.send_on.date()].append(notification)

    data = defaultdict(dict)
    today = date.today()
    # items() works on both Python 2 and 3; iteritems() exists only on 2.
    for day, day_notifications in notif_by_day.items():
        current_day = max(day, today)
        data[current_day] = merge_dict(
            data[current_day], get_notification_data(day_notifications))

    return notifications, data
开发者ID:zidarsk8,项目名称:ggrc-core,代码行数:26,代码来源:common.py
示例14: search
def search(what, **conditions):
    """Query the simulation tables.

    :param what: ``'codename'`` selects the code name column only;
        ``'point'`` selects the full set of simulation columns and converts
        each row with ``dataPointFromRow``.
    :param conditions: optional restrictions. ``identifier`` and ``code``
        are supported; each value is passed to ``in_()`` on the matching
        column, and all restrictions are AND-ed together.
    :returns: a list of converted data points for ``'point'``, otherwise
        the fetched rows.
    :raises ValueError: if ``what`` or a condition key is not supported.
    """
    if what == 'codename':
        columns = [db.codesTable.c.name]
    elif what == 'point':
        columns = [simTable.c.identifier, db.codesTable.c.name,
                   db.decodersTable.c.name, simTable.c.channel_json,
                   simTable.c.wordSeed, simTable.c.samples,
                   simTable.c.errors, simTable.c.cputime,
                   simTable.c.date_start, simTable.c.date_end,
                   simTable.c.machine, simTable.c.program_name,
                   simTable.c.program_version, simTable.c.stats]
    else:
        raise ValueError('unknown search: "{}"'.format(what))

    # Start from a neutral TRUE so conditions can be AND-ed on uniformly.
    condition = expression.true()
    for key, val in conditions.items():
        if key == 'identifier':
            condition &= simTable.c.identifier.in_(val)
        elif key == 'code':
            condition &= db.codesTable.c.name.in_(val)
        else:
            raise ValueError('unknown search condition: "{}"'.format(key))

    s = sqla.select(columns, whereclause=condition, from_obj=joinTable,
                    distinct=True,
                    use_labels=True).order_by(db.codesTable.c.name)
    ans = db.engine.execute(s).fetchall()
    if what == 'point':
        return [dataPointFromRow(row) for row in ans]
    # Reuse the rows already fetched instead of running the query again.
    return ans
开发者ID:NinjaComics,项目名称:lpdec,代码行数:25,代码来源:simulation.py
示例15: command
def command(self):
    """Check the password of a local account and exit with a status code.

    The username comes from ``self.options.username`` and the password from
    the ``BARUWA_ADMIN_PASSWD`` environment variable.

    Exit codes: 0 when the password is valid, 2 when it is invalid, 126
    when the username is missing, the environment variable is not set, or
    no local user with that username exists.
    """
    self.init()
    if self.options.username is None:
        sys.stderr.write("\nProvide an username\n" + "\n")
        # print_help() writes the help text itself; printing its return
        # value as well would add a stray "None" line.
        self.parser.print_help()
        sys.exit(126)

    try:
        user = Session\
            .query(User)\
            .filter(User.username == self.options.username)\
            .filter(User.local == true())\
            .one()
        if user.validate_password(os.environ['BARUWA_ADMIN_PASSWD']):
            sys.stderr.write("The account password is valid" + "\n")
            sys.exit(0)
        else:
            sys.stderr.write("The account password is invalid" + "\n")
            sys.exit(2)
    except KeyError:
        sys.stderr.write("BARUWA_ADMIN_PASSWD env variable not set" + "\n")
        sys.exit(126)
    except NoResultFound:
        sys.stderr.write(("No local user found with username %s" %
                          self.options.username) + "\n")
        sys.exit(126)
    finally:
        # Runs on every path out of the try block, including sys.exit().
        Session.close()
开发者ID:baruwaproject,项目名称:baruwa2,代码行数:29,代码来源:checkpassword.py
示例16: tasks_query
def tasks_query(self, q, marker=None, offset=None, limit=None,
                current_user=None, **kwargs):
    """Full-text search over tasks, limited to stories the user may see.

    Tasks of private stories are returned only on rows joined to
    ``current_user``; without a user only tasks of non-private stories are
    returned. The result is paginated with ``marker``/``offset``/``limit``.
    """
    session = api_base.get_session()

    # Filter out tasks or stories that the current user can't see
    query = api_base.model_query(models.Task, session).outerjoin(
        models.Story,
        models.story_permissions,
        models.Permission,
        models.user_permissions,
        models.User,
    )

    if current_user is None:
        visibility = models.Story.private == false()
    else:
        own_private = and_(
            models.User.id == current_user,
            models.Story.private == true(),
        )
        visibility = or_(own_private, models.Story.private == false())
    query = query.filter(visibility)

    query = self._build_fulltext_search(models.Task, query, q)
    query = self._apply_pagination(
        models.Task, query, marker, offset, limit)
    return query.all()
开发者ID:palvarez89,项目名称:storyboard,代码行数:29,代码来源:sqlalchemy_impl.py
示例17: get_summary
def _summary_query(value_class, aggregate, accum_flag, since):
    """Build the aggregate query over SR_Values for one value class.

    ``aggregate`` is the SQL function applied to the values (``func.sum``
    or ``func.avg``), ``accum_flag`` the required ``SR_Classes.accum_flag``
    value and ``since`` the exclusive lower datetime bound.
    """
    # NOTE(review): rows are grouped by month while the selected label is a
    # date ("daily_value"); kept as-is -- confirm this is intended.
    return db_session.query(
        func.date(SR_Values.datetime).label("date"),
        aggregate(SR_Values.value).label("daily_value"),
    ).filter(
        SR_Classes.id == SR_Values.value_class_id
    ).filter(
        SR_Classes.accum_flag == accum_flag
    ).filter(
        SR_Classes.value_class == value_class
    ).filter(
        SR_Values.datetime > since
    ).group_by(
        SR_Classes.value_class, func.month(SR_Values.datetime)
    ).order_by(
        SR_Classes.value_class, func.date(SR_Values.datetime)
    )


def get_summary(value_class):
    """Return this year's summary rows for ``value_class``.

    Classes with ``accum_flag`` true are summed, the others averaged. Each
    row is a dict with ``name`` (the date) and ``value`` (the aggregate).
    The first query and the final rows are printed, as before.
    """
    # Computed once so both queries share the same lower bound.
    year_start = datetime.datetime(datetime.datetime.now().year, 1, 1)

    q = _summary_query(value_class, func.sum, true(), year_start)
    # Single-argument print(...) produces the same output on Python 2 and 3.
    print(q)
    rows = [{"name": x.date, "value": x.daily_value} for x in q.all()]

    q = _summary_query(value_class, func.avg, false(), year_start)
    rows.extend([{"name": x.date, "value": x.daily_value} for x in q.all()])
    print(rows)
    return rows
开发者ID:DrSkippy,项目名称:simple_record,代码行数:28,代码来源:simple_record.py
示例18: get_form_data
def get_form_data(config):
    """Return the query for entries of the form named ``config['name']``.

    Entries of patients whose ``test`` flag is true are excluded, the
    patient filter for ``config['user']`` / ``config['patient_group']`` is
    applied, and results are ordered by patient id, then entry id.
    """
    entries = (
        db.session.query(Entry)
        .filter(Patient.test != true(), Entry.patient_id == Patient.id)
        .order_by(Entry.patient_id, Entry.id)
    )
    entries = _patient_filter(
        entries, Entry.patient_id, config['user'], config['patient_group'])
    return entries.filter(Entry.form.has(slug=config['name']))
开发者ID:renalreg,项目名称:radar,代码行数:7,代码来源:queries.py
示例19: read
def read(self):
    """Mark badge notifications from all activities read up to a timestamp.

    Takes a unixtime ``timestamp`` request parameter, collects the ids of
    the current user's unread badge notifications whose activity was
    created before that time, and marks those notifications as read.

    Raises an Exception when the ``timestamp`` parameter is missing.
    """
    raw_timestamp = self.request.get('timestamp')
    if not raw_timestamp:
        raise Exception('Missing parameter `timestamp`')

    cutoff = datetime.fromtimestamp(int(raw_timestamp), pytz.UTC)
    userid = api.user.get_current().getId()

    unread_badges = Notification.query.join(Activity).filter(
        Notification.userid == userid,
        Notification.is_badge == true(),
        Notification.is_read == false(),
        Activity.created < cutoff,
    ).all()

    notification_ids = [badge.notification_id for badge in unread_badges]
    return notification_center().mark_notifications_as_read(
        notification_ids)
开发者ID:4teamwork,项目名称:opengever.core,代码行数:31,代码来源:views.py
示例20: create_content_ruleset
def create_content_ruleset():
    """Write the content protection ruleset files for policy types 1-4.

    The global policy settings (id 1) and the policies of all domains with
    status true are annotated with policy file names, then passed to
    ``write_ruleset`` once per policy type.
    """
    def set_attrs(obj, dom=None):
        """Attach '<attr>-name' file names, and domain info when given."""
        for attr in POLICY_SETTINGS_MAP.values():
            policy_id = getattr(obj, attr)
            if policy_id == 0:
                continue
            policy = Session.query(Policy.name)\
                .filter(Policy.id == policy_id)\
                .one()
            setattr(obj, '%s-name' % attr, "%s.conf" % policy.name)
        if dom:
            obj.domain_name = dom.name
            obj.domain_aliases = dom.aliases
        return obj

    global_policy = Session.query(PolicySettings).get(1)
    set_attrs(global_policy)

    active_domain_policies = Session.query(DomainPolicy, Domain)\
        .filter(DomainPolicy.domain_id == Domain.id)\
        .filter(Domain.status == true())
    domain_policies = [set_attrs(row[0], row[1])
                       for row in active_domain_policies]

    for policy_type in (1, 2, 3, 4):
        ruleset_file = POLICY_FILE_MAP[policy_type]
        kwargs = dict(gps=global_policy,
                      dps=domain_policies,
                      policy_type=POLICY_SETTINGS_MAP[policy_type],
                      default="%s.conf" % ruleset_file)
        write_ruleset(ruleset_file, kwargs, 'content.protection.ruleset')
开发者ID:baruwaproject,项目名称:baruwa2,代码行数:29,代码来源:settings.py
注:本文中的 sqlalchemy.sql.expression.true 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论