• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sqlalchemy.false函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.false函数的典型用法代码示例。如果您正苦于以下问题：Python false函数的具体用法？Python false怎么用？Python false使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了false函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: index

    def index(self, trans, **kwd):
        """
        GET /api/workflows

        Displays a collection of workflows: first the ones selected by
        ownership (optionally widened to published ones), then the ones shared
        with the current user.  Deleted workflows are left out of both groups
        and each group is ordered by descending update time.

        :param  show_published:      if True, show also published workflows
        :type   show_published:      boolean

        :returns: one dict per workflow, extended with ``url`` and ``owner``
        :rtype:   list
        """
        include_published = util.string_as_bool(kwd.get('show_published', 'False'))
        model = trans.app.model
        workflow_cls = model.StoredWorkflow

        def describe(workflow):
            # Serialise one workflow and attach its API url and owner name.
            entry = workflow.to_dict(value_mapper={'id': trans.security.encode_id})
            entry['url'] = url_for('workflow', id=trans.security.encode_id(workflow.id))
            entry['owner'] = workflow.user.username
            return entry

        # Ownership filter, optionally relaxed to include published workflows.
        visibility = (workflow_cls.user == trans.user)
        if include_published:
            visibility = or_(visibility, (workflow_cls.published == true()))

        own_query = trans.sa_session.query(workflow_cls)
        own_query = own_query.filter(visibility, workflow_cls.table.c.deleted == false())
        own_query = own_query.order_by(desc(workflow_cls.table.c.update_time))
        results = [describe(workflow) for workflow in own_query.all()]

        # Workflows that other users shared with the current user.
        shared_query = trans.sa_session.query(model.StoredWorkflowUserShareAssociation)
        shared_query = shared_query.filter_by(user=trans.user).join('stored_workflow')
        shared_query = shared_query.filter(workflow_cls.deleted == false())
        shared_query = shared_query.order_by(desc(workflow_cls.update_time))
        results.extend(describe(share.stored_workflow) for share in shared_query.all())
        return results
开发者ID:ashvark,项目名称:galaxy,代码行数:32,代码来源:workflows.py


示例2: get_users

def get_users(
    number: int=10,
    offset: int=0,
    matches_all: Dict[str, Any]=None,
    matches_any: Dict[str, Any]=None,
    store: Store=None,
) -> Tuple[List[User], int]:
    """Build a paginated, filtered user query and count its matches.

    Args:
        number: maximum number of rows in the returned page.
        offset: number of rows to skip before the page starts.
        matches_all: search name -> value; every recognised entry must match.
            Unrecognised names are ignored.
        matches_any: search name -> value; when at least one entry is
            recognised, at least one of the recognised entries must match.
            Unrecognised names are ignored.
        store: object exposing the SQLAlchemy session as ``store.session``.
            NOTE(review): the default of ``None`` fails on ``store.session``;
            callers apparently always pass a store -- confirm.

    Returns:
        ``(query, count)``: the limited/offset query (not yet evaluated) and
        the number of distinct matching rows before pagination.
        NOTE(review): the annotation promises ``List[User]`` but the query
        object itself is returned -- confirm which one callers rely on.
    """
    query = store.session.query(User)
    query = query.outerjoin(InstitutionAssociation)
    query = query.outerjoin(Institution)
    query = query.order_by(User.id.asc())

    searches = {
        'bio': lambda a: User.bio.ilike('%{}%'.format(a)),
        'created_after': lambda a: User.date_created > a,
        'created_before': lambda a: User.date_created < a,
        'institution': lambda a: Institution.name.ilike('%{}%'.format(a)),
        'name': lambda a: User.name.ilike('%{}%'.format(a)),
    }

    filter_all = true()
    for name, value in (matches_all or {}).items():
        filter_all = filter_all & searches.get(name, lambda _: true())(value)

    # Collect the recognised "any" clauses explicitly instead of starting from
    # false() and testing `is not false()` afterwards: that identity test only
    # works when false() returns a singleton, otherwise an empty matches_any
    # would AND the whole filter with FALSE and return no rows.
    any_clauses = [
        searches[name](value)
        for name, value in (matches_any or {}).items()
        if name in searches
    ]

    if any_clauses:
        filter_any = any_clauses[0]
        for clause in any_clauses[1:]:
            filter_any = filter_any | clause
        query = query.filter(filter_all & filter_any)
    else:
        query = query.filter(filter_all)

    query = query.distinct()
    count = query.count()
    query = query.limit(number).offset(offset)

    return query, count
