• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sql.not_函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.sql.not_函数的典型用法代码示例。如果您正苦于以下问题:Python not_函数的具体用法?Python not_怎么用?Python not_使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了not_函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: get_schultyp_all

def get_schultyp_all():
  """Return a query over the KeySchultyp rows outside the excluded groups.

  Rows whose ``Gruppe`` equals 'HKM', 'SONS' or 'ST' are filtered out and
  the remaining rows are ordered by ``TextKey``.
  """
  schultypen = session.query(KeySchultyp)
  excluded_groups = ('HKM', 'SONS', 'ST')
  outside_excluded = and_(*[not_(KeySchultyp.Gruppe == group)
                            for group in excluded_groups])
  return schultypen.filter(outside_excluded).order_by(KeySchultyp.TextKey)
开发者ID:shagun30,项目名称:djambala-2,代码行数:7,代码来源:queries.py


示例2: get_schema_names

 def get_schema_names(self, connection, **kw):
     """Return the normalized names of all schemas that are not system schemas.

     Schemas whose name starts with ``SYS`` or ``Q`` are excluded; the
     remaining names are ordered by schema name.

     :param connection: object whose ``execute()`` runs the query.
     :param kw: accepted for interface compatibility; not used here.
     :returns: list of schema names, each passed through
         ``self.normalize_name``.
     """
     sysschema = self.sys_schemas
     # Both exclusions have to live in ONE where clause: the third positional
     # parameter of the legacy ``select()`` is ``from_obj``, so a second
     # positional ``not_()`` would be taken as a FROM object, not a filter.
     not_system_schema = sql.and_(
         sql.not_(sysschema.c.schemaname.like('SYS%')),
         sql.not_(sysschema.c.schemaname.like('Q%')))
     query = sql.select([sysschema.c.schemaname],
                        not_system_schema,
                        order_by=[sysschema.c.schemaname])
     return [self.normalize_name(r[0]) for r in connection.execute(query)]
开发者ID:rredburn,项目名称:python-ibmdbsa,代码行数:7,代码来源:reflection.py


示例3: tasks

 def tasks(self):
     """
     Query for the tasks applicable to this distro: tasks that are excluded
     neither for its arch nor for the OS major version of its distro.
     """
     # Imported locally instead of at module level to break an import cycle
     from . import Task, TaskExcludeArch, TaskExcludeOSMajor
     excluded_for_arch = Task.excluded_arch.any(
             TaskExcludeArch.arch == self.arch)
     excluded_for_osmajor = Task.excluded_osmajor.any(
             TaskExcludeOSMajor.osmajor == self.distro.osversion.osmajor)
     applicable = Task.query.filter(not_(excluded_for_arch))
     return applicable.filter(not_(excluded_for_osmajor))
开发者ID:ustbgaofan,项目名称:beaker,代码行数:11,代码来源:distrolibrary.py


示例4: myfavorite

def myfavorite(Session, msmgroup):
    """Weight the two models of ``msmgroup`` by how much the last round explored.

    The number of states gained since the previous MSMGroup (the one whose
    id is ``msmgroup.id - 1``) is divided by the total length of the
    trajectories that belong to ``msmgroup`` but not to the previous group.
    That ratio is passed through ``activation_response`` to give
    ``p_explore``. The model whose forcefield has ``true_kinetics`` set gets
    weight ``1 - p_explore``, the other model gets ``p_explore``, and
    ``even_sampling`` is run for each of them.

    Falls back to ``default(Session, msmgroup)`` when there is no previous
    group, when the number of states decreased, or when the new trajectories
    contribute no steps (nothing to divide by).

    Raises:
        ValueError: if ``msmgroup`` does not have exactly two models, or if
            they are not one true_kinetics and one non-true_kinetics model.
    """
    logger.info("Running myfavorite")

    def activation_response(x, k):
        """ Curve from [0,1] -> [0,1] """
        if k < 0:
            k = k / (1 - k)
        return x / (1 - k * (x - 1))

    # =============#
    k_factor = -100
    # =============#
    prev = Session.query(MSMGroup).get(msmgroup.id - 1)
    if prev is None:
        return default(Session, msmgroup)

    # number of new states discovered
    n_new = msmgroup.n_states - prev.n_states
    if n_new < 0:
        return default(Session, msmgroup)

    # sum of the number of steps in the new trajectories, i.e. those that
    # are in this group but were not in the previous one
    sum_op = Session.query(func.sum(Trajectory.length))
    sum_op = sum_op.filter(Trajectory.msm_groups.contains(msmgroup))
    sum_op = sum_op.filter(not_(Trajectory.msm_groups.contains(prev)))
    total_length = sum_op.scalar()
    # The aggregate comes back as None when no trajectory matches, and a
    # zero total would divide by zero below; neither allows a ratio.
    if not total_length:
        return default(Session, msmgroup)
    n_steps = float(total_length)

    p_explore = activation_response(n_new / n_steps, k_factor)

    if len(msmgroup.markov_models) != 2:
        raise ValueError("Only 2 models")
    kinetics_flags = sorted(msm.forcefield.true_kinetics
                            for msm in msmgroup.markov_models)
    if kinetics_flags != [False, True]:
        raise ValueError("one needs to be true_kinetics, the other not")

    for msm in msmgroup.markov_models:
        if msm.forcefield.true_kinetics:
            msm.model_selection_weight = 1 - p_explore
        else:
            msm.model_selection_weight = p_explore
        even_sampling(Session, msm)

        logger.info("%s selection weight: %f", msm.forcefield.name, msm.model_selection_weight)
开发者ID:pandegroup,项目名称:msmaccelerator,代码行数:49,代码来源:sampling.py


示例5: test_filter_ne_nul_nul

    def test_filter_ne_nul_nul(self):
        """Check ``Keyword.user != u`` against ``NOT user_keyword.has(user=u)``.

        Both queries are handed to ``self._equivalent`` for comparison.
        """
        Keyword = self.classes.Keyword

        via_ne = self.session.query(Keyword).filter(Keyword.user != self.u)
        via_not_has = self.session.query(Keyword).filter(
            not_(Keyword.user_keyword.has(user=self.u)))

        self._equivalent(via_ne, via_not_has)
开发者ID:afeide,项目名称:LuoYunCloud,代码行数:7,代码来源:test_associationproxy.py


示例6: filterQueryByDateRestrictor

def filterQueryByDateRestrictor(query, dateRestrictor, tableName):
    """Apply a date restriction such as 'date elicited is earlier than
    2011-11-11' to the query and return the filtered query.

    dateRestrictor is read for the keys 'location' (name of the attribute
    on the model class called tableName), 'relation' and 'date'.  The
    relation '' means "on that date", 'not_' means "not on that date",
    'earlier_than' means before it and any other value means after it.

    """

    location = dateRestrictor['location']
    relation = dateRestrictor['relation']

    # Work with midnight at the start of the requested date
    date = datetime.datetime.combine(dateRestrictor['date'], datetime.time())

    col = getattr(getattr(model, tableName), location)

    if relation == 'earlier_than':
        return query.filter(col < date)

    if relation not in ('', 'not_'):
        return query.filter(col > date)

    # "On that date" is expressed as strictly between the neighbouring days
    oneDay = datetime.timedelta(1)
    onThatDate = and_(col > date - oneDay, col < date + oneDay)
    if relation == 'not_':
        return query.filter(not_(onThatDate))
    return query.filter(onThatDate)
开发者ID:jrwdunham,项目名称:old-webapp,代码行数:32,代码来源:queryBuilder.py


示例7: trash

 def trash(self, flag=True):
     """Filter on comments that are reviewed but not publishable.

     With ``flag`` true (the default) only such comments are kept; with
     ``flag`` false the condition is negated.
     """
     in_trash = sql.and_(Comment.reviewed == True,
                         Comment.publishable == False)
     return self.filter(in_trash if flag else sql.not_(in_trash))
开发者ID:RadioErewan,项目名称:mediacore,代码行数:7,代码来源:comments.py


示例8: test_single_query

    def test_single_query(self):
        """Compare QueryFromStringParam results with hand-written queries.

        For each search string the rows returned through
        ``QueryFromStringParam(...).add_conditions(base_query)`` must be the
        same set as the rows of the equivalent explicit session query.
        """

        search = Search(self.Donkey, "people", self.session)

        session = self.Donkey.Session()

        people = self.Donkey.get_class("people")
        email = self.Donkey.get_class("email")

        def mismatches(param_rows, expected_rows):
            # rows that show up in only one of the two result lists
            return set(param_rows).symmetric_difference(set(expected_rows))

        def from_string(query_string, values):
            # rows produced by the string-based search under test
            return QueryFromStringParam(search, query_string, pos_args = values).add_conditions(base_query).all()

        def people_ids():
            return session.query(people.id)

        def name_below():
            return people.name < u"popp02"

        def no_matching_email():
            # no email row at all, or an email that does not match the pattern
            return or_(email.email == None, not_(email.email.like(u"popi%")))

        assert mismatches(
            from_string('name < ?', ["popp02"]),
            people_ids().filter(name_below()).all()) == set()

        assert mismatches(
            from_string('name < ? and email.email like ?', ["popp02", "popi%"]),
            people_ids().join(["email"]).filter(
                and_(name_below(), email.email.like(u"popi%"))).all()) == set()

        assert mismatches(
            from_string("name < ? and not email.email like ?", ["popp02", "popi%"]),
            people_ids().outerjoin(["email"]).filter(
                and_(name_below(), no_matching_email())).all()) == set()

        assert mismatches(
            from_string("name < ? or not email.email like ?", ["popp02", "popi%"]),
            people_ids().outerjoin(["email"]).filter(
                or_(name_below(), no_matching_email())).all()) == set()

        assert mismatches(
            from_string("not (name < ? or not email.email like ?) ", ["popp02", "popi%"]),
            people_ids().outerjoin(["email"]).filter(
                not_(or_(name_below(), no_matching_email()))).all()) == set()
开发者ID:kindly,项目名称:reformed,代码行数:28,代码来源:search_param_test.py


示例9: get_measures

    def get_measures(self):
        """Collect everything that has to appear in the report.

        The result is a list of tuples, each holding a
        :py:class:`Module <euphorie.client.model.Module>`, a
        :py:class:`Risk <euphorie.client.model.Risk>` and an
        :py:class:`ActionPlan <euphorie.client.model.ActionPlan>`. One
        tuple corresponds to one row of the generated Excel file.

        Unlike Euphorie, rows are ordered by risk priority ("high" first,
        then "medium", then everything else) and then by risk path,
        rather than by start date.
        """
        module_in_session = sql.and_(model.Module.session == self.session, model.Module.profile_index > -1)
        relevant = sql.or_(model.MODULE_WITH_RISK_OR_TOP5_FILTER, model.RISK_PRESENT_OR_TOP5_FILTER)
        # A risk belongs to a module when it sits directly below it in the
        # path hierarchy of the same session.
        risk_of_module = sql.and_(
            model.Risk.path.startswith(model.Module.path),
            model.Risk.depth == model.Module.depth + 1,
            model.Risk.session == self.session,
        )
        priority_rank = sql.case(value=model.Risk.priority, whens={"high": 0, "medium": 1}, else_=2)

        query = Session.query(model.Module, model.Risk, model.ActionPlan)
        query = query.filter(module_in_session)
        query = query.filter(sql.not_(model.SKIPPED_PARENTS))
        query = query.filter(relevant)
        query = query.join((model.Risk, risk_of_module))
        query = query.join((model.ActionPlan, model.ActionPlan.risk_id == model.Risk.id))
        query = query.order_by(priority_rank, model.Risk.path)
        return query.all()
开发者ID:pombredanne,项目名称:osha.oira,代码行数:32,代码来源:report.py


示例10: test_negate

    def test_negate(self):
        """Unary minus on ``double_col > 0`` is checked against ``sql.not_``.

        The expected value is ``sql.not_`` applied to the same comparison on
        the SQLAlchemy column; the pair goes to ``self._check_expr_cases``.
        """
        expr_col = self.alltypes.double_col
        sa_col = self.sa_alltypes.c.double_col

        negated = -(expr_col > 0)
        expected = sql.not_(sa_col > L(0))

        self._check_expr_cases([(negated, expected)])
开发者ID:thekingofhero,项目名称:ibis,代码行数:7,代码来源:test_sqlalchemy.py


示例11: edit

    def edit(self, *args, **kw):
        """Show the project edit form (GET) or apply the submitted edit.

        The project id is ``args[0]`` on GET and ``kw['pid']`` otherwise.
        A user with neither the upload right on the project nor admin
        status is flashed an error and redirected to ``/tracks/``.

        On GET, or when validation of the submitted form fails, a dict with
        ``page``, ``widget`` and ``project_id`` is returned for rendering.
        After a successful validation the project is updated through
        ``handler.project.e`` and a redirect to ``/tracks/`` is raised.
        """
        user = handler.user.get_user_in_session(request)
        # the project id travels in the URL on GET and in the form otherwise
        if request.method == 'GET':
            project_id = args[0]
        else:
            project_id = kw.get('pid')
        debug("check permission", 1)
        if not checker.check_permission(user=user, project_id=project_id, right_id=constants.right_upload_id) and not checker.is_admin(user=user):
            flash('You must have %s permission to edit the project.' % constants.right_upload, 'error')
            raise redirect('/tracks/', {'pid': project_id})
        #if checker.is_admin(user=user):
            #user = DBSession.query(User).join(Project).filter(Project.id == project_id).first()

        widget = form.EditProject(action=url('/projects/edit/%s' % project_id)).req()
        widget.value = {'pid': project_id}
        # NOTE(review): first() may return None for an unknown id; the code
        # below dereferences ``project`` without checking - confirm.
        project = DBSession.query(Project).filter(Project.id == project_id).first()

        # fetch the user's own tracks that have the same sequence id and
        # are not already part of the project
        tracks = DBSession.query(Track).join(User.tracks).filter(
            and_(User.id == user.id, Track.sequence_id == project.sequence_id,
                not_(Track.id.in_([t.id for t in project.tracks])))
        ).all()

        # fetch the tracks shared with the user that have the same sequence
        # id and are not already part of the project
        shared_tracks = handler.user.shared_tracks(user.id, constants.rights['download']['id'])
        shared_tracks = [t for t in shared_tracks if (t.sequence_id == project.sequence_id and t.id not in [tr.id for tr in project.tracks])]

        tracks.extend(shared_tracks)

        if request.method == 'GET':
            debug("GET", 2)
            # pre-fill the form: project name, then the selectable tracks
            # followed by the project's current tracks marked as selected
            widget.child.children[1].value = project.name
            widget.child.children[2].options = [('', '')] + [(t.id, t.name) for t in tracks] + [(t.id, t.name, {'selected': True}) for t in project.tracks]
            return dict(page='tracks', widget=widget, project_id=project_id)
        debug("POST", 2)
        try:
            debug("validate post", 2)
            widget.validate(kw)
        except twc.ValidationError as e:
            debug("error", 2)
            # re-render the widget carried by the validation error with the
            # same name and options (without the empty option used on GET)
            w = e.widget
            w.child.children[1].value = project.name
            w.child.children[2].options = [(t.id, t.name) for t in tracks] + [(t.id, t.name, {'selected': True}) for t in project.tracks]
            return dict(page='tracks', widget=w, project_id=project_id)
        debug("validation passed")
        # normalise the submitted selection to a list without the empty entry
        track_ids = kw.get('tracks', [])
        if not track_ids:
            track_ids = []
        if not isinstance(track_ids, list):
            track_ids = [track_ids]
        if len(track_ids) > 0 and '' in track_ids:
            track_ids.remove('')

        # if the project is shared, some track cannot be removed
        # NOTE(review): shared_tracks was filtered above to exclude every id
        # in project.tracks, so the last membership test below looks like it
        # can never be true for a project track - confirm the intent.
        for t in project.tracks:
            if not checker.user_own_track(user.id, track=t) and t.id not in track_ids and t.id in [s.id for s in shared_tracks]:
                track_ids.append(t.id)

        handler.project.e(project_id=project_id, name=kw.get('name'), track_ids=track_ids)
        raise redirect('/tracks/', {'pid': project_id})
开发者ID:bbcf,项目名称:pygdv,代码行数:60,代码来源:project.py


示例12: initialize

def initialize(app):
    """Populate ``app.sources`` and ``app.alerts`` and start the scheduler.

    Sources and alerts are built from the ``REVERE_SOURCES`` and
    ``REVERE_ALERTS`` config entries. Every configured alert gets an
    ``Alert`` row if it has none, and ``Alert`` rows whose key is no longer
    configured are deleted. The hourly maintenance job and every active
    monitor are then scheduled, and the scheduler is started.
    """
    # Sources: instantiate each configured source unless explicitly disabled
    for name, details in app.config.get('REVERE_SOURCES', {}).items():
        if details.get('enabled') is not False:
            source_klass = get_klass(details['type'])
            app.sources[name] = source_klass(description=details.get('description'),
                                             config=details['config'])

    # Alerts: instantiate each configured alert and ensure it has a row
    for name, details in app.config.get('REVERE_ALERTS', {}).items():
        alert_klass = get_klass(details['type'])
        app.alerts[name] = alert_klass(description=details.get('description'),
                                       config=details['config'],
                                       enabled_in_config=details.get('enabled', True))
        if not Alert.query.filter_by(key=name).first():
            db.session.add(Alert(key=name))

    # Drop the rows of alerts that are no longer configured
    stale_alerts = Alert.query.filter(not_(Alert.key.in_(app.alerts.keys())))
    stale_alerts.delete(synchronize_session='fetch')
    db.session.commit()

    # Maintenance runs at minute 0 of every hour
    scheduler.add_cron_job(monitor_maintenance, year="*", month="*", day="*", hour="*", minute="0")

    active_monitors = Monitor.query.filter_by(active=True)
    for active_monitor in active_monitors:
        update_monitor_scheduler(active_monitor)

    scheduler.start()
开发者ID:pegler,项目名称:revere,代码行数:28,代码来源:__init__.py


示例13: run

 def run(self):
     """Poll node timestamps until a stop is requested.

     On every pass all nodes that are not in status 'provisioning' are
     checked. A node whose timestamp is older than ``self.timeout``
     seconds is logged as offline; if it is still flagged online, it is
     marked offline, committed, and an "error" notification is sent.
     The loop then calls ``self.sleep()`` before the next pass.
     """
     while not self.stoprequest.isSet():
         self.db.expire_all()
         for node_db in self.db.query(Node).filter(
             # nodes may become unresponsive while provisioning
             not_(Node.status == 'provisioning')
         ):
             silence = datetime.now() - node_db.timestamp
             # ``silence.seconds`` alone is only the part below one day, so
             # whole days have to be added or a node that has been silent
             # for more than a day could look fresh again.
             offline_seconds = silence.days * 86400 + silence.seconds
             if offline_seconds > self.timeout:
                 logger.warning(
                     u"Node '{0}' seems to be offline "
                     "for {1} seconds...".format(
                         node_db.name,
                         offline_seconds
                     )
                 )
                 if node_db.online:
                     node_db.online = False
                     self.db.add(node_db)
                     self.db.commit()
                     notifier.notify(
                         "error",
                         u"Node '{0}' has gone away".format(
                             node_db.name or node_db.mac
                         ),
                         node_id=node_db.id
                     )
         self.sleep()
开发者ID:akolinko,项目名称:product,代码行数:28,代码来源:watcher.py


示例14: visit_conditional_insert

def visit_conditional_insert(element, compiler, **kwargs):
    """Compile ``element`` into an INSERT ... SELECT ... WHERE NOT EXISTS.

    The statement inserts the element's column values only when no row of
    ``element.table`` matches ``element.unique_condition``.

    Returns the SQL text.
    """
    # magic copied from sqlalchemy.sql.compiler.SQLCompiler.visit_insert
    compiler.isinsert = True
    try:
        # pylint: disable=E0611
        from sqlalchemy.sql import crud
        colparams = crud._get_crud_params(compiler, element)
    except ImportError:  # SQLAlchemy <= 1.0
        colparams = compiler._get_colparams(element)

    pieces = ['INSERT INTO %s' % compiler.process(element.table, asfrom=True)]
    column_names = [compiler.preparer.format_column(param[0])
                    for param in colparams]
    pieces.append(' (%s)\n' % ', '.join(column_names))
    pieces.append('SELECT %s\n' % ', '.join([param[1] for param in colparams]))
    pieces.append(compiler.default_from())
    # MySQL needs an explicit FROM DUAL when a WHERE clause follows, but
    # default_from() gives '' for it.
    if isinstance(compiler.dialect, MySQLDialect):
        pieces.append('FROM DUAL\n')

    # FOR UPDATE in the inner SELECT makes MySQL take an exclusive lock
    # right away. Taking a shared lock first and upgrading it later can
    # deadlock when another transaction does the same thing.
    matching_row = Select(
            columns=[sqltext('1')], from_obj=[element.table],
            whereclause=element.unique_condition, for_update=True)
    nonexistence_clause = not_(exists(matching_row))
    pieces.append('WHERE ' + compiler.process(nonexistence_clause))
    return ''.join(pieces)
开发者ID:beaker-project,项目名称:beaker,代码行数:27,代码来源:sql.py


示例15: author_shared_with

 def author_shared_with(cls, author, target):
     """Build the condition for top-level posts by ``author`` that are
     shared with ``target``'s contact and not hidden.
     """
     written_by_author = Post.author_id == author.id
     shared_with_target = Share.contact_id == target.contact.id
     not_hidden = not_(Share.hidden)
     is_top_level = Post.parent_id == None
     return and_(written_by_author, shared_with_target,
                 not_hidden, is_top_level)
开发者ID:Zauberstuhl,项目名称:pyaspora,代码行数:7,代码来源:models.py


示例16: public_wall_for_contact

 def public_wall_for_contact(cls, contact):
     """Build the condition for top-level posts shared publicly with
     ``contact`` and not hidden.
     """
     shared_with_contact = Share.contact_id == contact.id
     not_hidden = not_(Share.hidden)
     is_top_level = Post.parent_id == None
     return and_(shared_with_contact, Share.public,
                 not_hidden, is_top_level)
开发者ID:Zauberstuhl,项目名称:pyaspora,代码行数:7,代码来源:models.py


示例17: exclude

 def exclude(self, **terms):
     """Return a clone whose query additionally rejects rows matching ``terms``.

     The negated filter is AND-ed with the existing query when there is
     one; otherwise it becomes the clone's query on its own.
     """
     clone = self._clone()
     negated = not_(self._filter(**terms))
     if self.query is None:
         clone.query = negated
     else:
         clone.query = and_(self.query, negated)
     return clone
开发者ID:Jc2k,项目名称:txsqlalchemy,代码行数:7,代码来源:query.py


示例18: get_schulen_all

def get_schulen_all():
  """Return all schools: Schulstelle results joined with their Schulstamm.

  Rows with ``Schulamt`` 'LH' are left out, and only entries with
  ``Standort_Kz`` 0 and an empty ``Loesch_Datum`` are kept. Results are
  ordered by ``Schulstamm.NameSchule``.
  """
  schulen = session.query(Schulstelle).add_entity(Schulstamm)
  schulen = schulen.join('rel_schulstamm').order_by(Schulstamm.NameSchule)
  # exclude teacher-training seminars etc.
  schulen = schulen.filter(not_(Schulstamm.Schulamt==u'LH'))
  schulen = schulen.reset_joinpoint()
  schulen = schulen.filter_by(Standort_Kz=0).filter_by(Loesch_Datum='')
  return schulen.all()
开发者ID:shagun30,项目名称:djambala-2,代码行数:7,代码来源:sync_schulen_dms.py


示例19: get_pixbuf

 def get_pixbuf(self, attr, val):
     """Return a pixbuf illustrating value ``val`` of attribute ``attr``.

     'rating' and the two time attributes are delegated to their own
     generators. For every other attribute one recipe with a non-empty
     image and a matching value is looked up; for 'category' the titles
     already used are remembered in ``self.category_images`` and skipped
     on later lookups. If a row with a thumb is found its image is
     scaled and returned, otherwise the base icon of the attribute (or of
     'category') is returned.
     """
     if attr == 'rating':
         return star_generator.get_pixbuf(val)
     if attr in ['preptime', 'cooktime']:
         return get_time_slice(val)

     recipes = self.rd.recipe_table
     if attr == 'category':
         categories = self.rd.categories_table
         source = recipes.join(categories)
         wanted = [categories.c.category == val,
                   recipes.c.image != None,
                   recipes.c.image != '']
         already_tracking = hasattr(self, 'category_images')
         if already_tracking:
             # do not pick a recipe that already illustrates a category
             wanted.append(not_(recipes.c.title.in_(self.category_images)))
         result = source.select(and_(*wanted), limit=1).execute().fetchone()
         if not already_tracking:
             self.category_images = []
         if result:
             self.category_images.append(result.title)
     else:
         wanted = [getattr(recipes.c, attr) == val,
                   recipes.c.image != None,
                   recipes.c.image != '']
         result = recipes.select(and_(*wanted), limit=1).execute().fetchone()

     if result and result.thumb:
         return scale_pb(get_pixbuf_from_jpg(result.image))
     return self.get_base_icon(attr) or self.get_base_icon('category')
开发者ID:Bercio,项目名称:gourmet,代码行数:27,代码来源:browser.py


示例20: _get_ips_except_admin

    def _get_ips_except_admin(self, node_id=None, network_id=None):
        """
        Method for receiving IP addresses for node or network
        excluding Admin Network IP address.

        Each filter is applied only when its argument is truthy; the
        Admin Network exclusion is applied only when
        ``get_admin_network_id(False)`` returns a truthy id.

        :param node_id: Node database ID.
        :type  node_id: int
        :param network_id: Network database ID.
        :type  network_id: int
        :returns: List of matching IP addresses as SQLAlchemy objects,
                  ordered by ID.
        """
        # No Node lookup is needed here: filtering works on the raw id.
        ips = db().query(IPAddr).order_by(IPAddr.id)
        if node_id:
            ips = ips.filter_by(node=node_id)
        if network_id:
            ips = ips.filter_by(network=network_id)

        admin_net_id = self.get_admin_network_id(False)
        if admin_net_id:
            ips = ips.filter(
                not_(IPAddr.network == admin_net_id)
            )

        return ips.all()
开发者ID:mahak,项目名称:fuelweb,代码行数:25,代码来源:manager.py



注:本文中的sqlalchemy.sql.not_函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sql.null函数代码示例发布时间:2022-05-27
下一篇:
Python sql.literal_column函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap