Python sqlalchemy.alias Function Code Examples


This article collects typical usage examples of the sqlalchemy.alias function in Python. If you have been wondering how to call alias, what it is for, or what real-world uses look like, the hand-picked code samples below should help.



A total of 20 code examples of the alias function are shown below, ordered by popularity by default.
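Before walking through the project excerpts, here is a minimal, self-contained sketch of what sqlalchemy.alias does: it wraps a table (or any selectable) under a new name so the same table can participate in a query more than once, for example in a self-join. The employee table and its columns are invented purely for illustration and come from none of the projects quoted later; the sketch assumes the legacy 1.x-style select([...]) calling convention that the excerpts in this article also use.

import sqlalchemy as sa

# Illustrative schema; not taken from any of the projects excerpted below.
metadata = sa.MetaData()
employee = sa.Table(
    'employee', metadata,
    sa.Column('id', sa.Integer, primary_key=True),
    sa.Column('name', sa.String),
    sa.Column('manager_id', sa.Integer),
)

# alias() gives the table a second name so it can appear twice in one query.
manager = sa.alias(employee, name='manager')

# Self-join: pair each employee with the name of their manager.
stmt = sa.select([employee.c.name, manager.c.name.label('manager_name')]).select_from(
    employee.join(manager, employee.c.manager_id == manager.c.id)
)

print(stmt)  # ... FROM employee JOIN employee AS manager ON employee.manager_id = manager.id

The same pattern, aliasing one table several times and joining the aliases against each other, is what the v/v2/v3 and openorders/closedorders aliases do in the examples that follow.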

Example 1: get_property_mod_flags_query

def get_property_mod_flags_query(
    table,
    tracked_columns,
    mod_suffix='_mod',
    end_tx_column_name='end_transaction_id',
    tx_column_name='transaction_id',
):
    v1 = sa.alias(table, name='v')
    v2 = sa.alias(table, name='v2')
    primary_keys = [c.name for c in table.c if c.primary_key]

    return sa.select(
        columns=[
            getattr(v1.c, column)
            for column in primary_keys
        ] + [
            (sa.or_(
                getattr(v1.c, column) != getattr(v2.c, column),
                getattr(v2.c, tx_column_name).is_(None)
            )).label(column + mod_suffix)
            for column in tracked_columns
        ],
        from_obj=v1.outerjoin(
            v2,
            sa.and_(
                getattr(v2.c, end_tx_column_name) ==
                getattr(v1.c, tx_column_name),
                *[
                    getattr(v2.c, pk) == getattr(v1.c, pk)
                    for pk in primary_keys
                    if pk != tx_column_name
                ]
            )
        )
    ).order_by(getattr(v1.c, tx_column_name))
Author: adamchainz, Project: sqlalchemy-continuum, Lines: 35, Source: schema.py


Example 2: _peaks_query

    def _peaks_query(self, session):
        anode = alias(ChromatogramTreeNode.__table__)
        bnode = alias(ChromatogramTreeNode.__table__)
        apeak = alias(DeconvolutedPeak.__table__)

        peak_join = apeak.join(
            ChromatogramTreeNodeToDeconvolutedPeak,
            ChromatogramTreeNodeToDeconvolutedPeak.c.peak_id == apeak.c.id)

        root_peaks_join = peak_join.join(
            anode,
            ChromatogramTreeNodeToDeconvolutedPeak.c.node_id == anode.c.id).join(
            ChromatogramToChromatogramTreeNode,
            ChromatogramToChromatogramTreeNode.c.node_id == anode.c.id)

        branch_peaks_join = peak_join.join(
            anode,
            ChromatogramTreeNodeToDeconvolutedPeak.c.node_id == anode.c.id).join(
            ChromatogramTreeNodeBranch,
            ChromatogramTreeNodeBranch.child_id == anode.c.id).join(
            bnode, ChromatogramTreeNodeBranch.parent_id == bnode.c.id).join(
            ChromatogramToChromatogramTreeNode,
            ChromatogramToChromatogramTreeNode.c.node_id == bnode.c.id)

        branch_ids = select([apeak.c.id]).where(
            ChromatogramToChromatogramTreeNode.c.chromatogram_id == self.id
        ).select_from(branch_peaks_join)

        root_ids = select([apeak.c.id]).where(
            ChromatogramToChromatogramTreeNode.c.chromatogram_id == self.id
        ).select_from(root_peaks_join)

        all_ids = root_ids.union_all(branch_ids)
        peaks = session.execute(all_ids).fetchall()
        return {p[0] for p in peaks}
Author: mobiusklein, Project: glycan_profiling, Lines: 35, Source: chromatogram.py


Example 3: test_double

    def test_double(self):
        """tests lazy loading with two relationships simulatneously, from the same table, using aliases.  """

        users, orders, User, Address, Order, addresses = (self.tables.users,
                                self.tables.orders,
                                self.classes.User,
                                self.classes.Address,
                                self.classes.Order,
                                self.tables.addresses)


        openorders = sa.alias(orders, 'openorders')
        closedorders = sa.alias(orders, 'closedorders')

        mapper(Address, addresses)

        mapper(Order, orders)

        open_mapper = mapper(Order, openorders, non_primary=True)
        closed_mapper = mapper(Order, closedorders, non_primary=True)
        mapper(User, users, properties = dict(
            addresses = relationship(Address, lazy = True),
            open_orders = relationship(open_mapper, primaryjoin = sa.and_(openorders.c.isopen == 1, users.c.id==openorders.c.user_id), lazy='select'),
            closed_orders = relationship(closed_mapper, primaryjoin = sa.and_(closedorders.c.isopen == 0, users.c.id==closedorders.c.user_id), lazy='select')
        ))
        q = create_session().query(User)

        assert [
            User(
                id=7,
                addresses=[Address(id=1)],
                open_orders = [Order(id=3)],
                closed_orders = [Order(id=1), Order(id=5)]
            ),
            User(
                id=8,
                addresses=[Address(id=2), Address(id=3), Address(id=4)],
                open_orders = [],
                closed_orders = []
            ),
            User(
                id=9,
                addresses=[Address(id=5)],
                open_orders = [Order(id=4)],
                closed_orders = [Order(id=2)]
            ),
            User(id=10)

        ] == q.all()

        sess = create_session()
        user = sess.query(User).get(7)
        assert [Order(id=1), Order(id=5)] == create_session().query(closed_mapper).with_parent(user, property='closed_orders').all()
        assert [Order(id=3)] == create_session().query(open_mapper).with_parent(user, property='open_orders').all()
Author: MVReddy, Project: sqlalchemy, Lines: 54, Source: test_lazy_relations.py


Example 4: _as_array_query

    def _as_array_query(self, session):
        anode = alias(ChromatogramTreeNode.__table__)
        bnode = alias(ChromatogramTreeNode.__table__)
        apeak = alias(DeconvolutedPeak.__table__)

        peak_join = apeak.join(
            ChromatogramTreeNodeToDeconvolutedPeak,
            ChromatogramTreeNodeToDeconvolutedPeak.c.peak_id == apeak.c.id)

        root_peaks_join = peak_join.join(
            anode,
            ChromatogramTreeNodeToDeconvolutedPeak.c.node_id == anode.c.id).join(
            ChromatogramToChromatogramTreeNode,
            ChromatogramToChromatogramTreeNode.c.node_id == anode.c.id)

        branch_peaks_join = peak_join.join(
            anode,
            ChromatogramTreeNodeToDeconvolutedPeak.c.node_id == anode.c.id).join(
            ChromatogramTreeNodeBranch,
            ChromatogramTreeNodeBranch.child_id == anode.c.id).join(
            bnode, ChromatogramTreeNodeBranch.parent_id == bnode.c.id).join(
            ChromatogramToChromatogramTreeNode,
            ChromatogramToChromatogramTreeNode.c.node_id == bnode.c.id)

        branch_intensities = select([apeak.c.intensity, anode.c.retention_time]).where(
            ChromatogramToChromatogramTreeNode.c.chromatogram_id == self.id
        ).select_from(branch_peaks_join)

        root_intensities = select([apeak.c.intensity, anode.c.retention_time]).where(
            ChromatogramToChromatogramTreeNode.c.chromatogram_id == self.id
        ).select_from(root_peaks_join)

        all_intensities_q = root_intensities.union_all(branch_intensities).order_by(
            anode.c.retention_time)

        all_intensities = session.execute(all_intensities_q).fetchall()

        time = []
        signal = []
        current_signal = all_intensities[0][0]
        current_time = all_intensities[0][1]

        for intensity, rt in all_intensities[1:]:
            if abs(current_time - rt) < 1e-4:
                current_signal += intensity
            else:
                time.append(current_time)
                signal.append(current_signal)
                current_time = rt
                current_signal = intensity
        time.append(current_time)
        signal.append(current_signal)
        return np.array(time), np.array(signal)
Author: mobiusklein, Project: glycan_profiling, Lines: 53, Source: chromatogram.py


Example 5: test_double

    def test_double(self):
        """Eager loading with two relationships simultaneously, 
            from the same table, using aliases."""

        openorders = sa.alias(orders, 'openorders')
        closedorders = sa.alias(orders, 'closedorders')

        mapper(Address, addresses)
        mapper(Order, orders)

        open_mapper = mapper(Order, openorders, non_primary=True)
        closed_mapper = mapper(Order, closedorders, non_primary=True)

        mapper(User, users, properties = dict(
            addresses = relationship(Address, lazy='subquery',
                                        order_by=addresses.c.id),
            open_orders = relationship(
                open_mapper,
                primaryjoin=sa.and_(openorders.c.isopen == 1,
                                 users.c.id==openorders.c.user_id),
                lazy='subquery', order_by=openorders.c.id),
            closed_orders = relationship(
                closed_mapper,
                primaryjoin=sa.and_(closedorders.c.isopen == 0,
                                 users.c.id==closedorders.c.user_id),
                lazy='subquery', order_by=closedorders.c.id)))

        q = create_session().query(User).order_by(User.id)

        def go():
            eq_([
                User(
                    id=7,
                    addresses=[Address(id=1)],
                    open_orders = [Order(id=3)],
                    closed_orders = [Order(id=1), Order(id=5)]
                ),
                User(
                    id=8,
                    addresses=[Address(id=2), Address(id=3), Address(id=4)],
                    open_orders = [],
                    closed_orders = []
                ),
                User(
                    id=9,
                    addresses=[Address(id=5)],
                    open_orders = [Order(id=4)],
                    closed_orders = [Order(id=2)]
                ),
                User(id=10)

            ], q.all())
        self.assert_sql_count(testing.db, go, 4)
Author: gaguilarmi, Project: sqlalchemy, Lines: 53, Source: test_subquery_relations.py


Example 6: execute

    def execute(self, metadata, connection, filter_values):
        asha_table = self.get_asha_table(metadata)

        max_date_query = sqlalchemy.select([
            sqlalchemy.func.max(asha_table.c.date).label('date'),
            asha_table.c.case_id.label('case_id')
        ])

        if self.filters:
            for filter in self.filters:
                max_date_query.append_whereclause(filter.build_expression())

        max_date_query.append_group_by(
            asha_table.c.case_id
        )

        max_date_subquery = sqlalchemy.alias(max_date_query, 'max_date')

        checklist_query = sqlalchemy.select()
        for column in self.columns:
            checklist_query.append_column(column.build_column(asha_table))

        checklist_query = checklist_query.where(
            asha_table.c.case_id == max_date_subquery.c.case_id
        ).where(
            asha_table.c.date == max_date_subquery.c.date
        )

        return connection.execute(checklist_query, **filter_values).fetchall()
Author: johan--, Project: commcare-hq, Lines: 29, Source: sql_data.py


Example 7: apply_default_value

    def apply_default_value(self, column):
        if column.default:
            execute = self.table.migration.conn.execute
            val = column.default.arg
            table = self.table.migration.metadata.tables[self.table.name]
            table.append_column(column)
            cname = getattr(table.c, column.name)
            if column.default.is_callable:
                table2 = alias(select([table]).limit(1).where(cname.is_(None)))
                Table = self.table.migration.metadata.tables['system_model']
                Column = self.table.migration.metadata.tables['system_column']
                j1 = join(Table, Column, Table.c.name == Column.c.model)
                query = select([func.count()]).select_from(table)
                nb_row = self.table.migration.conn.execute(query).fetchone()[0]
                query = select([Column.c.name]).select_from(j1)
                query = query.where(Column.c.primary_key.is_(True))
                query = query.where(Table.c.table == self.table.name)
                columns = [x[0] for x in execute(query).fetchall()]
                where = and_(*[getattr(table.c, x) == getattr(table2.c, x)
                               for x in columns])
                for offset in range(nb_row):
                    # call for each row because the default value
                    # could be a sequence or depend on another field
                    query = update(table).where(where).values(
                        {cname: val(None)})
                    execute(query)

            else:
                query = update(table).where(cname.is_(None)).values(
                    {cname: val})
                execute(query)
Author: jssuzanne, Project: AnyBlok, Lines: 31, Source: migration.py


Example 8: execute

    def execute(self, metadata, connection, filter_values):
        try:
            table = metadata.tables[self.table_name]
        except KeyError:
            raise TableNotFoundException("Unable to query table, table not found: %s" % self.table_name)

        asha_table = self.get_asha_table(metadata)

        max_date_query = sqlalchemy.select([
            sqlalchemy.func.max(asha_table.c.date).label('date'),
            asha_table.c.case_id.label('case_id')
        ])

        if self.filters:
            for filter in self.filters:
                max_date_query.append_whereclause(filter.build_expression(table))

        max_date_query.append_group_by(
            asha_table.c.case_id
        )

        max_date_subquery = sqlalchemy.alias(max_date_query, 'max_date')

        checklist_query = sqlalchemy.select()
        for column in self.columns:
            checklist_query.append_column(column.build_column(asha_table))

        checklist_query = checklist_query.where(
            asha_table.c.case_id == max_date_subquery.c.case_id
        ).where(
            asha_table.c.date == max_date_subquery.c.date
        )

        return connection.execute(checklist_query, **filter_values).fetchall()
Author: tlwakwella, Project: commcare-hq, Lines: 34, Source: sql_data.py


Example 9: filter_ancestors

  def filter_ancestors(self, and_self=False):
    "The same as :meth:`filter_descendants` but filters ancestor nodes."
    options = self._tree_options
    obj     = self._get_obj()

    #self._get_session_and_assert_flushed(obj)

    # Restrict ourselves to just those nodes within the same tree:
    tree_id = getattr(obj, self.tree_id_field.name)
    filter_ = self.tree_id_field == tree_id

    alias = sqlalchemy.alias(options.table)
    left_field = self.left_field
    filter_ &= sqlalchemy.between(
      getattr(alias.c, self.left_field.name),
      self.left_field, self.right_field)
    filter_ &= getattr(alias.c, self.pk_field.name) == \
               getattr(obj,     self.pk_field.name)

    if not and_self:
      filter_ &= self.pk_field != getattr(obj, self.pk_field.name)

    # WHERE tree_id = <node.tree_id> AND <node.path> LIKE path || '%'
    #filter_ = (self.tree_id_field == tree_id) \
    #          & sqlalchemy.sql.expression.literal(
    #                path, sqlalchemy.String
    #            ).like(options.path_field + '%')
    #if and_self:
    #  filter_ &= self.depth_field  <= depth
    #else:
    #  filter_ &= self.depth_field < depth
    return filter_
Author: adurieux, Project: sqlalchemy-orm-tree, Lines: 32, Source: instance.py


Example 10: find

    def find(self, _limit=None, _offset=0, _step=5000,
             order_by='id', **_filter):
        """
        Performs a simple search on the table. Simply pass keyword arguments as ``filter``.
        ::

            results = table.find(country='France')
            results = table.find(country='France', year=1980)

        Using ``_limit``::

            # just return the first 10 rows
            results = table.find(country='France', _limit=10)

        You can sort the results by single or multiple columns. Append a minus sign
        to the column name for descending order::

            # sort results by a column 'year'
            results = table.find(country='France', order_by='year')
            # return all rows sorted by multiple columns (by year in descending order)
            results = table.find(order_by=['country', '-year'])

        By default :py:meth:`find() <dataset.Table.find>` will break the
        query into chunks of ``_step`` rows to prevent huge tables
        from being loaded into memory at once.

        For more complex queries, please use :py:meth:`db.query() <dataset.Database.query>`
        instead."""
        self._check_dropped()
        if not isinstance(order_by, (list, tuple)):
            order_by = [order_by]
        order_by = [o for o in order_by if o in self.table.columns]
        order_by = [self._args_to_order_by(o) for o in order_by]

        args = self._args_to_clause(_filter)

        # query total number of rows first
        count_query = alias(self.table.select(whereclause=args, limit=_limit, offset=_offset), name='count_query_alias').count()
        rp = self.database.executable.execute(count_query)
        total_row_count = rp.fetchone()[0]

        if _step is None or _step is False or _step == 0:
            _step = total_row_count

        if total_row_count > _step and not order_by:
            _step = total_row_count
            log.warn("query cannot be broken into smaller sections because it is unordered")

        queries = []

        for i in count():
            qoffset = _offset + (_step * i)
            qlimit = _step
            if _limit is not None:
                qlimit = min(_limit - (_step * i), _step)
            if qlimit <= 0:
                break
            queries.append(self.table.select(whereclause=args, limit=qlimit,
                                             offset=qoffset, order_by=order_by))
        return ResultIter((self.database.executable.execute(q) for q in queries))
Author: adityaU, Project: dataset, Lines: 60, Source: table.py


Example 11: execute

    def execute(self, connection, filter_values):
        max_date_query = sqlalchemy.select([
            sqlalchemy.func.max(sqlalchemy.column('completed_on')).label('completed_on'),
            sqlalchemy.column('case_id').label('case_id')
        ]).select_from(sqlalchemy.table(self.table_name))

        if self.filters:
            for filter in self.filters:
                max_date_query.append_whereclause(filter.build_expression())

        max_date_query.append_group_by(
            sqlalchemy.column('case_id')
        )

        max_date_subquery = sqlalchemy.alias(max_date_query, 'max_date')

        asha_table = self.get_asha_table_name()
        checklist_query = sqlalchemy.select()
        for column in self.columns:
            checklist_query.append_column(column.build_column())

        checklist_query = checklist_query.where(
            sqlalchemy.literal_column('"{}".case_id'.format(asha_table)) == max_date_subquery.c.case_id
        ).where(
            sqlalchemy.literal_column('"{}".completed_on'.format(asha_table)) == max_date_subquery.c.completed_on
        ).select_from(sqlalchemy.table(asha_table))

        return connection.execute(checklist_query, **filter_values).fetchall()
Author: kkrampa, Project: commcare-hq, Lines: 28, Source: sql_data.py


Example 12: compute_distance

def compute_distance(session, table, monosaccharide_names, model=GlycopeptideMatch):
    distance_table = make_distance_table(model)
    distance_table.create(session.connection())
    from_entity = alias(table)
    to_entity = alias(table)

    distances = [getattr(from_entity.c, name) - getattr(to_entity.c, name) for name in monosaccharide_names]
    selected = [from_entity.c.id, to_entity.c.id] + distances
    q = session.query(*selected).join(to_entity, from_entity.c.id != to_entity.c.id).yield_per(1000)
    for fields in q:
        from_id, to_id = fields[:2]
        distance = sum(fields[2:])
        # print from_id, to_id, distance
        session.execute(distance_table.insert(), [{'from_id': from_id, "to_id": to_id, "distance": distance}])
    session.commit()
    return distance_table
Author: BostonUniversityCBMS, Project: glycresoft_sqlalchemy, Lines: 16, Source: glycan_composition_distance.py


Example 13: test_delete_stmt_with_comma_subquery_alias_join

def test_delete_stmt_with_comma_subquery_alias_join():
    parent_ = sa.alias(product)

    del_stmt = (
        sa.delete(items)
        .where(items.c.order_id == orders.c.id)
        .where(orders.c.customer_id.in_(sa.select([customers.c.id]).where(customers.c.email.endswith("test.com"))))
        .where(items.c.product_id == product.c.id)
        .where(product.c.parent_id == parent_.c.id)
        .where(parent_.c.id != hammy_spam.c.ham_id)
    )

    expected = """
        DELETE FROM items
        USING orders, products, products AS products_1, "ham, spam"
        WHERE items.order_id = orders.id
        AND orders.customer_id IN
        (SELECT customers.id
        FROM customers
        WHERE (customers.email LIKE '%%' || 'test.com'))
        AND items.product_id = products.id
        AND products.parent_id = products_1.id
        AND products_1.id != "ham, spam".ham_id"""

    assert clean(compile_query(del_stmt)) == clean(expected)
Author: solackerman, Project: sqlalchemy-redshift, Lines: 25, Source: test_delete_stmt.py


Example 14: test_delete_stmt_on_alias

def test_delete_stmt_on_alias():
    parent_ = sa.alias(product)
    del_stmt = sa.delete(product).where(product.c.parent_id == parent_.c.id)
    expected = """
        DELETE FROM products
        USING products AS products_1
        WHERE products.parent_id = products_1.id"""
    assert clean(compile_query(del_stmt)) == clean(expected)
Author: solackerman, Project: sqlalchemy-redshift, Lines: 8, Source: test_delete_stmt.py


Example 15: qmonosaccharide

    def qmonosaccharide(cls, monosaccharide_name):
        if monosaccharide_name in cls._qmonosaccharide_cache:
            return cls._qmonosaccharide_cache[monosaccharide_name]
        symbol = alias(cls.GlycanCompositionAssociation.__table__.select().where(
            cls.GlycanCompositionAssociation.__table__.c.base_type == monosaccharide_name),
            monosaccharide_name)
        cls._qmonosaccharide_cache[monosaccharide_name] = symbol
        return symbol
Author: BostonUniversityCBMS, Project: glycresoft_sqlalchemy, Lines: 8, Source: glycomics.py


Example 16: _weighted_neutral_mass_query

    def _weighted_neutral_mass_query(self, session):
        anode = alias(ChromatogramTreeNode.__table__)
        bnode = alias(ChromatogramTreeNode.__table__)
        apeak = alias(DeconvolutedPeak.__table__)

        peak_join = apeak.join(
            ChromatogramTreeNodeToDeconvolutedPeak,
            ChromatogramTreeNodeToDeconvolutedPeak.c.peak_id == apeak.c.id)

        root_peaks_join = peak_join.join(
            anode,
            ChromatogramTreeNodeToDeconvolutedPeak.c.node_id == anode.c.id).join(
            ChromatogramToChromatogramTreeNode,
            ChromatogramToChromatogramTreeNode.c.node_id == anode.c.id)

        branch_peaks_join = peak_join.join(
            anode,
            ChromatogramTreeNodeToDeconvolutedPeak.c.node_id == anode.c.id).join(
            ChromatogramTreeNodeBranch,
            ChromatogramTreeNodeBranch.child_id == anode.c.id).join(
            bnode, ChromatogramTreeNodeBranch.parent_id == bnode.c.id).join(
            ChromatogramToChromatogramTreeNode,
            ChromatogramToChromatogramTreeNode.c.node_id == bnode.c.id)

        branch_intensities = select([apeak.c.intensity, apeak.c.neutral_mass, anode.c.node_type_id]).where(
            ChromatogramToChromatogramTreeNode.c.chromatogram_id == self.id
        ).select_from(branch_peaks_join)

        root_intensities = select([apeak.c.intensity, apeak.c.neutral_mass, anode.c.node_type_id]).where(
            ChromatogramToChromatogramTreeNode.c.chromatogram_id == self.id
        ).select_from(root_peaks_join)

        all_intensity_mass_q = root_intensities.union_all(branch_intensities)

        all_intensity_mass = session.execute(all_intensity_mass_q).fetchall()

        arr = np.array(all_intensity_mass)
        mass = arr[:, 1]
        shift_ids = arr[:, 2].astype(int)
        distinct_shifts = set(shift_ids)
        for i in distinct_shifts:
            shift = session.query(CompoundMassShift).get(i)
            mass[shift_ids == i] -= shift.convert().mass
        intensity = arr[:, 0]
        return mass.dot(intensity) / intensity.sum()
Author: mobiusklein, Project: glycan_profiling, Lines: 45, Source: chromatogram.py


Example 17: _build_median_query

    def _build_median_query(self, median_id_table, median_table):
        """
        SELECT tu.user_name, (tu.value + tl.value) / 2.0 as value
        FROM temp_median_ids
        LEFT JOIN temp_median tu ON tu.id = temp_median_ids.upper
        LEFT JOIN temp_median tl ON tl.id = temp_median_ids.lower;
        """
        t_upper = alias(median_table, name="tup")
        t_lower = alias(median_table, name="tlo")

        final_query = select(from_obj=median_id_table)
        for group in self.group_by:
            final_query.append_column(t_upper.c[group])

        final_query.append_column(((t_upper.c[self.VAL_COL] + t_lower.c[self.VAL_COL]) / 2.0).label(self.alias))
        final_query.append_whereclause(median_id_table.c["upper"] == t_upper.c[self.ID_COL])
        final_query.append_whereclause(median_id_table.c["lower"] == t_lower.c[self.ID_COL])
        return final_query
Author: cumtjie, Project: commcarehq-venv, Lines: 18, Source: median.py


Example 18: test_date

def test_date(session):
    dates = (
        date(2016, 1, 1),
        date(2016, 1, 2),
    )
    selects = tuple(sa.select((MakeADate(d),)) for d in dates)
    data = sa.alias(sa.union(*selects, use_labels=True), 'dates')
    stmt = sa.select((data,))
    result = session.execute(stmt).fetchall()
    assert tuple(chain.from_iterable(result)) == dates
Author: purpleP, Project: sqlalchemy-utils, Lines: 10, Source: test_compilers.py


Example 19: latest_prices_by_codes

def latest_prices_by_codes(codes=[]):
    p1 = models.Price
    p2 = sql.alias(models.Price)
    with models.session_scope() as s:
        query = s.query(p1).outerjoin(p2, sql.and_(
            p1.quandl_code == p2.c.quandl_code,
            p1.date < p2.c.date,
        )).filter(
            p1.quandl_code.in_(codes) if codes else True,
            p2.c.date.is_(None),
        )
        df = pd.read_sql(query.statement, query.session.bind, index_col="quandl_code")
    return df
Author: her0e1c1, Project: pystock, Lines: 13, Source: query.py


Example 20: get_end_tx_column_query

def get_end_tx_column_query(
    table,
    end_tx_column_name='end_transaction_id',
    tx_column_name='transaction_id'
):

    v1 = sa.alias(table, name='v')
    v2 = sa.alias(table, name='v2')
    v3 = sa.alias(table, name='v3')

    primary_keys = [c.name for c in table.c if c.primary_key]

    tx_criterion = sa.select(
        [sa.func.min(getattr(v3.c, tx_column_name))]
    ).where(
        sa.and_(
            getattr(v3.c, tx_column_name) > getattr(v1.c, tx_column_name),
            *[
                getattr(v3.c, pk) == getattr(v1.c, pk)
                for pk in primary_keys
                if pk != tx_column_name
            ]
        )
    )
    return sa.select(
        columns=[
            getattr(v1.c, column)
            for column in primary_keys
        ] + [
            getattr(v2.c, tx_column_name).label(end_tx_column_name)
        ],
        from_obj=v1.outerjoin(
            v2,
            sa.and_(
                getattr(v2.c, tx_column_name) ==
                tx_criterion
            )
        )
    ).order_by(getattr(v1.c, tx_column_name))
Author: adamchainz, Project: sqlalchemy-continuum, Lines: 39, Source: schema.py



Note: The sqlalchemy.alias examples in this article were compiled by 纯净天空 from GitHub, MSDocs, and other source-code and documentation platforms. The snippets are selected from open-source projects contributed by their respective developers, and copyright remains with the original authors. Refer to each project's License for terms of distribution and use; do not reproduce without permission.