开发者ID:communityshare,项目名称:communityshare,代码行数:35,代码来源:get_users.py


示例3: stats

def stats():
    """Print, per address origin, device totals and tracked/identified gaps."""

    def addresses_where(tracked, identified):
        # Subquery of device addresses with the given tracked/identified flags.
        return session.query(Device.address) \
            .filter(and_(Device.tracked == tracked, Device.identified == identified)) \
            .subquery()

    not_tracked = addresses_where(false(), true())
    not_identified = addresses_where(true(), false())
    neither = addresses_where(false(), false())

    counts = session.query(
        Device.address_origin,
        func.count(Device.id),
        func.count(not_tracked.c.address),
        func.count(not_identified.c.address),
        func.count(neither.c.address),
    )
    # Outer joins keep every device row; the per-subquery counts only see
    # the rows whose address matched that subquery.
    counts = counts.outerjoin(not_tracked, not_tracked.c.address == Device.address)
    counts = counts.outerjoin(not_identified, not_identified.c.address == Device.address)
    counts = counts.outerjoin(neither, neither.c.address == Device.address)
    counts = counts.group_by(Device.address_origin)

    line_format = '{:12s} Total:{:5d} - not tracked:{:3d}, not identified:{:3d}, not tracked & not identified: {:3d}'

    print('--- Devices ---')
    for origin, total, nt_total, ni_total, ntni_total in counts.all():
        origin_name = AddressOrigin(origin).name
        print(line_format.format(origin_name, total, nt_total, ni_total, ntni_total))
开发者ID:Turbo87,项目名称:ogn-python,代码行数:28,代码来源:database.py


示例4: get_permissions_query

  def get_permissions_query(model_names, permission_type='read'):
    """Build the filter restricting records to the allowed resources.

    For every model name the permission lookup yields either full access
    (contexts is None), a set of allowed resource keys, or nothing. The
    per-model conditions are OR-ed together; when there is nothing to OR
    the result is a constant false expression.
    """
    if not model_names:
      # Nothing requested: return false directly rather than an empty
      # "in" statement combined with an empty OR.
      return sa.false()

    def condition_for(model_name):
      # Returns the filter for one model, or None when nothing is allowed.
      contexts, resources = permissions.get_context_resource(
          model_name=model_name,
          permission_type=permission_type,
      )
      if contexts is None:
        # None context means full access of permission_type for the model.
        return MysqlRecordProperty.type == model_name
      if resources:
        return sa.and_(
            MysqlRecordProperty.type == model_name,
            MysqlRecordProperty.key.in_(resources),
        )
      return None

    conditions = []
    for model_name in model_names:
      condition = condition_for(model_name)
      if condition is not None:
        conditions.append(condition)

    return sa.or_(*conditions) if conditions else sa.false()
开发者ID:egorhm,项目名称:ggrc-core,代码行数:33,代码来源:mysql.py


