• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python func.array_agg函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.func.array_agg函数的典型用法代码示例。如果您正苦于以下问题:Python array_agg函数的具体用法?Python array_agg怎么用?Python array_agg使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了array_agg函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: query_person_contacts

def query_person_contacts():
    return (
        DBSession.query(
            Person.id.label('person_id'),
            func.array_to_string(
                func.array_agg(
                    case([(Contact.contact_type == 'phone', Contact.contact)])
                ),
                ', '
            ).label('phone'),
            func.array_to_string(
                func.array_agg(
                    case([(Contact.contact_type == 'email', Contact.contact)])
                ),
                ', '
            ).label('email'),
            func.array_to_string(
                func.array_agg(
                    case([(Contact.contact_type == 'skype', Contact.contact)])
                ),
                ', '
            ).label('skype'),
        )
        .join(Contact, Person.contacts)
        .group_by(Person.id)
    )
开发者ID:alishir,项目名称:tcr,代码行数:26,代码来源:persons.py


示例2: query_experiment_sets

def query_experiment_sets():
    """This queries all the replicates of an experiment and groups them for further processing"""
    experiment_sets = {}

    ome = base.Session()

    RNASeqExperiment = datasets.RNASeqExperiment
    ArrayExperiment = datasets.ArrayExperiment
    ChIPExperiment = datasets.ChIPExperiment

    experiment_sets['RNAseq'] = ome.query(func.array_agg(RNASeqExperiment.name),RNASeqExperiment.group_name).\
                                          group_by(RNASeqExperiment.group_name, RNASeqExperiment.strain_id,
                                                   RNASeqExperiment.environment_id, RNASeqExperiment.machine_id,
                                                   RNASeqExperiment.sequencing_type).all()

    experiment_sets['array'] = ome.query(func.array_agg(ArrayExperiment.name)).\
                                         group_by(ArrayExperiment.strain_id, ArrayExperiment.environment_id,\
                                         ArrayExperiment.platform).all()

    experiment_sets['ChIP'] = ome.query(func.array_agg(ChIPExperiment.name)).\
                                        group_by(ChIPExperiment.strain_id, ChIPExperiment.environment_id,\
                                        ChIPExperiment.antibody, ChIPExperiment.protocol_type,\
                                        ChIPExperiment.target).all()
    ome.close()
    return experiment_sets
开发者ID:coltonlloyd,项目名称:ome,代码行数:25,代码来源:dataset_loading.py


示例3: _get_permissions_query

    def _get_permissions_query(self, session, identifier):
        """
        select domain, json_agg(parts) as permissions from
            (select domain, row_to_json(r) as parts from
                    (select domain, action, array_agg(distinct target) as target from
                        (select (case when domain is null then '*' else domain end) as domain,
                                (case when target is null then '*' else target end) as target,
                                array_agg(distinct (case when action is null then '*' else action end)) as action
                           from permission
                          group by domain, target
                         ) x
                      group by domain, action)
              r) parts
        group by domain;
        """
        thedomain = case([(Domain.name == None, "*")], else_=Domain.name)
        theaction = case([(Action.name == None, "*")], else_=Action.name)
        theresource = case([(Resource.name == None, "*")], else_=Resource.name)

        action_agg = func.array_agg(theaction.distinct())

        stmt1 = (
            session.query(
                Permission.domain_id,
                thedomain.label("domain"),
                Permission.resource_id,
                theresource.label("resource"),
                action_agg.label("action"),
            )
            .select_from(User)
            .join(role_membership_table, User.pk_id == role_membership_table.c.user_id)
            .join(role_permission_table, role_membership_table.c.role_id == role_permission_table.c.role_id)
            .join(Permission, role_permission_table.c.permission_id == Permission.pk_id)
            .outerjoin(Domain, Permission.domain_id == Domain.pk_id)
            .outerjoin(Action, Permission.action_id == Action.pk_id)
            .outerjoin(Resource, Permission.resource_id == Resource.pk_id)
            .filter(User.identifier == identifier)
            .group_by(Permission.domain_id, Domain.name, Permission.resource_id, Resource.name)
        ).subquery()

        stmt2 = (
            session.query(stmt1.c.domain, stmt1.c.action, func.array_agg(stmt1.c.resource.distinct()).label("resource"))
            .select_from(stmt1)
            .group_by(stmt1.c.domain, stmt1.c.action)
        ).subquery()

        stmt3 = (
            session.query(stmt2.c.domain, func.row_to_json(as_row(stmt2)).label("parts")).select_from(stmt2)
        ).subquery()

        final = (
            session.query(stmt3.c.domain, cast(func.json_agg(stmt3.c.parts), Text))
            .select_from(stmt3)
            .group_by(stmt3.c.domain)
        )

        return final
开发者ID:YosaiProject,项目名称:yosai_alchemystore,代码行数:57,代码来源:accountstore.py


示例4: percentage_by_district

    def percentage_by_district(self):
        """ Returns the percentage of votes aggregated by the distict. Includes
        uncounted districts and districts with no results available.

        """

        results = self.election.results
        results = results.join(ElectionResult.candidate_results)
        results = results.filter(CandidateResult.candidate_id == self.id)
        results = results.with_entities(
            ElectionResult.district.label('name'),
            func.array_agg(ElectionResult.entity_id).label('entities'),
            func.coalesce(
                func.bool_and(ElectionResult.counted), False
            ).label('counted'),
            func.sum(ElectionResult.accounted_ballots).label('total'),
            func.sum(CandidateResult.votes).label('votes'),
        )
        results = results.group_by(ElectionResult.district)
        results = results.order_by(None)
        results = results.all()
        percentage = {
            r.name: {
                'counted': r.counted,
                'entities': r.entities,
                'percentage': 100 * (r.votes / r.total) if r.total else 0.0
            } for r in results
        }

        empty = self.election.results
        empty = empty.with_entities(
            ElectionResult.district.label('name'),
            func.array_agg(ElectionResult.entity_id).label('entities'),
            func.coalesce(
                func.bool_and(ElectionResult.counted), False
            ).label('counted')
        )
        empty = empty.group_by(ElectionResult.district)
        empty = empty.order_by(None)
        for result in empty:
            update = (
                result.name not in percentage
                or (
                    set(percentage[result.name]['entities'])
                    != set(result.entities)
                )
            )
            if update:
                percentage[result.name] = {
                    'counted': result.counted,
                    'entities': result.entities,
                    'percentage': 0.0
                }

        return percentage
开发者ID:OneGov,项目名称:onegov.ballot,代码行数:55,代码来源:candidate.py


示例5: _get_nonldap_sources

def _get_nonldap_sources(session):
    sources = session.query(
        CtiDirectories.name,
        CtiDirectories.uri,
        Directories.dirtype,
        Directories.xivo_username,
        Directories.xivo_password,
        Directories.xivo_verify_certificate,
        Directories.xivo_custom_ca_path,
        CtiDirectories.delimiter,
        CtiDirectories.match_direct,
        CtiDirectories.match_reverse,
        func.array_agg(CtiDirectoryFields.fieldname).label('fields'),
        func.array_agg(CtiDirectoryFields.value).label('values'),
    ).join(
        Directories,
        Directories.uri == CtiDirectories.uri
    ).outerjoin(
        CtiDirectoryFields,
        CtiDirectoryFields.dir_id == CtiDirectories.id
    ).group_by(
        CtiDirectories.name,
        CtiDirectories.uri,
        Directories.dirtype,
        Directories.xivo_username,
        Directories.xivo_password,
        Directories.xivo_verify_certificate,
        Directories.xivo_custom_ca_path,
        CtiDirectories.delimiter,
        CtiDirectories.match_direct,
        CtiDirectories.match_reverse,
    )

    source_configs = []
    for source in sources.all():
        source_config = {
            'name': source.name,
            'type': source.dirtype,
            'uri': source.uri,
            'delimiter': source.delimiter,
            'searched_columns': json.loads(source.match_direct or '[]'),
            'first_matched_columns': json.loads(source.match_reverse or '[]'),
            'format_columns': _format_columns(source.fields, source.values),
        }
        if source.dirtype == 'xivo':
            source_config['xivo_username'] = source.xivo_username
            source_config['xivo_password'] = source.xivo_password
            source_config['xivo_verify_certificate'] = source.xivo_verify_certificate
            source_config['xivo_custom_ca_path'] = source.xivo_custom_ca_path
        source_configs.append(source_config)

    return source_configs
开发者ID:alafarcinade,项目名称:xivo-dao,代码行数:52,代码来源:directory_dao.py


示例6: start_requests

 def start_requests(self):
     summary_utc = datetime.utcnow() - timedelta(days=1)
     db_engine = create_engine(self.settings.get('SQLALCHEMY_DATABASE_URI'))
     db_session = sessionmaker(bind=db_engine)()
     db_query = db_session.query(LiveTVSite.id.label('site_id'), LiveTVRoom.id.label('room_id'),
                                 LiveTVRoom.url.label('room_url'),
                                 LiveTVRoomPresent.crawl_date_format.label('summary_date'),
                                 func.array_agg(LiveTVRoomPresent.online).label('online_list'))\
         .join(LiveTVSite, LiveTVRoom, LiveTVRoomPresent)\
         .filter(LiveTVRoomPresent.crawl_date_format == summary_utc.strftime(DAILY_DATE_FORMAT))\
         .group_by(LiveTVSite.id, LiveTVRoom.id, LiveTVRoom.url, LiveTVRoomPresent.crawl_date_format)
     for group_row in db_query:
         meta_info = {
             'site_id': group_row.site_id,
             'room_id': group_row.room_id,
             'summary_date': group_row.summary_date,
             'online': numpy.median(group_row.online_list)
         }
         room = self.session.query(LiveTVRoom).filter_by(id=meta_info['room_id']).one_or_none()
         if room:
             yield DailyItem(site_id=group_row.site_id, room_id=group_row.room_id,
                             summary_date=group_row.summary_date, online=numpy.median(group_row.online_list),
                             followers=room.followers, description=room.description, announcement=room.announcement,
                             fallback=False)
     db_session.close()
开发者ID:taogeT,项目名称:livetv_mining,代码行数:25,代码来源:quanmin.py


示例7: results_by_district

    def results_by_district(self):
        """ Returns the results aggregated by the distict.  """

        counted = func.coalesce(func.bool_and(BallotResult.counted), False)
        yeas = func.sum(BallotResult.yeas)
        nays = func.sum(BallotResult.nays)
        yeas_percentage = 100 * yeas / (
            cast(func.coalesce(func.nullif(yeas + nays, 0), 1), Float)
        )
        nays_percentage = 100 - yeas_percentage
        accepted = case({True: yeas > nays}, counted)
        results = self.results.with_entities(
            BallotResult.district.label('name'),
            counted.label('counted'),
            accepted.label('accepted'),
            yeas.label('yeas'),
            nays.label('nays'),
            yeas_percentage.label('yeas_percentage'),
            nays_percentage.label('nays_percentage'),
            func.sum(BallotResult.empty).label('empty'),
            func.sum(BallotResult.invalid).label('invalid'),
            func.sum(BallotResult.eligible_voters).label('eligible_voters'),
            func.array_agg(BallotResult.entity_id).label('entity_ids')
        )
        results = results.group_by(BallotResult.district)
        results = results.order_by(None).order_by(BallotResult.district)
        return results
开发者ID:OneGov,项目名称:onegov.ballot,代码行数:27,代码来源:ballot.py


示例8: get_rates_by_dates_for_currency_in_period

    def get_rates_by_dates_for_currency_in_period(self, currency, start_date, end_date):
        """
        :type currency: str
        :type start_date: datetime.date
        :type end_date: datetime.date
        :rtype: dict[datetime.date, list[decimal.Decimal]]
        """
        result = self.db_session\
            .query(
                ExchangeRate.date,
                func.array_agg(aggregate_order_by(ExchangeRate.rate, ExchangeRate.provider_id.asc()))
            )\
            .filter(
                and_(
                    ExchangeRate.date >= start_date,
                    ExchangeRate.date <= end_date,
                    ExchangeRate.currency == currency,
                    ExchangeRate.rate.isnot(None)
                )
            )\
            .group_by(ExchangeRate.date)\
            .order_by(ExchangeRate.date)\
            .all()

        return {r[0]: list(r[1]) for r in result}
开发者ID:business-factory,项目名称:gold-digger,代码行数:25,代码来源:dao_exchange_rate.py


示例9: get_published_briefs

    def get_published_briefs(self):
        subquery = (
            db
            .session
            .query(
                BriefUser.brief_id,
                func.array_agg(func.substring(User.email_address, '@(.*)')).label('domain')
            )
            .join(User)
            .group_by(BriefUser.brief_id)
            .subquery()
        )
        result = (
            db
            .session
            .query(
                Brief.id,
                Brief.data['organisation'].astext.label('organisation'),
                Brief.published_at,
                Brief.withdrawn_at,
                Brief.data['title'].astext.label('title'),
                Brief.data['sellerSelector'].astext.label('openTo'),
                Brief.data['areaOfExpertise'].astext.label('brief_category'),
                Lot.name.label('brief_type'),
                subquery.columns.domain[1].label('publisher_domain')
            )
            .join(subquery, Brief.id == subquery.columns.brief_id)
            .join(Lot)
            .filter(Brief.published_at.isnot(None))
            .order_by(Brief.id)
            .all()
        )

        return [r._asdict() for r in result]
开发者ID:AusDTO,项目名称:dto-digitalmarketplace-api,代码行数:34,代码来源:briefs.py


示例10: build_query_to_populate

 def build_query_to_populate(self, query, full_table, aggregate_table):
     insert_columns = [aggregate_table.c.upload_samples]
     mean = full_table.c.upload_octets / full_table.c.upload_time
     is_safe = and_(full_table.c.upload_time > 0, full_table.c.download_flag == 'f')
     safe_mean = case([(is_safe, mean)], else_ = None)
     select_query = (query.column(func.array_agg(safe_mean)))
     return insert_columns, select_query
开发者ID:Supermighty,项目名称:speeduplouisville,代码行数:7,代码来源:aggregate.py


示例11: upgrade

def upgrade():
    if not is_sqlite:
        connection = op.get_bind()
        attendees = connection.execute(select([
            attendee_table.c.hotel_pin,
            func.count(attendee_table.c.id),
            func.array_agg(attendee_table.c.id),
        ]).where(and_(
            attendee_table.c.hotel_pin != None,
            attendee_table.c.hotel_pin != '',
        )).group_by(
            attendee_table.c.hotel_pin,
        ).having(
            func.count(attendee_table.c.id) > 1,
        ))
        for hotel_pin, count, ids in attendees:
            hotel_pin_template = '{{:0{}d}}{{}}'.format(len(str(count))) if count > 9 else '{}{}'

            for i, id in enumerate(ids):
                new_hotel_pin = hotel_pin_template.format(i, hotel_pin)
                connection.execute(
                    attendee_table.update().where(attendee_table.c.id == id).values({
                        'hotel_pin': new_hotel_pin,
                    })
                )

    op.create_unique_constraint(op.f('uq_attendee_hotel_pin'), 'attendee', ['hotel_pin'])
开发者ID:magfest,项目名称:ubersystem,代码行数:27,代码来源:116e5aad3a5c_adds_attendee_hotel_pin_unique_.py


示例12: set_available_langs

def set_available_langs(documents, loaded=False):
    """Load and set the available langs for the given documents.
    """
    if len(documents) == 0:
        return

    if loaded:
        # all locales are already loaded, so simply set the attribute
        for document in documents:
            document.available_langs = [
                locale.lang for locale in document.locales]
    else:
        document_ids = [doc.document_id for doc in documents]
        documents_for_id = {doc.document_id: doc for doc in documents}

        # aggregate the langs per document into an array
        lang_agg = func.array_agg(
            DocumentLocale.lang,
            type_=postgresql.ARRAY(String)).label('langs')

        langs_per_doc = DBSession.query(
            DocumentLocale.document_id, lang_agg). \
            filter(DocumentLocale.document_id.in_(document_ids)). \
            group_by(DocumentLocale.document_id). \
            all()

        for document_id, langs in langs_per_doc:
            document = documents_for_id.get(document_id)
            document.available_langs = langs
开发者ID:arnaud-morvan,项目名称:v6_api,代码行数:29,代码来源:document.py


示例13: blacklist_filter

 def blacklist_filter(self):
     db = object_session(self)
     if db.query(BlacklistedTag).filter(BlacklistedTag.user_id == self.id).first() is None:
         return None
     return ~Request.tag_ids.overlap(
         db.query(func.array_agg(BlacklistedTag.tag_id))
         .filter(BlacklistedTag.user_id == self.id)
     )
开发者ID:tehdragonfly,项目名称:cherubplay,代码行数:8,代码来源:__init__.py


示例14: _get_ldap_sources

def _get_ldap_sources(session):
    ldap_cti_directories = session.query(
        CtiDirectories.name,
        CtiDirectories.uri,
        CtiDirectories.match_direct,
        CtiDirectories.match_reverse,
        func.array_agg(CtiDirectoryFields.fieldname).label('fields'),
        func.array_agg(CtiDirectoryFields.value).label('values'),
    ).outerjoin(
        CtiDirectoryFields,
        CtiDirectoryFields.dir_id == CtiDirectories.id
    ).filter(
        CtiDirectories.uri.like('ldapfilter://%%')
    ).group_by(
        CtiDirectories.name,
        CtiDirectories.uri,
        CtiDirectories.match_direct,
        CtiDirectories.match_reverse,
    )

    source_configs = []
    for dir in ldap_cti_directories.all():
        _, _, name = dir.uri.partition('ldapfilter://')
        try:
            ldap_config = ldap_dao.build_ldapinfo_from_ldapfilter(name)
        except LookupError:
            logger.warning('Skipping LDAP source %s', dir.name)
            continue

        custom_filter = ldap_config.get('filter') or ''
        if custom_filter:
            custom_filter = '({})'.format(custom_filter)

        source_configs.append({'type': 'ldap',
                               'name': dir.name,
                               'searched_columns': json.loads(dir.match_direct or '[]'),
                               'first_matched_columns': json.loads(dir.match_reverse or '[]'),
                               'format_columns': _format_columns(dir.fields, dir.values),
                               'ldap_uri': ldap_config['uri'],
                               'ldap_base_dn': ldap_config['basedn'],
                               'ldap_username': ldap_config['username'],
                               'ldap_password': ldap_config['password'],
                               'ldap_custom_filter': custom_filter})

    return source_configs
开发者ID:alafarcinade,项目名称:xivo-dao,代码行数:45,代码来源:directory_dao.py


示例15: get_advanced_search_query

def get_advanced_search_query(employer_id, params, status):
    skills = params.get('skills')
    locations = params.get('locations')
    role = params.get('role')
    name = params.get('name')
    salary = params.get('salary')

    query = DBSession.query(Candidate.id).filter(Candidate.status == status)

    if employer_id:
        query = query.outerjoin(V_CANDIDATE_CURRENT_EMPLOYERS,
                                and_(V_CANDIDATE_CURRENT_EMPLOYERS.c.candidate_id == Candidate.id,
                                     V_CANDIDATE_CURRENT_EMPLOYERS.c.employer_id == employer_id)) \
            .filter(V_CANDIDATE_CURRENT_EMPLOYERS.c.candidate_id == None)

    if locations:
        query = query.join(PreferredLocation, Candidate.id == PreferredLocation.candidate_id)

        country_filter = set([c['country_iso'] for c in locations])
        city_filter = [and_(City.name == loc['city'], City.country_iso == loc['country_iso']) for loc in locations]
        city_ids = DBSession.query(City.id).filter(or_(*city_filter)).all()

        query = query.filter(or_(PreferredLocation.city_id.in_(city_ids),
                                 PreferredLocation.country_iso.in_(country_filter)))

    if salary or role:
        query = query.join(TargetPosition)
        if salary:
            query = query.filter(TargetPosition.minimum_salary <= salary)
        if role:
            role = get_by_name_or_raise(Role, role)
            query = query.filter(TargetPosition.role_id == role.id)

    if name and employer_id:
        name = name.lower()
        employer_ids = func.array_agg(Offer.employer_id, type_=ARRAY(TEXT)).label('employer_ids')
        offer_query = DBSession.query(Offer.candidate_id, employer_ids).filter(Offer.accepted != None) \
            .group_by(Offer.candidate_id).subquery()
        query = query.outerjoin(offer_query, offer_query.c.candidate_id == Candidate.id).filter(
            or_(cast(Candidate.id, TEXT).startswith(name),
                and_(
                    or_(func.lower(Candidate.first_name).startswith(name),
                        func.lower(Candidate.last_name).startswith(name)),
                    or_(
                        offer_query.c.employer_ids.any(str(employer_id)),
                        Candidate.anonymous == False
                    )
                )
            )
        )

    query = query.group_by(Candidate.id)

    if skills:
        query = query.join(CandidateSkill).join(Skill).filter(Skill.name.in_(skills)) \
            .having(func.count(Skill.name) == len(skills))
    return query
开发者ID:iwein,项目名称:temp,代码行数:57,代码来源:services.py


示例16: test_array_agg_array_literal_explicit_type

    def test_array_agg_array_literal_explicit_type(self):
        from sqlalchemy.dialects.postgresql import array

        expr = array([column("data", Integer), column("d2", Integer)])

        agg_expr = func.array_agg(expr, type_=ARRAY(Integer))
        is_(agg_expr.type._type_affinity, ARRAY)
        is_(agg_expr.type.item_type._type_affinity, Integer)

        self.assert_compile(
            agg_expr, "array_agg(ARRAY[data, d2])", dialect="postgresql"
        )
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:12,代码来源:test_functions.py


示例17: test_aggregate_order_by_adapt

    def test_aggregate_order_by_adapt(self):
        m = MetaData()
        table = Table('table1', m, Column('a', Integer), Column('b', Integer))
        expr = func.array_agg(aggregate_order_by(table.c.a, table.c.b.desc()))
        stmt = select([expr])

        a1 = table.alias('foo')
        stmt2 = sql_util.ClauseAdapter(a1).traverse(stmt)
        self.assert_compile(
            stmt2,
            "SELECT array_agg(foo.a ORDER BY foo.b DESC) AS array_agg_1 FROM table1 AS foo"
        )
开发者ID:t3573393,项目名称:sqlalchemy,代码行数:12,代码来源:test_compiler.py


示例18: get_available_langs

def get_available_langs(document_id):
    """Return the available languages (e.g. ['en', 'fr']) for a single
    document.
    """
    return DBSession. \
        query(
            func.array_agg(
                DocumentLocale.lang,
                type_=postgresql.ARRAY(String))). \
        filter(DocumentLocale.document_id == document_id). \
        group_by(DocumentLocale.document_id). \
        scalar()
开发者ID:arnaud-morvan,项目名称:v6_api,代码行数:12,代码来源:document.py


示例19: query_person_passports

def query_person_passports():
    return (
        DBSession.query(
            Person.id.label('person_id'),
            func.array_to_string(
                func.array_agg(
                    case([(Passport.passport_type == 'citizen', Passport.num)])
                ),
                ', '
            ).label('citizen'),
            func.array_to_string(
                func.array_agg(
                    case([(Passport.passport_type == 'foreign', Passport.num)])
                ),
                ', '
            ).label('foreign'),
        )
        .join(Passport, Person.passports)
        .group_by(Person.id)
    )
    
开发者ID:alishir,项目名称:tcr,代码行数:20,代码来源:persons.py


示例20: get_dataset_sources

    def get_dataset_sources(self, dataset_id):
        # recursively build the list of (dataset_ref, source_dataset_ref) pairs starting from dataset_id
        # include (dataset_ref, NULL) [hence the left join]
        sources = select(
            [DATASET.c.id.label('dataset_ref'),
             DATASET_SOURCE.c.source_dataset_ref,
             DATASET_SOURCE.c.classifier]
        ).select_from(
            DATASET.join(DATASET_SOURCE,
                         DATASET.c.id == DATASET_SOURCE.c.dataset_ref,
                         isouter=True)
        ).where(
            DATASET.c.id == dataset_id
        ).cte(name="sources", recursive=True)

        sources = sources.union_all(
            select(
                [sources.c.source_dataset_ref.label('dataset_ref'),
                 DATASET_SOURCE.c.source_dataset_ref,
                 DATASET_SOURCE.c.classifier]
            ).select_from(
                sources.join(DATASET_SOURCE,
                             sources.c.source_dataset_ref == DATASET_SOURCE.c.dataset_ref,
                             isouter=True)
            ).where(sources.c.source_dataset_ref != None))

        # turn the list of pairs into adjacency list (dataset_ref, [source_dataset_ref, ...])
        # some source_dataset_ref's will be NULL
        aggd = select(
            [sources.c.dataset_ref,
             func.array_agg(sources.c.source_dataset_ref).label('sources'),
             func.array_agg(sources.c.classifier).label('classes')]
        ).group_by(sources.c.dataset_ref).alias('aggd')

        # join the adjacency list with datasets table
        query = select(
            _DATASET_SELECT_FIELDS + (aggd.c.sources, aggd.c.classes)
        ).select_from(aggd.join(DATASET, DATASET.c.id == aggd.c.dataset_ref))

        return self._connection.execute(query).fetchall()
开发者ID:ceos-seo,项目名称:Data_Cube_v2,代码行数:40,代码来源:_api.py



注:本文中的sqlalchemy.func.array_agg函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python func.avg函数代码示例发布时间:2022-05-27
下一篇:
Python sqlsoup.SqlSoup类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap