本文整理汇总了Python中sqlalchemy.sql.not_函数的典型用法代码示例。如果您正苦于以下问题:Python not_函数的具体用法?Python not_怎么用?Python not_使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了not_函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_schultyp_all
def get_schultyp_all():
""" liefert alle Schultraeger """
return session.query(KeySchultyp).\
filter(and_(not_(KeySchultyp.Gruppe=='HKM'),
not_(KeySchultyp.Gruppe=='SONS'),
not_(KeySchultyp.Gruppe=='ST'))).\
order_by(KeySchultyp.TextKey)
开发者ID:shagun30,项目名称:djambala-2,代码行数:7,代码来源:queries.py
示例2: get_schema_names
def get_schema_names(self, connection, **kw):
sysschema = self.sys_schemas
query = sql.select([sysschema.c.schemaname],
sql.not_(sysschema.c.schemaname.like('SYS%')),
sql.not_(sysschema.c.schemaname.like('Q%')),
order_by=[sysschema.c.schemaname])
return [self.normalize_name(r[0]) for r in connection.execute(query)]
开发者ID:rredburn,项目名称:python-ibmdbsa,代码行数:7,代码来源:reflection.py
示例3: tasks
def tasks(self):
"""
List of tasks that support this distro
"""
# Delayed import to avoid circular dependency
from . import Task, TaskExcludeArch, TaskExcludeOSMajor
return Task.query\
.filter(not_(Task.excluded_arch.any(
TaskExcludeArch.arch == self.arch)))\
.filter(not_(Task.excluded_osmajor.any(
TaskExcludeOSMajor.osmajor == self.distro.osversion.osmajor)))
开发者ID:ustbgaofan,项目名称:beaker,代码行数:11,代码来源:distrolibrary.py
示例4: myfavorite
def myfavorite(Session, msmgroup):
""" An example """
logger.info("Running myfavorite")
def activation_response(x, k):
""" Curve from [0,1] -> [0,1] """
if k < 0:
k = k / (1 - k)
return x / (1 - k * (x - 1))
# =============#
k_factor = -100
# =============#
prev = Session.query(MSMGroup).get(msmgroup.id - 1)
if prev is None:
return default(Session, msmgroup)
# number of new states discovered
n_new = msmgroup.n_states - prev.n_states
if n_new < 0:
return default(Session, msmgroup)
q = Session.query(Trajectory)
new_trajectories = q.filter(Trajectory.msm_groups.contains(msmgroup)).filter(
not_(Trajectory.msm_groups.contains(prev))
)
# sum of the number of steps in the new trajectories
sum_op = Session.query(func.sum(Trajectory.length))
sum_op = sum_op.filter(Trajectory.msm_groups.contains(msmgroup))
sum_op = sum_op.filter(not_(Trajectory.msm_groups.contains(prev)))
n_steps = float(sum_op.scalar())
p_explore = activation_response(n_new / n_steps, k_factor)
if len(msmgroup.markov_models) != 2:
raise ValueError("Only 2 models")
if not [False, True] == sorted([msm.forcefield.true_kinetics for msm in msmgroup.markov_models]):
raise ValueError("one needs to be true_kinetics, the other not")
for msm in msmgroup.markov_models:
if msm.forcefield.true_kinetics:
msm.model_selection_weight = 1 - p_explore
even_sampling(Session, msm)
else:
msm.model_selection_weight = p_explore
even_sampling(Session, msm)
logger.info("%s selection weight: %f", msm.forcefield.name, msm.model_selection_weight)
开发者ID:pandegroup,项目名称:msmaccelerator,代码行数:49,代码来源:sampling.py
示例5: test_filter_ne_nul_nul
def test_filter_ne_nul_nul(self):
Keyword = self.classes.Keyword
self._equivalent(self.session.query(Keyword).filter(Keyword.user
!= self.u),
self.session.query(Keyword).
filter(not_(Keyword.user_keyword.has(user=self.u))))
开发者ID:afeide,项目名称:LuoYunCloud,代码行数:7,代码来源:test_associationproxy.py
示例6: filterQueryByDateRestrictor
def filterQueryByDateRestrictor(query, dateRestrictor, tableName):
"""Returns the query filtered by the date restrictor, e.g., 'date elicited
is earlier than 2011-11-11'.
"""
location = dateRestrictor['location']
relation = dateRestrictor['relation']
date = dateRestrictor['date']
date = datetime.datetime.combine(date, datetime.time())
tbl = getattr(model, tableName)
col = getattr(tbl, location)
if relation == '' or relation == 'not_':
nextDay = date + datetime.timedelta(1)
previousDay = date - datetime.timedelta(1)
if relation == '':
filterCondition = and_(col > previousDay, col < nextDay)
else:
filterCondition = not_(and_(col > previousDay, col < nextDay))
elif relation == 'earlier_than':
filterCondition = col < date
else:
filterCondition = col > date
return query.filter(filterCondition)
开发者ID:jrwdunham,项目名称:old-webapp,代码行数:32,代码来源:queryBuilder.py
示例7: trash
def trash(self, flag=True):
filter = sql.and_(Comment.reviewed == True,
Comment.publishable == False)
if flag:
return self.filter(filter)
else:
return self.filter(sql.not_(filter))
开发者ID:RadioErewan,项目名称:mediacore,代码行数:7,代码来源:comments.py
示例8: test_single_query
def test_single_query(self):
search = Search(self.Donkey, "people", self.session)
session = self.Donkey.Session()
people_class = self.Donkey.get_class("people")
email_class = self.Donkey.get_class("email")
assert set(QueryFromStringParam(search, 'name < ?', pos_args = ["popp02"]).add_conditions(base_query).all()).symmetric_difference(
set(session.query(people_class.id).filter(people_class.name < u"popp02").all())) == set()
assert set(QueryFromStringParam(search, 'name < ? and email.email like ?', pos_args = ["popp02", "popi%"]).add_conditions(base_query).all()).symmetric_difference(
set(session.query(people_class.id).join(["email"]).filter(and_(people_class.name < u"popp02", email_class.email.like(u"popi%"))).all())) == set()
assert set(QueryFromStringParam(search, "name < ? and not email.email like ?", pos_args = ["popp02", "popi%"]).add_conditions(base_query).all()).symmetric_difference(
set(session.query(people_class.id).outerjoin(["email"]).\
filter(and_(people_class.name < u"popp02", or_(email_class.email == None, not_(email_class.email.like(u"popi%"))))).all())) == set()
assert set(QueryFromStringParam(search, "name < ? or not email.email like ?", pos_args = ["popp02", "popi%"]).add_conditions(base_query).all()).symmetric_difference(
set(session.query(people_class.id).outerjoin(["email"]).\
filter(or_(people_class.name < u"popp02", or_(email_class.email == None, not_(email_class.email.like(u"popi%"))))).all())) == set()
assert set(QueryFromStringParam(search, "not (name < ? or not email.email like ?) ", pos_args = ["popp02", "popi%"]
).add_conditions(base_query).all()).symmetric_difference(
set(session.query(people_class.id).outerjoin(["email"]).\
filter(not_(or_(people_class.name < u"popp02", or_(email_class.email == None, not_(email_class.email.like(u"popi%")))))).all())) == set()
开发者ID:kindly,项目名称:reformed,代码行数:28,代码来源:search_param_test.py
示例9: get_measures
def get_measures(self):
"""Find all data that should be included in the report.
The data is returned as a list of tuples containing a
:py:class:`Module <euphorie.client.model.Module>`,
:py:class:`Risk <euphorie.client.model.Risk>` and
:py:class:`ActionPlan <euphorie.client.model.ActionPlan>`. Each
entry in the list will correspond to a row in the generated Excel
file.
This implementation differs from Euphorie in its ordering:
it sorts on risk priority instead of start date.
"""
query = (
Session.query(model.Module, model.Risk, model.ActionPlan)
.filter(sql.and_(model.Module.session == self.session, model.Module.profile_index > -1))
.filter(sql.not_(model.SKIPPED_PARENTS))
.filter(sql.or_(model.MODULE_WITH_RISK_OR_TOP5_FILTER, model.RISK_PRESENT_OR_TOP5_FILTER))
.join(
(
model.Risk,
sql.and_(
model.Risk.path.startswith(model.Module.path),
model.Risk.depth == model.Module.depth + 1,
model.Risk.session == self.session,
),
)
)
.join((model.ActionPlan, model.ActionPlan.risk_id == model.Risk.id))
.order_by(sql.case(value=model.Risk.priority, whens={"high": 0, "medium": 1}, else_=2), model.Risk.path)
)
return query.all()
开发者ID:pombredanne,项目名称:osha.oira,代码行数:32,代码来源:report.py
示例10: test_negate
def test_negate(self):
sat = self.sa_alltypes
sd = sat.c.double_col
d = self.alltypes.double_col
cases = [(-(d > 0), sql.not_(sd > L(0)))]
self._check_expr_cases(cases)
开发者ID:thekingofhero,项目名称:ibis,代码行数:7,代码来源:test_sqlalchemy.py
示例11: edit
def edit(self, *args, **kw):
user = handler.user.get_user_in_session(request)
if request.method == 'GET':
project_id = args[0]
else:
project_id = kw.get('pid')
debug("check permission", 1)
if not checker.check_permission(user=user, project_id=project_id, right_id=constants.right_upload_id) and not checker.is_admin(user=user):
flash('You must have %s permission to edit the project.' % constants.right_upload, 'error')
raise redirect('/tracks/', {'pid': project_id})
#if checker.is_admin(user=user):
#user = DBSession.query(User).join(Project).filter(Project.id == project_id).first()
widget = form.EditProject(action=url('/projects/edit/%s' % project_id)).req()
widget.value = {'pid': project_id}
project = DBSession.query(Project).filter(Project.id == project_id).first()
# prendre les user tracks du meme sequence id
tracks = DBSession.query(Track).join(User.tracks).filter(
and_(User.id == user.id, Track.sequence_id == project.sequence_id,
not_(Track.id.in_([t.id for t in project.tracks])))
).all()
# prendre les sared tracks du meme sequence id
shared_tracks = handler.user.shared_tracks(user.id, constants.rights['download']['id'])
shared_tracks = [t for t in shared_tracks if (t.sequence_id == project.sequence_id and t.id not in [tr.id for tr in project.tracks])]
tracks.extend(shared_tracks)
if request.method == 'GET':
debug("GET", 2)
widget.child.children[1].value = project.name
widget.child.children[2].options = [('', '')] + [(t.id, t.name) for t in tracks] + [(t.id, t.name, {'selected': True}) for t in project.tracks]
return dict(page='tracks', widget=widget, project_id=project_id)
debug("POST", 2)
try:
debug("validate post", 2)
widget.validate(kw)
except twc.ValidationError as e:
debug("error", 2)
w = e.widget
w.child.children[1].value = project.name
w.child.children[2].options = [(t.id, t.name) for t in tracks] + [(t.id, t.name, {'selected': True}) for t in project.tracks]
return dict(page='tracks', widget=w, project_id=project_id)
debug("validation passed")
track_ids = kw.get('tracks', [])
if not track_ids:
track_ids = []
if not isinstance(track_ids, list):
track_ids = [track_ids]
if len(track_ids) > 0 and '' in track_ids:
track_ids.remove('')
# if the project is shared, some track cannot be removed
for t in project.tracks:
if not checker.user_own_track(user.id, track=t) and t.id not in track_ids and t.id in [s.id for s in shared_tracks]:
track_ids.append(t.id)
handler.project.e(project_id=project_id, name=kw.get('name'), track_ids=track_ids)
raise redirect('/tracks/', {'pid': project_id})
开发者ID:bbcf,项目名称:pygdv,代码行数:60,代码来源:project.py
示例12: initialize
def initialize(app):
# Initialize the sources
for source_name, source_details in app.config.get('REVERE_SOURCES', {}).items():
if source_details.get('enabled') is False:
continue
app.sources[source_name] = get_klass(source_details['type'])(description=source_details.get('description'),
config=source_details['config'])
# Initialize the alerts
for alert_name, alert_details in app.config.get('REVERE_ALERTS', {}).items():
app.alerts[alert_name] = get_klass(alert_details['type'])(description=alert_details.get('description'),
config=alert_details['config'],
enabled_in_config=alert_details.get('enabled', True))
alert = Alert.query.filter_by(key=alert_name).first()
if not alert:
alert = Alert(key=alert_name)
db.session.add(alert)
Alert.query.filter(not_(Alert.key.in_(app.alerts.keys()))).delete(synchronize_session='fetch')
db.session.commit()
# Run the maintenance routine hourly
scheduler.add_cron_job(monitor_maintenance, year="*", month="*", day="*", hour="*", minute="0")
for monitor in Monitor.query.filter_by(active=True):
update_monitor_scheduler(monitor)
scheduler.start()
开发者ID:pegler,项目名称:revere,代码行数:28,代码来源:__init__.py
示例13: run
def run(self):
while not self.stoprequest.isSet():
self.db.expire_all()
for node_db in self.db.query(Node).filter(
# nodes may become unresponsive while provisioning
not_(Node.status == 'provisioning')
):
timedelta = (datetime.now() - node_db.timestamp).seconds
if timedelta > self.timeout:
logger.warning(
u"Node '{0}' seems to be offline "
"for {1} seconds...".format(
node_db.name,
timedelta
)
)
if node_db.online:
node_db.online = False
self.db.add(node_db)
self.db.commit()
notifier.notify(
"error",
u"Node '{0}' has gone away".format(
node_db.name or node_db.mac
),
node_id=node_db.id
)
self.sleep()
开发者ID:akolinko,项目名称:product,代码行数:28,代码来源:watcher.py
示例14: visit_conditional_insert
def visit_conditional_insert(element, compiler, **kwargs):
# magic copied from sqlalchemy.sql.compiler.SQLCompiler.visit_insert
compiler.isinsert = True
try:
# pylint: disable=E0611
from sqlalchemy.sql import crud
colparams = crud._get_crud_params(compiler, element)
except ImportError: # SQLAlchemy <= 1.0
colparams = compiler._get_colparams(element)
text = 'INSERT INTO %s' % compiler.process(element.table, asfrom=True)
text += ' (%s)\n' % ', '.join(compiler.preparer.format_column(c[0])
for c in colparams)
text += 'SELECT %s\n' % ', '.join(c[1] for c in colparams)
text += compiler.default_from()
# default_from() returns '' for MySQL but that's wrong, MySQL requires
# FROM DUAL if there is a following WHERE clause.
if isinstance(compiler.dialect, MySQLDialect):
text += 'FROM DUAL\n'
# We need FOR UPDATE in the inner SELECT for MySQL, to ensure we acquire an
# exclusive lock immediately, instead of acquiring a shared lock and then
# subsequently upgrading it to an exclusive lock, which is subject to
# deadlocks if another transaction is doing the same thing.
nonexistence_clause = not_(exists(Select(
columns=[sqltext('1')], from_obj=[element.table],
whereclause=element.unique_condition, for_update=True)))
text += 'WHERE ' + compiler.process(nonexistence_clause)
return text
开发者ID:beaker-project,项目名称:beaker,代码行数:27,代码来源:sql.py
示例15: author_shared_with
def author_shared_with(cls, author, target):
return and_(
Post.author_id == author.id,
Share.contact_id == target.contact.id,
not_(Share.hidden),
Post.parent_id == None
)
开发者ID:Zauberstuhl,项目名称:pyaspora,代码行数:7,代码来源:models.py
示例16: public_wall_for_contact
def public_wall_for_contact(cls, contact):
return and_(
Share.contact_id == contact.id,
Share.public,
not_(Share.hidden),
Post.parent_id == None
)
开发者ID:Zauberstuhl,项目名称:pyaspora,代码行数:7,代码来源:models.py
示例17: exclude
def exclude(self, **terms):
q = self._clone()
query = not_(self._filter(**terms))
if self.query is not None:
query = and_(self.query, query)
q.query = query
return q
开发者ID:Jc2k,项目名称:txsqlalchemy,代码行数:7,代码来源:query.py
示例18: get_schulen_all
def get_schulen_all():
""" liefert alle Schulen """
query = session.query(Schulstelle).add_entity(Schulstamm).join('rel_schulstamm')
# Studienseminare etc. ausschliessen
query = query.order_by(Schulstamm.NameSchule).filter(not_(Schulstamm.Schulamt==u'LH'))
query = query.reset_joinpoint().filter_by(Standort_Kz=0).filter_by(Loesch_Datum='')
return query.all()
开发者ID:shagun30,项目名称:djambala-2,代码行数:7,代码来源:sync_schulen_dms.py
示例19: get_pixbuf
def get_pixbuf (self, attr,val):
if attr=='category':
tbl = self.rd.recipe_table.join(self.rd.categories_table)
col = self.rd.categories_table.c.category
if hasattr(self,'category_images'):
stment = and_(col==val,self.rd.recipe_table.c.image!=None,
self.rd.recipe_table.c.image!='',
not_(self.rd.recipe_table.c.title.in_(self.category_images))
)
else:
stment = and_(col==val,self.rd.recipe_table.c.image!=None,self.rd.recipe_table.c.image!='')
result = tbl.select(stment,limit=1).execute().fetchone()
if not hasattr(self,'category_images'): self.category_images = []
if result: self.category_images.append(result.title)
elif attr=='rating':
return star_generator.get_pixbuf(val)
elif attr in ['preptime','cooktime']:
return get_time_slice(val)
else:
tbl = self.rd.recipe_table
col = getattr(self.rd.recipe_table.c,attr)
stment = and_(col==val,self.rd.recipe_table.c.image!=None,self.rd.recipe_table.c.image!='')
result = tbl.select(stment,limit=1).execute().fetchone()
if result and result.thumb:
return scale_pb(get_pixbuf_from_jpg(result.image))
else:
return self.get_base_icon(attr) or self.get_base_icon('category')
开发者ID:Bercio,项目名称:gourmet,代码行数:27,代码来源:browser.py
示例20: _get_ips_except_admin
def _get_ips_except_admin(self, node_id=None, network_id=None):
"""
Method for receiving IP addresses for node or network
excluding Admin Network IP address.
:param node_id: Node database ID.
:type node_id: int
:param network_id: Network database ID.
:type network_id: int
:returns: List of free IP addresses as SQLAlchemy objects.
"""
node_db = db().query(Node).get(node_id)
ips = db().query(IPAddr).order_by(IPAddr.id)
if node_id:
ips = ips.filter_by(node=node_id)
if network_id:
ips = ips.filter_by(network=network_id)
admin_net_id = self.get_admin_network_id(False)
if admin_net_id:
ips = ips.filter(
not_(IPAddr.network == admin_net_id)
)
return ips.all()
开发者ID:mahak,项目名称:fuelweb,代码行数:25,代码来源:manager.py
注:本文中的sqlalchemy.sql.not_函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论