示例5: list

    def list(self, trans, deleted=False):
        """
        Build the query for the libraries visible to the current user.

        :param  deleted: admins only: True selects ``deleted`` libraries, False
                         ``non-deleted`` ones, None disables the filter; for
                         non-admins only non-deleted libraries are selected
        :type   deleted: boolean (optional)

        :returns: query that will emit all accessible libraries
        :rtype:   sqlalchemy query
        :returns: dict with the set of restricted library ids and, for
                  non-admins, three sets of library ids the user's roles may
                  add to / modify / manage. These are used for limiting the
                  number of queries when dictifying the libraries later on.
        :rtype:   dict
        """
        is_admin = trans.user_is_admin()
        query = trans.sa_session.query(trans.app.model.Library)
        permitted_actions = trans.app.security_agent.permitted_actions
        library_access_action = permitted_actions.LIBRARY_ACCESS.action

        # Every library that has an access permission row counts as restricted.
        access_rows = trans.sa_session.query(trans.model.LibraryPermissions).filter(
            trans.model.LibraryPermissions.table.c.action == library_access_action
        ).distinct()
        restricted_library_ids = set()
        for access_row in access_rows:
            restricted_library_ids.add(access_row.library_id)
        prefetched_ids = {'restricted_library_ids': restricted_library_ids}

        if is_admin:
            # deleted=None means the flag is not specified: do not filter on it.
            if deleted is not None:
                wanted_state = true() if deleted else false()
                query = query.filter(trans.app.model.Library.table.c.deleted == wanted_state)
            return query, prefetched_ids

        #  Nonadmins can't see deleted libraries
        query = query.filter(trans.app.model.Library.table.c.deleted == false())
        role_ids = [role.id for role in trans.get_current_user_roles()]
        role_permissions = trans.sa_session.query(trans.model.LibraryPermissions).filter(
            trans.model.LibraryPermissions.table.c.role_id.in_(role_ids))

        accessible_restricted_library_ids = set()
        allowed_ids = {
            'allowed_library_add_ids': set(),
            'allowed_library_modify_ids': set(),
            'allowed_library_manage_ids': set(),
        }
        # (action name, set collecting the library ids granted that action)
        collectors = (
            (library_access_action, accessible_restricted_library_ids),
            (permitted_actions.LIBRARY_ADD.action, allowed_ids['allowed_library_add_ids']),
            (permitted_actions.LIBRARY_MODIFY.action, allowed_ids['allowed_library_modify_ids']),
            (permitted_actions.LIBRARY_MANAGE.action, allowed_ids['allowed_library_manage_ids']),
        )
        for permission in role_permissions:
            for action_name, id_set in collectors:
                if permission.action == action_name:
                    id_set.add(permission.library_id)

        # Visible: unrestricted libraries plus restricted ones the roles can access.
        query = query.filter(or_(
            not_(trans.model.Library.table.c.id.in_(restricted_library_ids)),
            trans.model.Library.table.c.id.in_(accessible_restricted_library_ids)
        ))
        prefetched_ids.update(allowed_ids)
        return query, prefetched_ids
开发者ID:ImmPortDB,项目名称:immport-galaxy,代码行数:60,代码来源:libraries.py


示例6: build_initial_query

 def build_initial_query(self, trans, **kwd):
     """Return the base query: live repositories joined to their reviews.

     Selects repositories that are neither deleted nor deprecated, inner-joined
     to their reviews and owning user, and outer-joined to component reviews
     and components.
     """
     repository_t = model.Repository.table
     review_t = model.RepositoryReview.table
     user_t = model.User.table
     component_review_t = model.ComponentReview.table
     component_t = model.Component.table

     live = and_(repository_t.c.deleted == false(),
                 repository_t.c.deprecated == false())

     query = trans.sa_session.query(model.Repository).filter(live)
     query = query.join((review_t, review_t.c.repository_id == repository_t.c.id))
     query = query.join((user_t, user_t.c.id == repository_t.c.user_id))
     query = query.outerjoin((component_review_t, component_review_t.c.repository_review_id == review_t.c.id))
     query = query.outerjoin((component_t, component_t.c.id == component_review_t.c.component_id))
     return query
开发者ID:lappsgrid-incubator,项目名称:Galaxy,代码行数:8,代码来源:repository_review_grids.py


示例7: handle_role_associations

def handle_role_associations( app, role, repository, **kwd ):
    """
    Optionally update, then report, the users and groups associated with ``role``.

    When ``manage_role_associations_button`` is present in ``kwd``, the users
    listed in ``in_users`` and the groups listed in ``in_groups`` are associated
    with the role for ``repository``.  The repository owner is always kept in
    the user list; if the request left the owner out, the owner is re-added and
    the status is set to ``error``.

    Returns a dict with ``in_users`` / ``out_users`` (``(id, email)`` tuples),
    ``in_groups`` / ``out_groups`` (``(id, name)`` tuples) covering all
    non-deleted users and groups, plus ``message`` and ``status``.
    """
    sa_session = app.model.context.current
    message = escape( kwd.get( 'message', '' ) )
    status = kwd.get( 'status', 'done' )
    repository_owner = repository.user
    if kwd.get( 'manage_role_associations_button', False ):
        in_users_list = util.listify( kwd.get( 'in_users', [] ) )
        in_users = [ sa_session.query( app.model.User ).get( x ) for x in in_users_list ]
        # Make sure the repository owner is always associated with the repository's admin role.
        if not any( user.id == repository_owner.id for user in in_users ):
            in_users.append( repository_owner )
            message += "The repository owner must always be associated with the repository's administrator role.  "
            status = 'error'
        in_groups_list = util.listify( kwd.get( 'in_groups', [] ) )
        in_groups = [ sa_session.query( app.model.Group ).get( x ) for x in in_groups_list ]
        in_repositories = [ repository ]
        app.security_agent.set_entity_role_associations( roles=[ role ],
                                                         users=in_users,
                                                         groups=in_groups,
                                                         repositories=in_repositories )
        sa_session.refresh( role )
        message += "Role <b>%s</b> has been associated with %d users, %d groups and %d repositories.  " % \
            ( escape( str( role.name ) ), len( in_users ), len( in_groups ), len( in_repositories ) )
    in_users = []
    out_users = []
    in_groups = []
    out_groups = []
    # Build the membership lists once instead of once per user / group row.
    role_users = [ x.user for x in role.users ]
    for user in sa_session.query( app.model.User ) \
                          .filter( app.model.User.table.c.deleted == false() ) \
                          .order_by( app.model.User.table.c.email ):
        if user in role_users:
            in_users.append( ( user.id, user.email ) )
        else:
            out_users.append( ( user.id, user.email ) )
    role_groups = [ x.group for x in role.groups ]
    for group in sa_session.query( app.model.Group ) \
                           .filter( app.model.Group.table.c.deleted == false() ) \
                           .order_by( app.model.Group.table.c.name ):
        if group in role_groups:
            in_groups.append( ( group.id, group.name ) )
        else:
            out_groups.append( ( group.id, group.name ) )
    associations_dict = dict( in_users=in_users,
                              out_users=out_users,
                              in_groups=in_groups,
                              out_groups=out_groups,
                              message=message,
                              status=status )
    return associations_dict
开发者ID:galaxyguardians,项目名称:galaxy,代码行数:53,代码来源:repository_util.py


示例8: test_is_boolean_symbols_despite_no_native

    def test_is_boolean_symbols_despite_no_native(self):
        """IS / IS NOT between boolean constants, cast to Boolean, yield bools."""
        # (SQL expression, value the database is expected to return)
        cases = (
            (true().is_(true()), True),
            (true().isnot(true()), False),
            (false().is_(false()), True),
        )
        for expression, expected in cases:
            fetched = testing.db.scalar(select([cast(expression, Boolean)]))
            is_(fetched, expected)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:15,代码来源:test_query.py


示例9: purge_libraries

def purge_libraries( app, cutoff_time, remove_from_disk, info_only=False, force_retry=False ):
    # Purges deleted libraries whose update_time is older than the cutoff_time.
    # The dataset associations of each library are also marked as deleted.
    # The Purge Dataset method will purge each Dataset as necessary
    # library.purged == True simply means that it can no longer be undeleted
    # i.e. all associated LibraryDatasets/folders are marked as deleted
    library_count = 0
    start = time.time()
    if force_retry:
        libraries = app.sa_session.query( app.model.Library ) \
                                  .filter( and_( app.model.Library.table.c.deleted == true(),
                                                 app.model.Library.table.c.update_time < cutoff_time ) )
    else:
        libraries = app.sa_session.query( app.model.Library ) \
                                  .filter( and_( app.model.Library.table.c.deleted == true(),
                                                 app.model.Library.table.c.purged == false(),
                                                 app.model.Library.table.c.update_time < cutoff_time ) )
    for library in libraries:
        _purge_folder( library.root_folder, app, remove_from_disk, info_only=info_only )
        if not info_only:
            print "Purging library id ", library.id
            library.purged = True
            app.sa_session.add( library )
            app.sa_session.flush()
        library_count += 1
    stop = time.time()
    print '# Purged %d libraries .' % library_count
    print "Elapsed time: ", stop - start
    print "##########################################"
开发者ID:galaxyguardians,项目名称:galaxy,代码行数:29,代码来源:cleanup_datasets.py


示例10: index

 def index( self, trans, deleted='False', f_email=None, **kwd ):
     """
     GET /api/users
     GET /api/users/deleted
     Displays a collection (list) of users.

     ``deleted`` (string boolean) selects deleted instead of non-deleted users;
     ``f_email`` restricts the result to emails containing that text.
     """
     user_model = trans.app.model.User

     def as_dict( user ):
         return user.to_dict( value_mapper={ 'id': trans.security.encode_id } )

     query = trans.sa_session.query( user_model )
     deleted = util.string_as_bool( deleted )
     if f_email:
         query = query.filter( user_model.email.like( "%%%s%%" % f_email ) )
     if deleted:
         # only admins can see deleted users
         if not trans.user_is_admin():
             return []
         query = query.filter( user_model.table.c.deleted == true() )
     else:
         # special case: user can see only their own user
         # special case2: if the galaxy admin has specified that other user email/names are
         #   exposed, we don't want special case #1
         if not ( trans.user_is_admin() or trans.app.config.expose_user_name or trans.app.config.expose_user_email ):
             return [ as_dict( trans.user ) ]
         query = query.filter( user_model.table.c.deleted == false() )
     users = []
     for user in query:
         item = as_dict( user )
         # A field is dropped unless it is configured as exposed, the user is
         # the requesting user, or the requesting user is an admin.
         if not ( trans.app.config.expose_user_name or user is trans.user or trans.user_is_admin() ):
             del item['username']
         if not ( trans.app.config.expose_user_email or user is trans.user or trans.user_is_admin() ):
             del item['email']
         # TODO: move into api_values
         users.append( item )
     return users
开发者ID:galaxyguardians,项目名称:galaxy,代码行数:35,代码来源:users.py


示例11: get_library

def get_library( name, description, synopsis ):
    """Return the first non-deleted library matching name, description and synopsis."""
    query = gx_context().query( galaxy.model.Library )
    library_table = galaxy.model.Library.table
    matches = and_( library_table.c.name == name,
                    library_table.c.description == description,
                    library_table.c.synopsis == synopsis,
                    library_table.c.deleted == false() )
    return query.filter( matches ).first()
开发者ID:galaxyguardians,项目名称:galaxy,代码行数:7,代码来源:test_db_util.py


示例12: deleted_histories

    def deleted_histories( self, trans, **kwd ):
        """
        The number of histories that were deleted more than the specified number of days ago, but have not yet been purged.
        Also included is the number of datasets associated with the histories.

        Reads ``deleted_histories_days`` from ``kwd``.  Returns a tuple
        ``( days_as_string, message )``; when no day count was supplied the
        first element is an empty string and the message asks for one.
        """
        params = util.Params( kwd )
        message = ''
        # Default for the "no day count supplied" path; without it the return
        # statement below would reference an unassigned local.
        days_label = ''
        if params.deleted_histories_days:
            deleted_histories_days = int( params.deleted_histories_days )
            days_label = str( deleted_histories_days )
            cutoff_time = datetime.utcnow() - timedelta( days=deleted_histories_days )
            history_count = 0
            dataset_count = 0
            disk_space = 0
            histories = trans.sa_session.query( model.History ) \
                .filter( and_( model.History.table.c.deleted == true(),
                    model.History.table.c.purged == false(),
                    model.History.table.c.update_time < cutoff_time ) ) \
                .options( eagerload( 'datasets' ) )

            for history in histories:
                for hda in history.datasets:
                    if not hda.dataset.purged:
                        dataset_count += 1
                        try:
                            disk_space += hda.dataset.file_size
                        except Exception:
                            # Best effort: a dataset whose size cannot be added
                            # is still counted, it just adds no disk space.
                            pass
                history_count += 1
            message = "%d histories ( including a total of %d datasets ) were deleted more than %d days ago, but have not yet been purged, " \
                "disk space: %s." % ( history_count, dataset_count, deleted_histories_days, nice_size( disk_space, True ) )
        else:
            message = "Enter the number of days."
        return days_label, message
开发者ID:galaxyguardians,项目名称:galaxy,代码行数:33,代码来源:system.py


示例13: _get_tasks_in_cycle

  def _get_tasks_in_cycle(model):
    """Select the contact's tasks in current cycles, filtered by status.

    For cycles that need verification the statuses "Assigned", "InProgress",
    "Finished" and "Declined" are kept; for cycles that do not, only
    "Assigned" and "InProgress". Both selections are combined with UNION ALL.

    `contact_id` is not a parameter; it is read from the enclosing scope.
    """
    task_cls = all_models.CycleTaskGroupObjectTask
    statuses_with_verification = [
        task_cls.ASSIGNED,
        task_cls.IN_PROGRESS,
        task_cls.FINISHED,
        task_cls.DECLINED,
    ]
    statuses_without_verification = [
        task_cls.ASSIGNED,
        task_cls.IN_PROGRESS,
    ]

    # Tasks of the contact that belong to a current cycle.
    base_query = db.session.query(
        model.id.label('id'),
        literal(model.__name__).label('type'),
        literal(None).label('context_id'),
    ).join(
        Cycle,
        Cycle.id == model.cycle_id
    ).filter(
        Cycle.is_current == true(),
        model.contact_id == contact_id
    )

    verified = base_query.filter(
        Cycle.is_verification_needed == true(),
        model.status.in_(statuses_with_verification)
    )
    unverified = base_query.filter(
        Cycle.is_verification_needed == false(),
        model.status.in_(statuses_without_verification)
    )
    return verified.union_all(unverified)
开发者ID:zidarsk8,项目名称:ggrc-core,代码行数:34,代码来源:query_helpers.py


示例14: delete_userless_histories

def delete_userless_histories(app, cutoff_time, info_only=False, force_retry=False):
    """Mark userless histories older than cutoff_time as deleted.

    Selects histories without a user whose update_time is older than
    cutoff_time; unless force_retry is set, only histories not yet marked
    deleted are selected. With info_only the histories are only counted.
    Only the deleted flag is set here; nothing is removed from disk.
    """
    history_count = 0
    start = time.time()
    history_table = app.model.History.table
    conditions = [history_table.c.user_id == null()]
    if not force_retry:
        conditions.append(history_table.c.deleted == false())
    conditions.append(history_table.c.update_time < cutoff_time)
    histories = app.sa_session.query(app.model.History).filter(and_(*conditions))
    for history in histories:
        history_count += 1
        if info_only:
            continue
        print("Deleting history id ", history.id)
        history.deleted = True
        app.sa_session.add(history)
        app.sa_session.flush()
    stop = time.time()
    print("Deleted %d histories" % history_count)
    print("Elapsed time: ", stop - start)
    print("##########################################")
开发者ID:osallou,项目名称:galaxy,代码行数:26,代码来源:cleanup_datasets.py


示例15: upgrade

def upgrade():
    """Add the BGP columns and create the L3Out interface BGP peer prefix table."""
    bgp_enable = sa.Column('bgp_enable', sa.Boolean(),
                           server_default=sa.false(),
                           nullable=False)
    op.add_column('aim_l3outsides', bgp_enable)

    # New string columns on aim_external_subnets with their server defaults.
    for column_name, server_default in (('aggregate', ""),
                                        ('scope', "import-security")):
        op.add_column('aim_external_subnets',
                      sa.Column(column_name, sa.String(64),
                                server_default=server_default,
                                nullable=False))

    # Columns shared by the unique constraint and the index below.
    peer_key = ('tenant_name', 'l3out_name', 'node_profile_name',
                'interface_profile_name', 'interface_path', 'addr')
    op.create_table(
        'aim_l3out_interface_bgp_peer_prefix',
        sa.Column('aim_id', sa.Integer, autoincrement=True),
        sa.Column('tenant_name', sa.String(64), nullable=False),
        sa.Column('l3out_name', sa.String(64), nullable=False),
        sa.Column('node_profile_name', sa.String(64), nullable=False),
        sa.Column('interface_profile_name', sa.String(64), nullable=False),
        sa.Column('interface_path', VARCHAR(512, charset='latin1'),
                  nullable=False),
        sa.Column('addr', sa.String(64), nullable=False),
        sa.Column('asn', sa.Integer),
        sa.Column('local_asn', sa.Integer),
        sa.Column('monitored', sa.Boolean, nullable=False, default=False),
        sa.PrimaryKeyConstraint('aim_id'),
        sa.UniqueConstraint(*peer_key,
                            name='uniq_aim_l3out_interface_bgp_peer_pfx_id'),
        sa.Index('uniq_aim_l3out_interface_bgp_peer_pfx_idx', *peer_key))
开发者ID:noironetworks,项目名称:aci-integration-module,代码行数:31,代码来源:f18e545de625_bgp_peer_prefix.py


示例16: list

    def list( self, trans, *args, **kwargs ):
        """
        List user's pages.

        When both ``operation`` and ``id`` are given, the operation is applied
        to every listed page first: "delete" flags the page as deleted, "share
        or publish" hands the request over to ``self.sharing``.  Afterwards the
        page grid and the pages shared with the user are rendered.
        """
        if 'operation' in kwargs and 'id' in kwargs:
            db_session = trans.sa_session
            action = kwargs['operation'].lower()
            for encoded_id in util.listify( kwargs['id'] ):
                page = db_session.query( model.Page ).get( self.decode_id( encoded_id ) )
                if action == "delete":
                    page.deleted = True
                elif action == "share or publish":
                    return self.sharing( trans, **kwargs )
            db_session.flush()

        # HACK: to prevent the insertion of an entire html document inside another
        kwargs[ 'embedded' ] = True
        # Build grid HTML.
        grid = self._page_list( trans, *args, **kwargs )

        # Non-deleted pages that were shared with the user, newest first.
        shared_query = trans.sa_session.query( model.PageUserShareAssociation )
        shared_query = shared_query.filter_by( user=trans.get_user() )
        shared_query = shared_query.join( model.Page.table )
        shared_query = shared_query.filter( model.Page.deleted == false() )
        shared_query = shared_query.order_by( desc( model.Page.update_time ) )
        shared_by_others = shared_query.all()

        # Render grid wrapped in panels
        return trans.fill_template( "page/index.mako", embedded_grid=grid, shared_by_others=shared_by_others )
开发者ID:AAFC-MBB,项目名称:galaxy-1,代码行数:31,代码来源:page.py


示例17: get_user_address

def get_user_address( user, short_desc ):
    """Return the user's newest non-deleted address with the given description."""
    query = gx_context().query( galaxy.model.UserAddress )
    address_table = galaxy.model.UserAddress.table
    matches = and_( address_table.c.user_id == user.id,
                    address_table.c.desc == short_desc,
                    address_table.c.deleted == false() )
    newest_first = desc( address_table.c.create_time )
    return query.filter( matches ).order_by( newest_first ).first()
开发者ID:galaxyguardians,项目名称:galaxy,代码行数:7,代码来源:test_db_util.py


示例18: determine_fetches

def determine_fetches(db_session, cred):
    """Refresh open threads, then list the (thread_id, page_num) pairs to fetch.

    The result is the union of three selections, ordered by thread id and
    page number:

    * pages with fewer than 40 posts that belong to threads not closed,
    * page numbers after the highest stored page up to the thread's page count,
    * page 1 of every thread that has no stored page 1.
    """
    open_threads = db_session.query(Thread).filter_by(closed=False)
    for open_thread in open_threads:
        update_thread_status(open_thread, cred)
    db_session.flush()

    # Ids of stored pages holding fewer than 40 posts.
    short_page_ids = (
        sa.select([ThreadPost.page_id])
        .group_by(ThreadPost.page_id)
        .having(sa.func.count(ThreadPost.id) < 40)
        .as_scalar()
    )
    is_short_and_open = sa.and_(ThreadPage.id.in_(short_page_ids), Thread.closed == sa.false())
    incomplete_pages = sa.select(
        [ThreadPage.thread_id, ThreadPage.page_num], from_obj=sa.join(ThreadPage, Thread)
    ).where(is_short_and_open)

    # Highest stored page number per thread.
    fetch_status = (
        sa.select(
            [ThreadPage.thread_id.label("thread_id"), sa.func.max(ThreadPage.page_num).label("last_fetched_page")]
        )
        .group_by(ThreadPage.thread_id)
        .alias("fetch_status")
    )
    missing_page_numbers = sa.func.generate_series(fetch_status.c.last_fetched_page + 1, Thread.page_count)
    unfetched_pages = sa.select(
        [
            Thread.id.label("thread_id"),
            missing_page_numbers.label("page_num"),
        ],
        from_obj=sa.join(Thread, fetch_status, Thread.id == fetch_status.c.thread_id),
    )

    # Threads for which no page 1 is stored yet.
    threads_with_first_page = sa.select([ThreadPage.thread_id]).where(ThreadPage.page_num == 1).as_scalar()
    unfetched_first_pages = sa.select(
        [Thread.id.label("thread_id"), sa.literal(1, sa.Integer).label("page_num")], from_obj=Thread
    ).where(Thread.id.notin_(threads_with_first_page))

    to_fetch = sa.union(incomplete_pages, unfetched_pages, unfetched_first_pages)
    to_fetch = to_fetch.order_by(to_fetch.c.thread_id.asc(), to_fetch.c.page_num.asc())
    return db_session.execute(to_fetch).fetchall()
开发者ID:inklesspen,项目名称:mimir,代码行数:34,代码来源:fetch.py


示例19: _get_revision_type_query

  def _get_revision_type_query(model, permission_type):
    """Build the filter limiting revisions to allowed related objects.

    A revision matches when its (type, id) pair for the resource, the source
    or the destination is among the resources allowed for `permission_type`.
    When nothing is allowed, a constant false expression is returned.
    """
    allowed_resources = permissions.all_resources(permission_type)
    if not allowed_resources:
      return sa.false()

    # (type column, id column) pairs checked against the allowed resources.
    column_pairs = (
        (model.resource_type, model.resource_id),
        (model.source_type, model.source_id),
        (model.destination_type, model.destination_id),
    )
    conditions = [
        sa.tuple_(type_column, id_column).in_(allowed_resources)
        for type_column, id_column in column_pairs
    ]
    return sa.or_(*conditions)
开发者ID:google,项目名称:ggrc-core,代码行数:33,代码来源:builder.py


示例20: get_categories

def get_categories(app):
    """Return all non-deleted categories, ordered by name."""
    category_table = app.model.Category.table
    query = app.model.context.current.query(app.model.Category)
    query = query.filter(category_table.c.deleted == false())
    query = query.order_by(category_table.c.name)
    return query.all()
开发者ID:ImmPortDB,项目名称:immport-galaxy,代码行数:7,代码来源:shed_util_common.py



注:本文中的sqlalchemy.false函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sqlalchemy.func函数代码示例发布时间:2022-05-27
下一篇:
Python sqlalchemy.extract函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap