• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sqlalchemy.and_函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.and_函数的典型用法代码示例。如果您正苦于以下问题:Python and_函数的具体用法?Python and_怎么用?Python and_使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了and_函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: book_menu

def book_menu(novel_id):
    """小说目录"""
    if request.method == 'GET':
        ybd_chapter = db_session.query(Chapter.id, Chapter.name).filter(and_(Chapter.novel_id == novel_id,
                                                                             Chapter.content_source == 1)).all()

        bqg_chapter = db_session.query(Chapter.id, Chapter.name).filter(and_(Chapter.novel_id == novel_id,
                                                                             Chapter.content_source == 2)).all()

        novel = db_session.query(Novel).filter(Novel.id == novel_id).first()

        return render_template('book_menu.html', ybd_chapter=ybd_chapter, novel=novel, novel_id=novel_id,
                               bqg_chapter=bqg_chapter)

    elif request.method == 'POST':
        data = request.get_json()
        novel_text_path = os.path.join(os.getcwd(), 'uploads/novel_txt')
        if data['update_from'] == 1:
            from backtask.getnovelfromybdu import get_single_novel_text
            novel_name, novel_download_url = db_session.query(Novel.name, Novel.chapter_source_ybd_url).\
                filter(Novel.id == novel_id).first()
            get_single_novel_text(novel_download_url.encode('utf-8'), novel_name, str(novel_id), novel_text_path)
            from backtask.getnovel import update_single_chapter_last_and_next, single_from_text_store_sql
            single_from_text_store_sql(novel_text_path, novel_id)
            update_single_chapter_last_and_next(novel_id, 1)
            return jsonify({"good": "1"})
        elif data['update_from'] == 2:
            from backtask.getnovel import update_single_chapter_infor, update_chapter_content
            url = db_session.query(Novel.chapter_source_bequge_url).filter(Novel.id == novel_id).first()[0]
            update_single_chapter_infor(novel_id, url)
            update_chapter_content(novel_id)
            return jsonify({'good': '1'})
开发者ID:yudahai,项目名称:xiaoshuo01,代码行数:32,代码来源:view.py


示例2: list_domain_ids_for_user

    def list_domain_ids_for_user(self, user_id, group_ids, hints,
                                 inherited=False):
        with sql.transaction() as session:
            query = session.query(RoleAssignment.target_id)
            filters = []

            if user_id:
                sql_constraints = sqlalchemy.and_(
                    RoleAssignment.actor_id == user_id,
                    RoleAssignment.inherited == inherited,
                    RoleAssignment.type == AssignmentType.USER_DOMAIN)
                filters.append(sql_constraints)

            if group_ids:
                sql_constraints = sqlalchemy.and_(
                    RoleAssignment.actor_id.in_(group_ids),
                    RoleAssignment.inherited == inherited,
                    RoleAssignment.type == AssignmentType.GROUP_DOMAIN)
                filters.append(sql_constraints)

            if not filters:
                return []

            query = query.filter(sqlalchemy.or_(*filters)).distinct()

            return [assignment.target_id for assignment in query.all()]
开发者ID:eros-lige,项目名称:UPCloud-keystone,代码行数:26,代码来源:sql.py


示例3: deattach_user

 def deattach_user(self, user_id):
   '''
     - The user is deattached from the budget.
   '''
   if BudgetUserTable.query.filter(and_(BudgetUserTable.user_id==user_id, BudgetUserTable.budget_id==self.id, BudgetUserTable.role==BudgetUserTable.roles['owner'])).count() == 1:
     raise LogicException('It tries to delete a single owner with id=%d of the budget id=%d. It is not allowed' % (user_id, self.id))
   BudgetUserTable.query.filter(and_(BudgetUserTable.budget_id==self.id, BudgetUserTable.user_id==user_id)).delete()
开发者ID:vlikin,项目名称:budget,代码行数:7,代码来源:budget.py


示例4: update

    def update(self, queue, claim_id, metadata, project=None):
        if project is None:
            project = ''

        cid = utils.cid_decode(claim_id)
        if cid is None:
            raise errors.ClaimDoesNotExist(claim_id, queue, project)

        age = utils.get_age(tables.Claims.c.created)
        with self.driver.trans() as trans:
            qid = utils.get_qid(self.driver, queue, project)

            update = tables.Claims.update().where(sa.and_(
                tables.Claims.c.ttl > age,
                tables.Claims.c.id == cid,
                tables.Claims.c.id == qid))

            update = update.values(ttl=metadata['ttl'])

            res = trans.execute(update)
            if res.rowcount != 1:
                raise errors.ClaimDoesNotExist(claim_id, queue, project)

            update = (tables.Messages.update().
                      values(ttl=metadata['ttl'] + metadata['grace']).
                      where(sa.and_(
                          tables.Messages.c.ttl < metadata['ttl'],
                          tables.Messages.c.cid == cid)))
            trans.execute(update)
开发者ID:openstacker,项目名称:zaqar,代码行数:29,代码来源:claims.py


示例5: _get_relate_filter

  def _get_relate_filter(cls, predicate, related_type):
    """Used for filtering by related_assignee.

    Returns:
        Boolean stating whether such an assignee exists.
    """
    # pylint: disable=invalid-name
    # The upper case variables are allowed here to shorthand the class names.
    Rel = relationship.Relationship
    RelAttr = relationship.RelationshipAttr
    Person = person.Person
    return db.session.query(Rel).join(RelAttr).join(
        Person,
        or_(and_(
            Rel.source_id == Person.id,
            Rel.source_type == Person.__name__
        ), and_(
            Rel.destination_id == Person.id,
            Rel.destination_type == Person.__name__
        ))
    ).filter(and_(
        RelAttr.attr_value.contains(related_type),
        RelAttr.attr_name == "AssigneeType",
        or_(and_(
            Rel.source_type == Person.__name__,
            Rel.destination_type == cls.__name__,
            Rel.destination_id == cls.id
        ), and_(
            Rel.destination_type == Person.__name__,
            Rel.source_type == cls.__name__,
            Rel.source_id == cls.id
        )),
        or_(predicate(Person.name), predicate(Person.email))
    )).exists()
开发者ID:VinnieJohns,项目名称:ggrc-core,代码行数:34,代码来源:assignable.py


示例6: find_edges

def find_edges(db_session, is_directed=None, names=None, edges=None, graph_id=None, limit=None, offset=None,
               order_by=desc(Node.updated_at)):
	query = db_session.query(Edge)

	if graph_id is not None:
		query = query.filter(Edge.graph_id == graph_id)

	if is_directed is not None:
		query = query.filter(Edge.is_directed == is_directed)

	names = [] if names is None else names
	edges = [] if edges is None else edges
	if len(names + edges) > 0:
		names_filter = [Edge.name.ilike(name) for name in names]
		edges_filter = [and_(Edge.head_node_name.ilike(u), Edge.tail_node_name.ilike(v)) for u, v in edges]
		edges_filter.extend([and_(Edge.tail_node_name.ilike(u), Edge.head_node_name.ilike(v)) for u, v in edges])
		edges_filter.extend([and_(Edge.head_node_label.ilike(u), Edge.tail_node_label.ilike(v)) for u, v in edges])
		edges_filter.extend([and_(Edge.tail_node_label.ilike(u), Edge.head_node_label.ilike(v)) for u, v in edges])
		query = query.filter(or_(*(edges_filter + names_filter)))

	total = query.count()

	if order_by is not None:
		query = query.order_by(order_by)

	if offset is not None and limit is not None:
		query = query.limit(limit).offset(offset)

	return total, query.all()
开发者ID:jlaw9,项目名称:GraphSpace,代码行数:29,代码来源:dal.py


示例7: findbyhost

