本文整理汇总了Python中sqlalchemy.sql.and_函数的典型用法代码示例。如果您正苦于以下问题:Python and_函数的具体用法?Python and_怎么用?Python and_使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了and_函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_pixbuf
def get_pixbuf(self, attr, val):
    """Return a pixbuf that illustrates value ``val`` of attribute ``attr``.

    Ratings are delegated to ``star_generator.get_pixbuf`` and preparation /
    cooking times to ``get_time_slice``.  For every other attribute one
    recipe row with a non-empty image is looked up.  When that row has a
    truthy ``thumb`` its image is scaled and returned; otherwise the base
    icon for ``attr`` is returned, falling back to the ``'category'`` icon.
    """
    if attr == 'rating':
        return star_generator.get_pixbuf(val)
    if attr in ['preptime', 'cooktime']:
        return get_time_slice(val)

    recipes = self.rd.recipe_table
    # "!= None" is deliberate: it is a SQL expression, not a Python identity test.
    has_image = (recipes.c.image != None, recipes.c.image != '')

    if attr == 'category':
        categories = self.rd.categories_table
        joined = recipes.join(categories)
        conditions = [categories.c.category == val]
        conditions.extend(has_image)
        if hasattr(self, 'category_images'):
            # Skip recipes whose title was already used for another category.
            conditions.append(
                not_(recipes.c.title.in_(self.category_images)))
        result = joined.select(and_(*conditions), limit=1).execute().fetchone()
        if not hasattr(self, 'category_images'):
            self.category_images = []
        if result:
            self.category_images.append(result.title)
    else:
        column = getattr(recipes.c, attr)
        criteria = and_(column == val, *has_image)
        result = recipes.select(criteria, limit=1).execute().fetchone()

    if result and result.thumb:
        return scale_pb(get_pixbuf_from_jpg(result.image))
    return self.get_base_icon(attr) or self.get_base_icon('category')
开发者ID:Bercio,项目名称:gourmet,代码行数:27,代码来源:browser.py
示例2: getRunResultCount
def getRunResultCount(self, run_id, report_filters):
    """Count the reports of run ``run_id`` that pass ``report_filters``.

    The filter expression is built by ``construct_report_filter``.  Any
    ``SQLAlchemyError`` is logged and re-raised as
    ``shared.ttypes.RequestFailed`` with the ``DATABASE`` error code.
    """
    filter_expression = construct_report_filter(report_filters)
    session = self.__session
    try:
        # Join conditions are restricted to rows of the same run.
        file_of_run = and_(Report.file_id == File.id,
                           File.run_id == run_id)
        suppression_of_run = and_(SuppressBug.hash == Report.bug_id,
                                  SuppressBug.run_id == run_id)

        query = session.query(Report).filter(Report.run_id == run_id)
        query = query.outerjoin(File, file_of_run)
        query = query.outerjoin(BugPathEvent,
                                Report.end_bugevent == BugPathEvent.id)
        query = query.outerjoin(SuppressBug, suppression_of_run)
        reportCount = query.filter(filter_expression).count()

        return 0 if reportCount is None else reportCount
    except sqlalchemy.exc.SQLAlchemyError as alchemy_ex:
        msg = str(alchemy_ex)
        LOG.error(msg)
        raise shared.ttypes.RequestFailed(shared.ttypes.ErrorCode.DATABASE, msg)
开发者ID:zdzhjx,项目名称:codechecker,代码行数:28,代码来源:client_db_access_handler.py
示例3: get_relationships
def get_relationships(self, with_package=None, type=None, active=True,
                      direction='both'):
    '''Return the PackageRelationship rows this package takes part in.

    Rows are returned with their stored type and ordering, i.e. not
    rewritten from the point of view of ``self``.

    with_package -- only relationships whose other end is this Package
    type -- only this relationship type (its reverse type is used when
                matching relationships in which ``self`` is the object)
    active -- when true, only relationships in the ACTIVE state
    direction -- 'forward' (self is subject), 'reverse' (self is object)
                 or 'both'
    '''
    assert direction in ('both', 'forward', 'reverse')
    if with_package:
        assert isinstance(with_package, Package)
    from package_relationship import PackageRelationship
    rel = PackageRelationship

    as_subject = [rel.subject == self]
    as_object = [rel.object == self]
    if with_package:
        as_subject.append(rel.object == with_package)
        as_object.append(rel.subject == with_package)
    if active:
        as_subject.append(rel.state == core.State.ACTIVE)
        as_object.append(rel.state == core.State.ACTIVE)
    if type:
        as_subject.append(rel.type == type)
        reverse_type = rel.reverse_type(type)
        as_object.append(rel.type == reverse_type)

    criterion = None
    if direction == 'both':
        criterion = or_(
            and_(*as_subject),
            and_(*as_object),
        )
    elif direction == 'forward':
        criterion = and_(*as_subject)
    elif direction == 'reverse':
        criterion = and_(*as_object)

    query = meta.Session.query(rel)
    if criterion is not None:
        query = query.filter(criterion)
    return query.all()
开发者ID:HatemAlSum,项目名称:ckan,代码行数:31,代码来源:package.py
示例4: can_merge_tracks
def can_merge_tracks(conn, track_ids):
    """Group the given tracks into sets that qualify for merging.

    Every pair of fingerprints belonging to two different tracks from
    ``track_ids`` is compared in the database.  A pair of tracks is linked
    when its minimum ``acoustid_compare2`` score is at least
    ``const.TRACK_GROUP_MERGE_THRESHOLD`` and its maximum length difference
    does not exceed ``const.FINGERPRINT_MAX_LENGTH_DIFF``.

    Returns a list of sets of track ids, one set per group.
    """
    left = schema.fingerprint.alias('fp1')
    right = schema.fingerprint.alias('fp2')
    # Strict "<" on both columns yields each unordered pair only once.
    pairing = sql.and_(left.c.id < right.c.id,
                       left.c.track_id < right.c.track_id)
    source = left.join(right, pairing)
    wanted = sql.and_(left.c.track_id.in_(track_ids),
                      right.c.track_id.in_(track_ids))
    columns = [
        left.c.track_id, right.c.track_id,
        sql.func.max(sql.func.abs(left.c.length - right.c.length)),
        sql.func.min(sql.func.acoustid_compare2(left.c.fingerprint, right.c.fingerprint, const.TRACK_MAX_OFFSET)),
    ]
    query = sql.select(columns, wanted, from_obj=source)
    query = query.group_by(left.c.track_id, right.c.track_id)
    query = query.order_by(left.c.track_id, right.c.track_id)

    # Maps a track id to the id of the group it was attached to.
    merges = {}
    for lower_track, higher_track, length_diff, score in conn.execute(query):
        if (score < const.TRACK_GROUP_MERGE_THRESHOLD
                or length_diff > const.FINGERPRINT_MAX_LENGTH_DIFF):
            continue
        # Reuse the group of the lower track if it already belongs to one.
        merges[higher_track] = merges.get(lower_track, lower_track)

    return [
        set([root] + [track for track in merges if merges[track] == root])
        for root in set(merges.values())
    ]
开发者ID:Softmotions,项目名称:acoustid-server,代码行数:26,代码来源:track.py
示例5: rulesCallback
def rulesCallback(ip, port, rules):
    """Store the 'tickrate' and 'ent_count' rules of a server.

    ``rules`` is iterated as a sequence of indexable items whose first
    element is the rule name and whose second element is its value.  The
    values are written to the ``tbl_server_history`` row identified by the
    server id and the module-level ``tm``.  Other rules are ignored.
    """
    serverid = getServerId(ip, port)
    for rule in rules:
        rule_name = rule[0]
        if rule_name == 'tickrate':
            new_values = {'tickrate': rule[1]}
        elif rule_name == 'ent_count':
            # Drop the commas before storing the entity count.
            new_values = {'ent_count': rule[1].replace(',', '')}
        else:
            continue

        current_row = and_(
            tbl_server_history.c.id == serverid,
            tbl_server_history.c.date == tm,
        )
        statement = update(tbl_server_history).where(current_row)
        db.execute(statement.values(**new_values))
开发者ID:devicenull,项目名称:ns2servers,代码行数:30,代码来源:serverlist.py
示例6: get_scan_information
def get_scan_information():
    """Return scan statistics for a posted time range as a JSON response.

    Users that fail ``ValidateClass.check_login()`` are redirected to the
    admin index.  For POST requests the form fields ``start_time_stamp`` and
    ``end_time_stamp`` are read (only their first 10 characters are used)
    and the number of tasks, vulnerabilities and projects in that range is
    reported, together with the summed file count and code line count of
    the tasks.  A range whose start is not before its end yields the
    ``code=1002`` error payload.  Nothing is returned for other methods.
    """
    if not ValidateClass.check_login():
        return redirect(ADMIN_URL + '/index')
    if request.method == "POST":
        start_time_stamp = request.form.get("start_time_stamp")[0:10]
        end_time_stamp = request.form.get("end_time_stamp")[0:10]
        start_seconds = int(start_time_stamp)
        end_seconds = int(end_time_stamp)
        start_time_array = datetime.datetime.fromtimestamp(start_seconds)
        end_time_array = datetime.datetime.fromtimestamp(end_seconds)

        # Compare the parsed numbers: comparing the raw strings is
        # lexicographic and misorders timestamps of different lengths.
        if start_seconds >= end_seconds:
            return jsonify(tag="danger", msg="wrong date select.", code=1002)

        task_in_range = and_(CobraTaskInfo.time_start >= start_time_stamp,
                             CobraTaskInfo.time_start <= end_time_stamp)

        task_count = CobraTaskInfo.query.filter(task_in_range).count()
        vulns_count = CobraResults.query.filter(
            and_(CobraResults.created_at >= start_time_array, CobraResults.created_at <= end_time_array)
        ).count()
        projects_count = CobraProjects.query.filter(
            and_(CobraProjects.last_scan >= start_time_array, CobraProjects.last_scan <= end_time_array)
        ).count()
        files_count = db.session.query(func.sum(CobraTaskInfo.file_count).label('files')).filter(
            task_in_range
        ).first()[0]
        code_number = db.session.query(func.sum(CobraTaskInfo.code_number).label('codes')).filter(
            task_in_range
        ).first()[0]

        # SUM() yields NULL (None) when no task falls inside the range;
        # report 0 instead of failing in int(None).
        return jsonify(code=1001, task_count=task_count, vulns_count=vulns_count, projects_count=projects_count,
                       files_count=int(files_count or 0), code_number=int(code_number or 0))
开发者ID:0x24bin,项目名称:cobra,代码行数:31,代码来源:DashboardController.py
示例7: all_appointments
def all_appointments():
    """Return a JSON object describing all appointments on a given date.

    Query arguments:
        date -- day to inspect in ``YYYY-MM-DD`` format; today when missing.
        timezone -- passed to ``get_utc_seconds`` / ``get_local_minutes``
            after conversion to float; 0.0 when missing.

    An appointment is included when its start time or its end time lies
    within the day.  The response carries the raw start/end times, the same
    times converted by ``get_local_minutes``, the date and the timezone.
    """
    requested_date = request.args.get('date')
    if not requested_date:
        date_obj = date.today()
    else:
        date_obj = datetime.strptime(requested_date, "%Y-%m-%d").date()
    timezone = float(str(request.args.get('timezone', 0.00)))

    # Day boundaries: minute 0 and minute 1440 of the requested date.
    start_time = get_utc_seconds(date_obj, 0, timezone)
    end_time = get_utc_seconds(date_obj, 1440, timezone)

    starts_in_day = and_(Appointment.start_time >= start_time,
                         Appointment.start_time <= end_time)
    ends_in_day = and_(Appointment.end_time >= start_time,
                       Appointment.end_time <= end_time)
    query = select([Appointment], or_(starts_in_day, ends_in_day)).\
        order_by(Appointment.start_time)

    conn = db.engine.connect()
    try:
        result = conn.execute(query).fetchall()
    finally:
        # Hand the connection back instead of waiting for garbage collection.
        conn.close()

    apt_time_utc_seconds = [[a.start_time, a.end_time] for a in result]
    apt_time_slider_minutes = [[get_local_minutes(a[0], date_obj, timezone),
                                get_local_minutes(a[1], date_obj, timezone)]
                               for a in apt_time_utc_seconds]
    return jsonify(apt_time_utc_seconds=apt_time_utc_seconds,
                   apt_time_slider_minutes=apt_time_slider_minutes,
                   date=str(date_obj),
                   timezone=timezone)
开发者ID:SunPowered,项目名称:flask-appointment-calendar,代码行数:31,代码来源:views.py
示例8: generate_query_from_keywords
def generate_query_from_keywords(self, model, fulltextsearch=None,
                                 **kwargs):
    """Translate keyword filters into SQLAlchemy clauses for ``model``.

    Ordinary keywords become equality tests.  Four keywords are special:
    ``info`` is delegated to ``handle_info_json``, ``created`` becomes a
    prefix ``LIKE`` match, every run of digits found in ``project_id``
    becomes one alternative of an OR group, and ``fav_user_ids`` is skipped.

    Returns ``((clause,), queries, headlines, order_by_ranks)``; the last
    three are empty lists unless ``info`` was given.
    """
    special_keys = ('info', 'fav_user_ids', 'created', 'project_id')
    clauses = [_entity_descriptor(model, key) == value
               for key, value in kwargs.items()
               if key not in special_keys]

    queries, headlines, order_by_ranks = [], [], []
    if 'info' in kwargs:
        queries, headlines, order_by_ranks = self.handle_info_json(
            model, kwargs['info'], fulltextsearch)
        clauses = clauses + queries

    if 'created' in kwargs:
        created_prefix = kwargs['created'] + '%'
        clauses.append(
            _entity_descriptor(model, 'created').like(created_prefix))

    or_clauses = []
    if 'project_id' in kwargs:
        raw_ids = "%s" % kwargs['project_id']
        or_clauses = [_entity_descriptor(model, 'project_id') == project_id
                      for project_id in re.findall(r'\d+', raw_ids)]

    all_clauses = and_(and_(*clauses), or_(*or_clauses))
    return (all_clauses,), queries, headlines, order_by_ranks
开发者ID:influencerplus123,项目名称:tinybee.ai,代码行数:28,代码来源:__init__.py
示例9: __init__
def __init__(self, dbsession, user):
    """Prepare the query that summarises today's messages.

    The query yields one row with the labels ``total``, ``clean``,
    ``virii``, ``infected``, ``spam``, ``lowspam`` and ``highspam``.  Each
    label other than ``total`` is the number of messages that satisfy the
    corresponding condition below.
    """
    self.dbsession = dbsession
    self.user = user

    def tally(condition, label):
        # SUM(CASE WHEN condition THEN 1 ELSE 0 END) AS label
        return func.sum(case([(condition, 1)], else_=0)).label(label)

    clean = and_(Message.virusinfected == 0,
                 Message.nameinfected == 0, Message.otherinfected == 0,
                 Message.spam == 0, Message.highspam == 0)
    virii = Message.virusinfected > 0
    infected = and_(Message.highspam == 0,
                    Message.spam == 0, Message.virusinfected == 0,
                    or_(Message.nameinfected > 0, Message.otherinfected > 0))
    spam = and_(Message.virusinfected == 0,
                Message.otherinfected == 0, Message.nameinfected == 0,
                or_(Message.spam > 0, Message.highspam > 0))
    lowspam = and_(Message.virusinfected == 0,
                   Message.otherinfected == 0, Message.nameinfected == 0,
                   Message.spam > 0, Message.highspam == 0)
    highspam = and_(Message.virusinfected == 0,
                    Message.otherinfected == 0, Message.nameinfected == 0,
                    Message.highspam > 0)

    totals = self.dbsession.query(
        func.count(Message.id).label('total'),
        tally(clean, 'clean'),
        tally(virii, 'virii'),
        tally(infected, 'infected'),
        tally(spam, 'spam'),
        tally(lowspam, 'lowspam'),
        tally(highspam, 'highspam'))
    self.query = totals.filter(Message.date == now().date())
开发者ID:TetraAsh,项目名称:baruwa2,代码行数:28,代码来源:query.py
示例10: export_modified_cells
def export_modified_cells(self, hourly=True, bucket=None):
    """Export cell records to a gzipped CSV file and upload it.

    With ``hourly`` true a ``diff`` file is written that covers cells
    modified between one hour before ``end_time`` (inclusive) and
    ``end_time`` (exclusive), where ``end_time`` is the current time with
    minute and second set to 0.  Otherwise a ``full`` file with all cells
    is written.  Rows whose ``cid`` equals ``CELLID_LAC`` or whose ``lat``
    is NULL are always left out.

    ``bucket`` defaults to the ``assets_bucket`` entry of the app's S3
    settings.  On any exception the error is reported and the task is
    retried via ``self.retry``.
    """
    if bucket is None:  # pragma: no cover
        bucket = self.app.s3_settings['assets_bucket']

    now = util.utcnow()
    criteria = [cell_table.c.cid != CELLID_LAC,
                cell_table.c.lat.isnot(None)]
    if hourly:
        file_type = 'diff'
        end_time = now.replace(minute=0, second=0)
        start_time = end_time - timedelta(hours=1)
        file_time = end_time
        criteria = [cell_table.c.modified >= start_time,
                    cell_table.c.modified < end_time] + criteria
    else:
        file_type = 'full'
        file_time = now.replace(hour=0, minute=0, second=0)
    cond = and_(*criteria)

    prefix = 'MLS-%s-cell-export-' % file_type
    filename = prefix + file_time.strftime('%Y-%m-%dT%H0000.csv.gz')

    try:
        with selfdestruct_tempdir() as workdir:
            path = os.path.join(workdir, filename)
            with self.db_session() as sess:
                write_stations_to_csv(sess, cell_table, CELL_COLUMNS, cond,
                                      path, make_cell_export_dict, CELL_FIELDS)
            write_stations_to_s3(path, bucket)
    except Exception as exc:  # pragma: no cover
        self.heka_client.raven('error')
        raise self.retry(exc=exc)
开发者ID:SmartDelivery,项目名称:ichnaea,代码行数:32,代码来源:tasks.py
示例11: __eq__
def __eq__(self, other):
    """Build the SQL expression comparing this composite property to ``other``.

    Each mapped column is compared to the matching entry of
    ``other.__composite_values__()``, or to NULL when ``other`` is None,
    and the comparisons are joined with AND.
    """
    columns = self.prop.columns
    if other is None:
        comparisons = [column == None for column in columns]
    else:
        values = other.__composite_values__()
        comparisons = [column == value
                       for column, value in zip(columns, values)]
    return sql.and_(*comparisons)
开发者ID:Eubolist,项目名称:ankimini,代码行数:7,代码来源:properties.py
示例12: __eq__
def __eq__(self, other):
    """Build the SQL expression comparing this relation to ``other``.

    * ``None``: for one-to-many and many-to-many relations a NOT EXISTS
      over the primary join, otherwise ``_optimized_compare(None)``.
    * list-based relation: ``other`` must be iterable; the result is an AND
      of one EXISTS per element, each matching the element's primary key.
    * anything else: ``_optimized_compare(other)``.

    Raises ``InvalidRequestError`` when a list-based relation is compared
    to an object without ``__iter__``.
    """
    prop = self.prop

    if other is None:
        if prop.direction in [ONETOMANY, MANYTOMANY]:
            return ~sql.exists([1], prop.primaryjoin)
        return prop._optimized_compare(None)

    if not prop.uselist:
        return prop._optimized_compare(other)

    if not hasattr(other, "__iter__"):
        raise exceptions.InvalidRequestError(
            "Can only compare a collection to an iterable object.  Use contains()."
        )

    join_condition = prop.primaryjoin
    if prop.secondaryjoin:
        join_condition = join_condition & prop.secondaryjoin

    mapper = prop.mapper
    per_item = []
    for item in other:
        same_identity = sql.and_(*[
            key_column == key_value
            for (key_column, key_value) in zip(
                mapper.primary_key, mapper.primary_key_from_instance(item))
        ])
        per_item.append(sql.exists([1], join_condition & same_identity))
    return sql.and_(*per_item)
开发者ID:serah,项目名称:HR,代码行数:34,代码来源:properties.py
示例13: statistics_update
def statistics_update(self, node, population, size, mtime, cluster=0):
    """Apply a delta to the statistics row of ``node`` in ``cluster``.

    ``population`` and ``size`` are added to the stored values (zero,
    positive or negative deltas are accepted); the resulting population is
    clamped at 0.  ``mtime`` replaces the stored value.  The row is
    inserted when the update matches no existing row.
    """
    stats = self.statistics

    def this_row():
        # Fresh WHERE clause selecting the (node, cluster) row.
        return and_(stats.c.node == node,
                    stats.c.cluster == cluster)

    lookup = select([stats.c.population, stats.c.size], this_row())
    rp = self.conn.execute(lookup)
    stored = rp.fetchone()
    rp.close()

    prepopulation, presize = stored if stored else (0, 0)
    population = max(population + prepopulation, 0)
    size = size + presize

    #insert or replace
    #TODO better upsert
    change = stats.update().where(this_row())
    change = change.values(population=population, size=size, mtime=mtime)
    rp = self.conn.execute(change)
    rp.close()
    if rp.rowcount == 0:
        # Nothing was updated, so the row does not exist yet.
        creation = stats.insert().values(
            node=node, population=population, size=size,
            mtime=mtime, cluster=cluster)
        self.conn.execute(creation).close()
开发者ID:antonis-m,项目名称:synnefo,代码行数:33,代码来源:node.py
示例14: start
def start(conf):
    """Purge old messages from topics that belong to origin-5 profiles.

    The database is configured from the ``sqlalchemy`` section of ``conf``.
    Topics whose ``profile_id`` belongs to a profile with ``origin == 5``
    are collected from all users.  For each such topic holding more than
    1000 origin-5 messages, the message id found after skipping the 1000
    highest ids of the topic is looked up, and every message of the topic
    with a lower id is deleted.  Progress is printed to standard output.
    """
    # connect to db
    db.engine = engine = engine_from_config(dict(conf.items('sqlalchemy')), prefix='')
    db.metadata.bind = engine
    conn = engine.connect()
    Session = sessionmaker(bind=engine)
    session = Session()

    # A set keeps the membership test below constant-time.
    profiles = set()
    topics = []
    for user in session.query(User):
        for profile in user.profiles:
            if profile.origin == 5:
                profiles.add(profile.profile_id)
        for topic in user.topics:
            if topic.profile_id in profiles:
                topics.append(topic.topic_id)

    messages = db.t_message
    for topic_id in topics:
        # Single pre-formatted argument: prints the same text under the
        # Python 2 print statement and the Python 3 print function.
        print("checking %s" % (topic_id,))
        s = select([func.count(messages.c.message_id)],
                   and_(messages.c.origin == 5, messages.c.topic_id == topic_id))
        (count,) = conn.execute(s).fetchone()
        if count > 1000:
            cutoff = select([messages.c.message_id],
                            messages.c.topic_id == topic_id).order_by(
                messages.c.message_id.desc()).offset(1000).limit(1)
            (m_id,) = conn.execute(cutoff).fetchone()
            print("purging %s %s %s" % (topic_id, count, m_id))
            conn.execute(messages.delete().where(
                and_(messages.c.message_id < m_id, messages.c.topic_id == topic_id)))
开发者ID:ak1394,项目名称:pavome-server,代码行数:29,代码来源:purge.py
示例15: properties
def properties(self, name):
    """Return the first and last stored interval time of series ``name``.

    The result maps every configured interval to a dict with the keys
    ``'first'`` and ``'last'``.  Each value is the smallest / largest
    ``i_time`` stored for that interval, converted with the interval's
    ``i_calc.from_bucket``.
    """
    connection = self._client.connect()
    table = self._table

    def edge_bucket(interval, ordering):
        # i_time of the first row under the given ordering (asc or desc).
        stmt = select([table.c.i_time]).where(
            and_(
                table.c.name == name,
                table.c.interval == interval
            )
        ).order_by(ordering(table.c.i_time)).limit(1)
        return connection.execute(stmt).first()['i_time']

    rval = {}
    for interval, config in self._intervals.items():
        bounds = rval.setdefault(interval, {})
        for key, ordering in (('first', asc), ('last', desc)):
            bounds[key] = config['i_calc'].from_bucket(
                edge_bucket(interval, ordering))
    return rval
开发者ID:malletjo,项目名称:kairos,代码行数:26,代码来源:sql_backend.py
示例16: claim_code
def claim_code(cls, user, deal):
    """Return the partner code of ``user`` for ``deal``, claiming one if needed.

    A code already assigned to the user for this deal is returned as is.
    Otherwise one unassigned code of the deal is selected, assigned to the
    user, stamped with the current time and committed.

    Raises ``GoldPartnerCodesExhaustedError`` when no unassigned code can
    be selected.
    """
    # check if they already have a code for this deal and return it
    already_claimed = and_(cls.user == user._id,
                           cls.deal == deal)
    try:
        existing = Session.query(cls).filter(already_claimed).one()
    except NoResultFound:
        pass
    else:
        return existing.code

    # select an unclaimed code, assign it to the user, and return it
    claimable = and_(cls.deal == deal,
                     cls.user == None,
                     func.pg_try_advisory_lock(cls.id))
    try:
        claiming = Session.query(cls).filter(claimable).limit(1).one()
    except NoResultFound:
        raise GoldPartnerCodesExhaustedError

    claiming.user = user._id
    claiming.date = datetime.now(g.tz)
    Session.add(claiming)
    Session.commit()

    # release the lock
    Session.query(func.pg_advisory_unlock_all()).all()
    return claiming.code
开发者ID:macanhhuy,项目名称:reddit,代码行数:31,代码来源:gold.py
示例17: _type_get
def _type_get(self, name, interval, i_bucket, i_end=None):
    """Fetch the stored values of series ``name`` for one interval.

    With a truthy ``i_end`` all rows whose ``i_time`` lies between
    ``i_bucket`` and ``i_end`` (both inclusive) are read; otherwise only
    rows whose ``i_time`` equals ``i_bucket``.

    Returns an OrderedDict mapping ``i_time`` to an OrderedDict of
    ``r_time -> value``, filled in ``r_time`` order.
    """
    connection = self._client.connect()
    table = self._table

    criteria = [
        table.c.name == name,
        table.c.interval == interval,
    ]
    if i_end:
        criteria.append(table.c.i_time >= i_bucket)
        criteria.append(table.c.i_time <= i_end)
    else:
        criteria.append(table.c.i_time == i_bucket)

    stmt = table.select().where(and_(*criteria)).order_by(table.c.r_time)

    rval = OrderedDict()
    for row in connection.execute(stmt):
        bucket = rval.setdefault(row['i_time'], OrderedDict())
        bucket[row['r_time']] = row['value']
    return rval
开发者ID:malletjo,项目名称:kairos,代码行数:27,代码来源:sql_backend.py
示例18: get_data_point_at_datetime
def get_data_point_at_datetime(self, measurement_id, point_name, date_time, interpolation_type="step"):
    """Return ``(data_point, date_time)`` for a measurement at ``date_time``.

    With ``interpolation_type == "step"`` the latest stored point measured
    at or before ``date_time`` is read and rebuilt as an ``mpmath.mpf`` from
    its sign, mantissa, exponent and bytecount columns.  ``point_name`` is
    matched with LIKE against both the long and the short point name.

    For any other ``interpolation_type`` the points returned by
    ``get_data_points`` are interpolated with ``interp1d`` at ``date_time``.
    """
    if interpolation_type != "step":
        values, measured_at = self.get_data_points(measurement_id, point_name)
        # Interpolate over microsecond timestamps converted to floats.
        measured_at = measured_at.astype("datetime64[us]").astype("d")
        interpolate = interp1d(measured_at, values)
        data_point = interpolate(np.datetime64(date_time, "us").astype("d"))
        return data_point, date_time

    points = self.DataPoints
    name_matches = or_(
        points.c.point_name_long.like(point_name),
        points.c.point_name_short.like(point_name),
    )
    point_of_measurement = and_(
        points.c.measurement_id == measurement_id,
        name_matches,
    )
    not_after = points.c.point_measured <= date_time
    sel = (
        select([points])
        .where(and_(point_of_measurement, not_after))
        .order_by(desc(points.c.point_measured))
    )
    row = self.connection.execute(sel).fetchone()
    mpf_parts = (
        row["point_sign"],
        row["point_mantissa"],
        row["point_exponent"],
        row["point_bytecount"],
    )
    return mpmath.mpf(mpf_parts), date_time
开发者ID:bond-anton,项目名称:Schottky,代码行数:33,代码来源:ProjectManager.py
示例19: validate_edware_prod
def validate_edware_prod(self):
    """Check the production tables after an update record was migrated.

    Verifies that the fact table holds both a ``D`` and a ``C`` status row
    for the test student and assessment, that the single ``C`` row carries
    the score 1900, and that the ``C`` row of ``dim_student`` for this
    batch has the last name ``Bush``.  The number of ``I`` status student
    rows is printed but not asserted.
    """
    student_id = 'f7251065-ca82-4248-9397-cc722e97bbdc'
    asmt_guid = 'a685f0ec-a0a6-4b1e-93b8-0c4298ff6374'
    with get_prod_connection(self.tenant) as connection:
        fact_table = connection.get_table('fact_asmt_outcome_vw')
        dim_student = connection.get_table('dim_student')

        update_output_data = select(
            [fact_table.c.rec_status],
            and_(fact_table.c.student_id == student_id,
                 fact_table.c.asmt_guid == asmt_guid))
        update_output_table = connection.execute(update_output_data).fetchall()
        self.assertIn(('D',), update_output_table, "Delete status D is not found in the Update record")
        self.assertIn(('C',), update_output_table, "Insert status C is not found in the Update record")

        # verify the updated asmt_score in the fact table
        update_asmt_score = select(
            [fact_table.c.asmt_score],
            and_(fact_table.c.student_id == student_id,
                 fact_table.c.rec_status == 'C',
                 fact_table.c.asmt_guid == asmt_guid))
        new_asmt_score = connection.execute(update_asmt_score).fetchall()
        expected_asmt_score = [(1900,)]
        # verify that the score is updated in the fact table
        # (assertEqual: the assertEquals alias was removed in Python 3.12)
        self.assertEqual(new_asmt_score, expected_asmt_score)
        # verify that there is only one record with status C
        self.assertEqual(len(new_asmt_score), 1)

        # verification for the dim_student update: last name changed to Bush
        update_last_name = select(
            [dim_student.c.last_name],
            and_(dim_student.c.student_id == student_id,
                 dim_student.c.batch_guid == self.guid_batch_id,
                 dim_student.c.rec_status == "C"))
        result_dim_student = connection.execute(update_last_name).fetchall()
        expected_last_name = [('Bush',)]
        self.assertEqual(result_dim_student, expected_last_name)

        # look up the old records, which carry the inactive status "I"
        inactive_rec = select(
            [dim_student],
            and_(dim_student.c.student_id == student_id,
                 dim_student.c.rec_status == "I"))
        inactive_result = connection.execute(inactive_rec).fetchall()
        print(len(inactive_result))
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:27,代码来源:test_update_rec.py
示例20: get_measures
def get_measures(self):
    """Collect the rows of the generated report.

    Returns a list of tuples, each holding a
    :py:class:`Module <euphorie.client.model.Module>`, a
    :py:class:`Risk <euphorie.client.model.Risk>` and an
    :py:class:`ActionPlan <euphorie.client.model.ActionPlan>`; every tuple
    corresponds to one row of the generated Excel file.

    Unlike the Euphorie implementation, the rows are ordered by risk
    priority (high, medium, everything else) rather than by start date,
    and then by risk path.
    """
    module_in_session = sql.and_(
        model.Module.session == self.session,
        model.Module.profile_index > -1,
    )
    relevant = sql.or_(
        model.MODULE_WITH_RISK_OR_TOP5_FILTER,
        model.RISK_PRESENT_OR_TOP5_FILTER,
    )
    # A risk belongs to a module when it sits directly below it in the tree.
    risk_of_module = sql.and_(
        model.Risk.path.startswith(model.Module.path),
        model.Risk.depth == model.Module.depth + 1,
        model.Risk.session == self.session,
    )
    plan_of_risk = model.ActionPlan.risk_id == model.Risk.id
    priority_rank = sql.case(
        value=model.Risk.priority, whens={"high": 0, "medium": 1}, else_=2)

    query = Session.query(model.Module, model.Risk, model.ActionPlan)
    query = query.filter(module_in_session)
    query = query.filter(sql.not_(model.SKIPPED_PARENTS))
    query = query.filter(relevant)
    query = query.join((model.Risk, risk_of_module))
    query = query.join((model.ActionPlan, plan_of_risk))
    query = query.order_by(priority_rank, model.Risk.path)
    return query.all()
开发者ID:pombredanne,项目名称:osha.oira,代码行数:32,代码来源:report.py
注:本文中的sqlalchemy.sql.and_函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论