def findbyhost(session, host_id, created_start=None,
              created_end=None, created_user_id=None, desc=False):

    query = session.query(Machine).add_entity(Machine2Jobgroup).join(Machine2Jobgroup)

    query = query.filter(
                or_(
                    and_(Machine.parent_id == host_id, Machine.attribute == MACHINE_ATTRIBUTE['GUEST']),
                    and_(Machine.id == host_id, Machine.attribute == MACHINE_ATTRIBUTE['HOST'])
                )
            )

    #if created_user_id:
    if not created_user_id is None:
        query = query.filter(Machine2Jobgroup.created_user_id.in_(created_user_id))

    if created_start and created_end:
        query = query.filter(Machine2Jobgroup.created.between(created_start, created_end))
        
    elif created_start and (created_end is None):
        query = query.filter(created_start <= Machine2Jobgroup.created)
        
    elif (not created_start) and created_end:
        query = query.filter(Machine2Jobgroup.created <= created_end)
        
    if desc is True:
        return query.order_by(Machine2Jobgroup.id.desc()).all()
    else:
        return query.order_by(Machine2Jobgroup.id.asc()).all()
开发者ID:goura,项目名称:karesansui,代码行数:29,代码来源:machine_machine2jobgroup.py


示例8: _network_find

def _network_find(context, limit, sorts, marker, page_reverse, fields,
                  defaults=None, provider_query=False, **filters):
    query = context.session.query(models.Network)
    model_filters = _model_query(context, models.Network, filters, query)

    if defaults:
        invert_defaults = False
        if INVERT_DEFAULTS in defaults:
            invert_defaults = True
            defaults.pop(0)
        if filters and invert_defaults:
            query = query.filter(and_(not_(models.Network.id.in_(defaults)),
                                      and_(*model_filters)))
        elif not provider_query and filters and not invert_defaults:
            query = query.filter(or_(models.Network.id.in_(defaults),
                                     and_(*model_filters)))

        elif not invert_defaults:
            query = query.filter(models.Network.id.in_(defaults))
    else:
        query = query.filter(*model_filters)

    if "join_subnets" in filters:
        query = query.options(orm.joinedload(models.Network.subnets))

    return paginate_query(query, models.Network, limit, sorts, marker)
开发者ID:quadewarren,项目名称:quark,代码行数:26,代码来源:api.py


示例9: _subnet_find

def _subnet_find(context, limit, sorts, marker, page_reverse, fields,
                 defaults=None, provider_query=False, **filters):
    query = context.session.query(models.Subnet)
    model_filters = _model_query(context, models.Subnet, filters, query)

    if defaults:
        invert_defaults = False
        if INVERT_DEFAULTS in defaults:
            invert_defaults = True
            defaults.pop(0)
        if filters and invert_defaults:
            query = query.filter(and_(not_(models.Subnet.id.in_(defaults)),
                                      and_(*model_filters)))
        elif not provider_query and filters and not invert_defaults:
            query = query.filter(or_(models.Subnet.id.in_(defaults),
                                     and_(*model_filters)))

        elif not invert_defaults:
            query = query.filter(models.Subnet.id.in_(defaults))
    else:
        query = query.filter(*model_filters)

    if "join_dns" in filters:
        query = query.options(orm.joinedload(models.Subnet.dns_nameservers))

    if "join_routes" in filters:
        query = query.options(orm.joinedload(models.Subnet.routes))

    if "join_pool" in filters:
        query = query.options(orm.undefer('_allocation_pool_cache'))

    return paginate_query(query, models.Subnet, limit, sorts, marker)
开发者ID:quadewarren,项目名称:quark,代码行数:32,代码来源:api.py


示例10: GetJson_ACPeriodCate

def GetJson_ACPeriodCate(userID, modeDate, startDate, endDate):
    """返回Json:门禁趋势与分布
    :param userID: 查询工号
    :param modeDate: 日期模式,合并到最短时间单位. 0-day, 1-week, 2-month, 3-Quarter. (default 2)
    :param startDate: 限定来源数据起始日期
    :param endDate: 限定来源数据结束日期
    """
    # Query.
    strQuery = db.session.query(acrec.ac_datetime, ac_loc.category).filter(
        and_(acrec.user_id == userID, acrec.node_id == ac_loc.node_id)).order_by(acrec.ac_datetime)
    if len(startDate) != 0:
        strQuery = strQuery.filter(and_(acrec.ac_datetime >= startDate, acrec.ac_datetime <= endDate))
    results = strQuery.all()
    if len(results) == 0:
        return {'errMsg': u'没有找到记录。'}

    res_datetimes = [result.ac_datetime for result in results]
    res_categorys = [result.category for result in results]

    from ACPeriodCate import ACPeriodCate
    process = ACPeriodCate(res_datetimes, res_categorys)
    json_dateTrend = process.get_date_trend(modeDate)
    json_timeDistribution = process.get_time_distribution()

    json_response = {'json_dateTrend':json_dateTrend, 'json_timeDistribution':json_timeDistribution}

    return json_response
开发者ID:Fleeting198,项目名称:WitCampus,代码行数:27,代码来源:GetJson_ACPeriodCate.py


示例11: _get_filter

    def _get_filter(self, tag, user_id, include_draft, conn):
        filters = []
        if tag:
            tag = tag.upper()
            tag_statement = sqla.select([self._tag_table.c.id]).where(
                self._tag_table.c.text == tag)
            tag_result = conn.execute(tag_statement).fetchone()
            if tag_result is not None:
                tag_id = tag_result[0]
                tag_filter = sqla.and_(
                    self._tag_posts_table.c.tag_id == tag_id,
                    self._post_table.c.id == self._tag_posts_table.c.post_id
                )
                filters.append(tag_filter)

        if user_id:
            user_filter = sqla.and_(
                self._user_posts_table.c.user_id == user_id,
                self._post_table.c.id == self._user_posts_table.c.post_id
            )
            filters.append(user_filter)

        draft_filter = self._post_table.c.draft == 1 if include_draft else \
            self._post_table.c.draft == 0
        filters.append(draft_filter)
        sql_filter = sqla.and_(*filters)
        return sql_filter
开发者ID:HsiaoFin,项目名称:Flask-Blogging,代码行数:27,代码来源:sqlastorage.py


示例12: load_root

    def load_root(cls, sess, name='root', use_cache=True):
        """
        Loads root resource of resource tree.

        Since we allow several trees in the same table, argument ``name`` tells
        which one we want.

        :param sess: A DB session
        :param name: Name of the wanted root node
        :return: Instance of the root node or, None if not found
        """
        # CAVEAT: Setup fails if we use cache here!
        if use_cache:
            r = sess.query(
                cls
            ).options(
                pym.cache.FromCache("auth_long_term",
                cache_key='resource:{}:None'.format(name))
            ).options(
                pym.cache.RelationshipCache(cls.children, "auth_long_term",
                cache_key='resource:{}:None:children'.format(name))
            ).options(
                # CAVEAT: Program hangs if we use our own cache key here!
                pym.cache.RelationshipCache(cls.acl, "auth_long_term")  # ,
                #cache_key='resource:{}:None:acl'.format(name))
            ).filter(
                sa.and_(cls.parent_id == None, cls.name == name)
            ).one()
        else:
            r = sess.query(
                cls
            ).filter(
                sa.and_(cls.parent_id == None, cls.name == name)
            ).one()
        return r
开发者ID:mmarescc,项目名称:Parenchym,代码行数:35,代码来源:models.py


示例13: perform_create

def perform_create():
    details = json.loads(request.form['details'])
    item_result = list()
    isOk = True
    store = list()
    for detail in details:
        item = Item.query.filter_by(number=detail['number']).first()
        if item:
            place_id = session['place_id']
            for c in detail['columns']:
                have = Storage.query.filter(and_(Storage.item_id==item.id,Storage.place_id==place_id, Storage.size==c['size'])).first().amount
                rest = have - int(c['amount'])
                if rest<0:
                    isOk=False
                    flash('编号:%s 尺寸:%s的货物库存不足,剩余:%s,需要:%s' %\
                          (detail['number'], c['size'], have, c['amount']))
                store.append(dict(item_id=item.id, amount=rest, size=c['size']))
        else:
            isOk = False
            flash('不存在编号%s的货物' % detail['number'], category='error')
        #return json.dumps(store)
    if isOk:
        for s in store:
            change = Storage.query.filter(and_(Storage.item_id==s['item_id'],\
                                               Storage.size==s['size'], \
                                               Storage.place_id==session['place_id'])).first()
            change.amount=s['amount']
            db.session.add(change)
        db.session.commit()
        flash('出货成功', 'normal')
    return redirect(url_for('items.list_all'))
开发者ID:SR1s,项目名称:WMS,代码行数:31,代码来源:items.py


示例14: get_cases

 def get_cases(status, current_user, user=False, QA=False, current_user_perms=False, case_perm_checker=None,
               case_man=False):
     q = session.query(Case)
     if status != 'All' and status != "Queued":
         q = q.filter_by(currentStatus=status)
     elif status == "Queued":
         q = q.filter_by(currentStatus=CaseStatus.OPEN).join('tasks').filter(Task.currentStatus == TaskStatus.QUEUED)
     if user is True:
         q = q.join('tasks').join(Task.task_roles)
         if QA:
             q = q.filter(and_(UserTaskRoles.user_id == current_user.id, UserTaskRoles.role.in_(UserTaskRoles.qa_roles)))
         else:
             q = q.filter(and_(UserTaskRoles.user_id == current_user.id, UserTaskRoles.role.in_(UserTaskRoles.inv_roles)))
         return q.order_by(desc(Case.creation_date)).all()
     else:
         cases = q.order_by(desc(Case.creation_date)).all()
         output = []
         for case in cases:
             if (case_man is True and case.principle_case_manager is None and case.secondary_case_manager is None) \
                     or case_man is False:
                 try:
                     case_perm_checker(current_user, case, "view")
                     output.append(case)
                 except Forbidden:
                     pass
         return output
开发者ID:pappacurds,项目名称:LCDI_SEATING_CHART,代码行数:26,代码来源:caseModel.py


示例15: list_tokens

def list_tokens(typ=None):
    """Get a list of all unlimited-duration tokens the user has permisison to
    see.

    With ``?typ=..``, limit to tokens of that type.

    Note that the response does not include the actual token strings.
    Such strings are only revealed when creating a new token."""
    tbl = tables.Token
    email = get_user_email()

    conds = []
    if p.base.tokens.prm.view.can():
        conds.append(tbl.typ == 'prm')
    if p.base.tokens.usr.view.all.can():
        conds.append(tbl.typ == 'usr')
    elif email and p.base.tokens.usr.view.my.can():
        conds.append(sa.and_(tbl.typ == 'usr',
                             tbl.user == email))
    if not conds:
        return []
    disjunction = sa.or_(*conds)
    if typ:
        filter_cond = sa.and_(disjunction, tbl.typ == typ)
    else:
        filter_cond = disjunction

    q = tables.Token.query.filter(filter_cond)
    return [t.to_jsontoken() for t in q.all()]
开发者ID:Callek,项目名称:build-relengapi,代码行数:29,代码来源:__init__.py


示例16: validate

    def validate(cls, username, password):
        # Lookup the user
        user = cls.get_by(username=username)
        if user:
            salt = user.password.split(':')[0]
            pbk = cls.__hash_password__(password, salt)

            # If PBKDF2 matches...
            match = cls.query.filter(and_(
                cls.username == username,
                cls.password == pbk
            )).first()
            if match is not None:
                return match

            # Otherwise the user might have a sha256 password
            salt = getattr(
                getattr(conf, 'session', None),
                'password_salt',
                'example'
            )
            sha = sha256(password + salt).hexdigest()

            # If sha256 matches...
            match = cls.query.filter(and_(
                cls.username == username,
                cls.password == sha
            )).first()
            if match is not None:
                # Overwrite to use PBKDF2 in the future
                user.password = password
                return match
开发者ID:ryanpetrello,项目名称:draughtcraft,代码行数:32,代码来源:users.py


示例17: get_graphs_by_edges_and_nodes_and_names

def get_graphs_by_edges_and_nodes_and_names(db_session, group_ids=None, names=None, nodes=None, edges=None, tags=None,
                                            order=desc(Graph.updated_at), page=0, page_size=10, partial_matching=False,
                                            owner_email=None, is_public=None):
	query = db_session.query(Graph)

	edges = [] if edges is None else edges
	nodes = [] if nodes is None else nodes
	names = [] if names is None else names
	tags = [] if tags is None else tags

	edges = [('%%%s%%' % u, '%%%s%%' % v) for u, v in edges] if partial_matching else edges
	nodes = ['%%%s%%' % node for node in nodes] if partial_matching else nodes
	names = ['%%%s%%' % name for name in names] if partial_matching else names
	tags = ['%%%s%%' % tag for tag in tags]

	graph_filter_group = []
	if is_public is not None:
		graph_filter_group.append(Graph.is_public == is_public)
	if owner_email is not None:
		graph_filter_group.append(Graph.owner_email == owner_email)
	if group_ids is not None:
		query = query.filter(Graph.shared_with_groups.any(Group.id.in_(group_ids)))
	if len(graph_filter_group) > 0:
		query = query.filter(*graph_filter_group)

	names_filter_group = [Graph.name.ilike(name) for name in names]
	tags_filter_group = [GraphTag.name.ilike(tag) for tag in tags]
	nodes_filter_group = [Node.label.ilike(node) for node in nodes]
	nodes_filter_group.extend([Node.name.ilike(node) for node in nodes])
	edges_filter_group = [and_(Edge.head_node.has(Node.name.ilike(u)), Edge.tail_node.has(Node.name.ilike(v))) for u, v
	                      in edges]
	edges_filter_group.extend(
		[and_(Edge.tail_node.has(Node.name.ilike(u)), Edge.head_node.has(Node.name.ilike(v))) for u, v in edges])
	edges_filter_group.extend(
		[and_(Edge.head_node.has(Node.label.ilike(u)), Edge.tail_node.has(Node.label.ilike(v))) for u, v in edges])
	edges_filter_group.extend(
		[and_(Edge.tail_node.has(Node.label.ilike(u)), Edge.head_node.has(Node.label.ilike(v))) for u, v in edges])

	options_group = []
	if len(nodes_filter_group) > 0:
		options_group.append(joinedload('nodes'))
	if len(edges_filter_group) > 0:
		options_group.append(joinedload('edges'))
	if len(options_group) > 0:
		query = query.options(*options_group)

	combined_filter_group = []
	if len(nodes_filter_group) > 0:
		combined_filter_group.append(Graph.nodes.any(or_(*nodes_filter_group)))
	if len(edges_filter_group) > 0:
		combined_filter_group.append(Graph.edges.any(or_(*edges_filter_group)))
	if len(names_filter_group) > 0:
		combined_filter_group.append(*names_filter_group)
	if len(tags_filter_group) > 0:
		combined_filter_group.append(*tags_filter_group)

	if len(combined_filter_group) > 0:
		query = query.filter(or_(*combined_filter_group))

	return query.order_by(order).limit(page_size).offset(page * page_size).all()
开发者ID:jlaw9,项目名称:GraphSpace,代码行数:60,代码来源:dal.py


示例18: _get_criterion

def _get_criterion(resource_id, member_id=None, is_owner=True):
    """Generates criterion for querying resource_member_v2 table."""

    # Resource owner query resource membership with member_id.
    if is_owner and member_id:
        return sa.and_(
            models.ResourceMember.project_id == security.get_project_id(),
            models.ResourceMember.resource_id == resource_id,
            models.ResourceMember.member_id == member_id
        )
    # Resource owner query resource memberships.
    elif is_owner and not member_id:
        return sa.and_(
            models.ResourceMember.project_id == security.get_project_id(),
            models.ResourceMember.resource_id == resource_id,
        )

    # Other members query other resource membership.
    elif not is_owner and member_id and member_id != security.get_project_id():
        return None

    # Resource member query resource memberships.
    return sa.and_(
        models.ResourceMember.member_id == security.get_project_id(),
        models.ResourceMember.resource_id == resource_id
    )
开发者ID:anilyadav,项目名称:mistral,代码行数:26,代码来源:api.py


示例19: get_edges

def get_edges(db_session, edges, order=desc(Edge.updated_at), page=0, page_size=10, partial_matching=False):
	edges = [('%%%s%%' % u, '%%%s%%' % v) for u, v in edges] if partial_matching else edges
	filter_group = [and_(Edge.head_node_id.ilike(u), Edge.tail_node_id.ilike(v)) for u, v in edges]
	filter_group.append(
		[and_(Edge.head_node.has(Node.label.ilike(u)), Edge.tail_node.has(Node.label.ilike(v))) for u, v in edges])
	return db_session.query(Graph).options(joinedload('head_node'), joinedload('tail_node')).filter(
		or_(*filter_group)).order_by(order).limit(page_size).offset(page * page_size)
开发者ID:jlaw9,项目名称:GraphSpace,代码行数:7,代码来源:dal.py


示例20: generate_plots

def generate_plots(session, result_dir, output_dir):
    ratios = read_ratios(result_dir)

    iteration = session.query(func.max(cm2db.RowMember.iteration))
    clusters = [r[0] for r in session.query(cm2db.RowMember.cluster).distinct().filter(
        cm2db.RowMember.iteration == iteration)]

    figure = plt.figure(figsize=(6,3))
    for cluster in clusters:
        plt.clf()
        plt.cla()
        genes = [r.row_name.name for r in session.query(cm2db.RowMember).filter(
            and_(cm2db.RowMember.cluster == cluster, cm2db.RowMember.iteration == iteration))]
        cluster_conds = [c.column_name.name for c in session.query(cm2db.ColumnMember).filter(
            and_(cm2db.ColumnMember.cluster == cluster, cm2db.ColumnMember.iteration == iteration))]
        all_conds = [c[0] for c in session.query(cm2db.ColumnName.name).distinct()]
        non_cluster_conds = [cond for cond in all_conds if not cond in set(cluster_conds)]

        cluster_data = ratios.loc[genes, cluster_conds]
        non_cluster_data = ratios.loc[genes, non_cluster_conds]
        min_value = ratios.min()
        max_value = ratios.max()
        for gene in genes:
            values = [normalize_js(val) for val in cluster_data.loc[gene,:].values]
            values += [normalize_js(val) for val in non_cluster_data.loc[gene,:].values]
            plt.plot(values)

        # plot the "in"/"out" separator line
        cut_line = len(cluster_conds)
        plt.plot([cut_line, cut_line], [min_value, max_value], color='red',
                 linestyle='--', linewidth=1)
        plt.savefig(os.path.join(output_dir, "exp-%d" % cluster))
    plt.close(figure)
开发者ID:baliga-lab,项目名称:cmonkey2,代码行数:33,代码来源:plot_expressions.py



注:本文中的sqlalchemy.and_函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sqlalchemy.asc函数代码示例发布时间:2022-05-27
下一篇:
Python sqlalchemy.alias函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap