本文整理汇总了Python中sqlalchemy.alias函数的典型用法代码示例。如果您正苦于以下问题:Python alias函数的具体用法?Python alias怎么用?Python alias使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了alias函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_property_mod_flags_query
def get_property_mod_flags_query(
table,
tracked_columns,
mod_suffix='_mod',
end_tx_column_name='end_transaction_id',
tx_column_name='transaction_id',
):
v1 = sa.alias(table, name='v')
v2 = sa.alias(table, name='v2')
primary_keys = [c.name for c in table.c if c.primary_key]
return sa.select(
columns=[
getattr(v1.c, column)
for column in primary_keys
] + [
(sa.or_(
getattr(v1.c, column) != getattr(v2.c, column),
getattr(v2.c, tx_column_name).is_(None)
)).label(column + mod_suffix)
for column in tracked_columns
],
from_obj=v1.outerjoin(
v2,
sa.and_(
getattr(v2.c, end_tx_column_name) ==
getattr(v1.c, tx_column_name),
*[
getattr(v2.c, pk) == getattr(v1.c, pk)
for pk in primary_keys
if pk != tx_column_name
]
)
)
).order_by(getattr(v1.c, tx_column_name))
开发者ID:adamchainz,项目名称:sqlalchemy-continuum,代码行数:35,代码来源:schema.py
示例2: _peaks_query
def _peaks_query(self, session):
anode = alias(ChromatogramTreeNode.__table__)
bnode = alias(ChromatogramTreeNode.__table__)
apeak = alias(DeconvolutedPeak.__table__)
peak_join = apeak.join(
ChromatogramTreeNodeToDeconvolutedPeak,
ChromatogramTreeNodeToDeconvolutedPeak.c.peak_id == apeak.c.id)
root_peaks_join = peak_join.join(
anode,
ChromatogramTreeNodeToDeconvolutedPeak.c.node_id == anode.c.id).join(
ChromatogramToChromatogramTreeNode,
ChromatogramToChromatogramTreeNode.c.node_id == anode.c.id)
branch_peaks_join = peak_join.join(
anode,
ChromatogramTreeNodeToDeconvolutedPeak.c.node_id == anode.c.id).join(
ChromatogramTreeNodeBranch,
ChromatogramTreeNodeBranch.child_id == anode.c.id).join(
bnode, ChromatogramTreeNodeBranch.parent_id == bnode.c.id).join(
ChromatogramToChromatogramTreeNode,
ChromatogramToChromatogramTreeNode.c.node_id == bnode.c.id)
branch_ids = select([apeak.c.id]).where(
ChromatogramToChromatogramTreeNode.c.chromatogram_id == self.id
).select_from(branch_peaks_join)
root_ids = select([apeak.c.id]).where(
ChromatogramToChromatogramTreeNode.c.chromatogram_id == self.id
).select_from(root_peaks_join)
all_ids = root_ids.union_all(branch_ids)
peaks = session.execute(all_ids).fetchall()
return {p[0] for p in peaks}
开发者ID:mobiusklein,项目名称:glycan_profiling,代码行数:35,代码来源:chromatogram.py
示例3: test_double
def test_double(self):
"""tests lazy loading with two relationships simulatneously, from the same table, using aliases. """
users, orders, User, Address, Order, addresses = (self.tables.users,
self.tables.orders,
self.classes.User,
self.classes.Address,
self.classes.Order,
self.tables.addresses)
openorders = sa.alias(orders, 'openorders')
closedorders = sa.alias(orders, 'closedorders')
mapper(Address, addresses)
mapper(Order, orders)
open_mapper = mapper(Order, openorders, non_primary=True)
closed_mapper = mapper(Order, closedorders, non_primary=True)
mapper(User, users, properties = dict(
addresses = relationship(Address, lazy = True),
open_orders = relationship(open_mapper, primaryjoin = sa.and_(openorders.c.isopen == 1, users.c.id==openorders.c.user_id), lazy='select'),
closed_orders = relationship(closed_mapper, primaryjoin = sa.and_(closedorders.c.isopen == 0, users.c.id==closedorders.c.user_id), lazy='select')
))
q = create_session().query(User)
assert [
User(
id=7,
addresses=[Address(id=1)],
open_orders = [Order(id=3)],
closed_orders = [Order(id=1), Order(id=5)]
),
User(
id=8,
addresses=[Address(id=2), Address(id=3), Address(id=4)],
open_orders = [],
closed_orders = []
),
User(
id=9,
addresses=[Address(id=5)],
open_orders = [Order(id=4)],
closed_orders = [Order(id=2)]
),
User(id=10)
] == q.all()
sess = create_session()
user = sess.query(User).get(7)
assert [Order(id=1), Order(id=5)] == create_session().query(closed_mapper).with_parent(user, property='closed_orders').all()
assert [Order(id=3)] == create_session().query(open_mapper).with_parent(user, property='open_orders').all()
开发者ID:MVReddy,项目名称:sqlalchemy,代码行数:54,代码来源:test_lazy_relations.py
示例4: _as_array_query
def _as_array_query(self, session):
anode = alias(ChromatogramTreeNode.__table__)
bnode = alias(ChromatogramTreeNode.__table__)
apeak = alias(DeconvolutedPeak.__table__)
peak_join = apeak.join(
ChromatogramTreeNodeToDeconvolutedPeak,
ChromatogramTreeNodeToDeconvolutedPeak.c.peak_id == apeak.c.id)
root_peaks_join = peak_join.join(
anode,
ChromatogramTreeNodeToDeconvolutedPeak.c.node_id == anode.c.id).join(
ChromatogramToChromatogramTreeNode,
ChromatogramToChromatogramTreeNode.c.node_id == anode.c.id)
branch_peaks_join = peak_join.join(
anode,
ChromatogramTreeNodeToDeconvolutedPeak.c.node_id == anode.c.id).join(
ChromatogramTreeNodeBranch,
ChromatogramTreeNodeBranch.child_id == anode.c.id).join(
bnode, ChromatogramTreeNodeBranch.parent_id == bnode.c.id).join(
ChromatogramToChromatogramTreeNode,
ChromatogramToChromatogramTreeNode.c.node_id == bnode.c.id)
branch_intensities = select([apeak.c.intensity, anode.c.retention_time]).where(
ChromatogramToChromatogramTreeNode.c.chromatogram_id == self.id
).select_from(branch_peaks_join)
root_intensities = select([apeak.c.intensity, anode.c.retention_time]).where(
ChromatogramToChromatogramTreeNode.c.chromatogram_id == self.id
).select_from(root_peaks_join)
all_intensities_q = root_intensities.union_all(branch_intensities).order_by(
anode.c.retention_time)
all_intensities = session.execute(all_intensities_q).fetchall()
time = []
signal = []
current_signal = all_intensities[0][0]
current_time = all_intensities[0][1]
for intensity, rt in all_intensities[1:]:
if abs(current_time - rt) < 1e-4:
current_signal += intensity
else:
time.append(current_time)
signal.append(current_signal)
current_time = rt
current_signal = intensity
time.append(current_time)
signal.append(current_signal)
return np.array(time), np.array(signal)
开发者ID:mobiusklein,项目名称:glycan_profiling,代码行数:53,代码来源:chromatogram.py
示例5: test_double
def test_double(self):
"""Eager loading with two relationships simultaneously,
from the same table, using aliases."""
openorders = sa.alias(orders, 'openorders')
closedorders = sa.alias(orders, 'closedorders')
mapper(Address, addresses)
mapper(Order, orders)
open_mapper = mapper(Order, openorders, non_primary=True)
closed_mapper = mapper(Order, closedorders, non_primary=True)
mapper(User, users, properties = dict(
addresses = relationship(Address, lazy='subquery',
order_by=addresses.c.id),
open_orders = relationship(
open_mapper,
primaryjoin=sa.and_(openorders.c.isopen == 1,
users.c.id==openorders.c.user_id),
lazy='subquery', order_by=openorders.c.id),
closed_orders = relationship(
closed_mapper,
primaryjoin=sa.and_(closedorders.c.isopen == 0,
users.c.id==closedorders.c.user_id),
lazy='subquery', order_by=closedorders.c.id)))
q = create_session().query(User).order_by(User.id)
def go():
eq_([
User(
id=7,
addresses=[Address(id=1)],
open_orders = [Order(id=3)],
closed_orders = [Order(id=1), Order(id=5)]
),
User(
id=8,
addresses=[Address(id=2), Address(id=3), Address(id=4)],
open_orders = [],
closed_orders = []
),
User(
id=9,
addresses=[Address(id=5)],
open_orders = [Order(id=4)],
closed_orders = [Order(id=2)]
),
User(id=10)
], q.all())
self.assert_sql_count(testing.db, go, 4)
开发者ID:gaguilarmi,项目名称:sqlalchemy,代码行数:53,代码来源:test_subquery_relations.py
示例6: execute
def execute(self, metadata, connection, filter_values):
asha_table = self.get_asha_table(metadata)
max_date_query = sqlalchemy.select([
sqlalchemy.func.max(asha_table.c.date).label('date'),
asha_table.c.case_id.label('case_id')
])
if self.filters:
for filter in self.filters:
max_date_query.append_whereclause(filter.build_expression())
max_date_query.append_group_by(
asha_table.c.case_id
)
max_date_subquery = sqlalchemy.alias(max_date_query, 'max_date')
checklist_query = sqlalchemy.select()
for column in self.columns:
checklist_query.append_column(column.build_column(asha_table))
checklist_query = checklist_query.where(
asha_table.c.case_id == max_date_subquery.c.case_id
).where(
asha_table.c.date == max_date_subquery.c.date
)
return connection.execute(checklist_query, **filter_values).fetchall()
开发者ID:johan--,项目名称:commcare-hq,代码行数:29,代码来源:sql_data.py
示例7: apply_default_value
def apply_default_value(self, column):
if column.default:
execute = self.table.migration.conn.execute
val = column.default.arg
table = self.table.migration.metadata.tables[self.table.name]
table.append_column(column)
cname = getattr(table.c, column.name)
if column.default.is_callable:
table2 = alias(select([table]).limit(1).where(cname.is_(None)))
Table = self.table.migration.metadata.tables['system_model']
Column = self.table.migration.metadata.tables['system_column']
j1 = join(Table, Column, Table.c.name == Column.c.model)
query = select([func.count()]).select_from(table)
nb_row = self.table.migration.conn.execute(query).fetchone()[0]
query = select([Column.c.name]).select_from(j1)
query = query.where(Column.c.primary_key.is_(True))
query = query.where(Table.c.table == self.table.name)
columns = [x[0] for x in execute(query).fetchall()]
where = and_(*[getattr(table.c, x) == getattr(table2.c, x)
for x in columns])
for offset in range(nb_row):
# call for each row because the default value
# could be a sequence or depend of other field
query = update(table).where(where).values(
{cname: val(None)})
execute(query)
else:
query = update(table).where(cname.is_(None)).values(
{cname: val})
execute(query)
开发者ID:jssuzanne,项目名称:AnyBlok,代码行数:31,代码来源:migration.py
示例8: execute
def execute(self, metadata, connection, filter_values):
try:
table = metadata.tables[self.table_name]
except KeyError:
raise TableNotFoundException("Unable to query table, table not found: %s" % self.table_name)
asha_table = self.get_asha_table(metadata)
max_date_query = sqlalchemy.select([
sqlalchemy.func.max(asha_table.c.date).label('date'),
asha_table.c.case_id.label('case_id')
])
if self.filters:
for filter in self.filters:
max_date_query.append_whereclause(filter.build_expression(table))
max_date_query.append_group_by(
asha_table.c.case_id
)
max_date_subquery = sqlalchemy.alias(max_date_query, 'max_date')
checklist_query = sqlalchemy.select()
for column in self.columns:
checklist_query.append_column(column.build_column(asha_table))
checklist_query = checklist_query.where(
asha_table.c.case_id == max_date_subquery.c.case_id
).where(
asha_table.c.date == max_date_subquery.c.date
)
return connection.execute(checklist_query, **filter_values).fetchall()
开发者ID:tlwakwella,项目名称:commcare-hq,代码行数:34,代码来源:sql_data.py
示例9: filter_ancestors
def filter_ancestors(self, and_self=False):
"The same as :meth:`filter_descendants` but filters ancestor nodes."
options = self._tree_options
obj = self._get_obj()
#self._get_session_and_assert_flushed(obj)
# Restrict ourselves to just those nodes within the same tree:
tree_id = getattr(obj, self.tree_id_field.name)
filter_ = self.tree_id_field == tree_id
alias = sqlalchemy.alias(options.table)
left_field = self.left_field
filter_ &= sqlalchemy.between(
getattr(alias.c, self.left_field.name),
self.left_field, self.right_field)
filter_ &= getattr(alias.c, self.pk_field.name) == \
getattr(obj, self.pk_field.name)
if not and_self:
filter_ &= self.pk_field != getattr(obj, self.pk_field.name)
# WHERE tree_id = <node.tree_id> AND <node.path> LIKE path || '%'
#filter_ = (self.tree_id_field == tree_id) \
# & sqlalchemy.sql.expression.literal(
# path, sqlalchemy.String
# ).like(options.path_field + '%')
#if and_self:
# filter_ &= self.depth_field <= depth
#else:
# filter_ &= self.depth_field < depth
return filter_
开发者ID:adurieux,项目名称:sqlalchemy-orm-tree,代码行数:32,代码来源:instance.py
示例10: find
def find(self, _limit=None, _offset=0, _step=5000,
order_by='id', **_filter):
"""
Performs a simple search on the table. Simply pass keyword arguments as ``filter``.
::
results = table.find(country='France')
results = table.find(country='France', year=1980)
Using ``_limit``::
# just return the first 10 rows
results = table.find(country='France', _limit=10)
You can sort the results by single or multiple columns. Append a minus sign
to the column name for descending order::
# sort results by a column 'year'
results = table.find(country='France', order_by='year')
# return all rows sorted by multiple columns (by year in descending order)
results = table.find(order_by=['country', '-year'])
By default :py:meth:`find() <dataset.Table.find>` will break the
query into chunks of ``_step`` rows to prevent huge tables
from being loaded into memory at once.
For more complex queries, please use :py:meth:`db.query() <dataset.Database.query>`
instead."""
self._check_dropped()
if not isinstance(order_by, (list, tuple)):
order_by = [order_by]
order_by = [o for o in order_by if o in self.table.columns]
order_by = [self._args_to_order_by(o) for o in order_by]
args = self._args_to_clause(_filter)
# query total number of rows first
count_query = alias(self.table.select(whereclause=args, limit=_limit, offset=_offset), name='count_query_alias').count()
rp = self.database.executable.execute(count_query)
total_row_count = rp.fetchone()[0]
if _step is None or _step is False or _step == 0:
_step = total_row_count
if total_row_count > _step and not order_by:
_step = total_row_count
log.warn("query cannot be broken into smaller sections because it is unordered")
queries = []
for i in count():
qoffset = _offset + (_step * i)
qlimit = _step
if _limit is not None:
qlimit = min(_limit - (_step * i), _step)
if qlimit <= 0:
break
queries.append(self.table.select(whereclause=args, limit=qlimit,
offset=qoffset, order_by=order_by))
return ResultIter((self.database.executable.execute(q) for q in queries))
开发者ID:adityaU,项目名称:dataset,代码行数:60,代码来源:table.py
示例11: execute
def execute(self, connection, filter_values):
max_date_query = sqlalchemy.select([
sqlalchemy.func.max(sqlalchemy.column('completed_on')).label('completed_on'),
sqlalchemy.column('case_id').label('case_id')
]).select_from(sqlalchemy.table(self.table_name))
if self.filters:
for filter in self.filters:
max_date_query.append_whereclause(filter.build_expression())
max_date_query.append_group_by(
sqlalchemy.column('case_id')
)
max_date_subquery = sqlalchemy.alias(max_date_query, 'max_date')
asha_table = self.get_asha_table_name()
checklist_query = sqlalchemy.select()
for column in self.columns:
checklist_query.append_column(column.build_column())
checklist_query = checklist_query.where(
sqlalchemy.literal_column('"{}".case_id'.format(asha_table)) == max_date_subquery.c.case_id
).where(
sqlalchemy.literal_column('"{}".completed_on'.format(asha_table)) == max_date_subquery.c.completed_on
).select_from(sqlalchemy.table(asha_table))
return connection.execute(checklist_query, **filter_values).fetchall()
开发者ID:kkrampa,项目名称:commcare-hq,代码行数:28,代码来源:sql_data.py
示例12: compute_distance
def compute_distance(session, table, monosaccharide_names, model=GlycopeptideMatch):
distance_table = make_distance_table(model)
distance_table.create(session.connection())
from_entity = alias(table)
to_entity = alias(table)
distances = [getattr(from_entity.c, name) - getattr(to_entity.c, name) for name in monosaccharide_names]
selected = [from_entity.c.id, to_entity.c.id] + distances
q = session.query(*selected).join(to_entity, from_entity.c.id != to_entity.c.id).yield_per(1000)
for fields in q:
from_id, to_id = fields[:2]
distance = sum(fields[2:])
# print from_id, to_id, distance
session.execute(distance_table.insert(), [{'from_id': from_id, "to_id": to_id, "distance": distance}])
session.commit()
return distance_table
开发者ID:BostonUniversityCBMS,项目名称:glycresoft_sqlalchemy,代码行数:16,代码来源:glycan_composition_distance.py
示例13: test_delete_stmt_with_comma_subquery_alias_join
def test_delete_stmt_with_comma_subquery_alias_join():
parent_ = sa.alias(product)
del_stmt = (
sa.delete(items)
.where(items.c.order_id == orders.c.id)
.where(orders.c.customer_id.in_(sa.select([customers.c.id]).where(customers.c.email.endswith("test.com"))))
.where(items.c.product_id == product.c.id)
.where(product.c.parent_id == parent_.c.id)
.where(parent_.c.id != hammy_spam.c.ham_id)
)
expected = """
DELETE FROM items
USING orders, products, products AS products_1, "ham, spam"
WHERE items.order_id = orders.id
AND orders.customer_id IN
(SELECT customers.id
FROM customers
WHERE (customers.email LIKE '%%' || 'test.com'))
AND items.product_id = products.id
AND products.parent_id = products_1.id
AND products_1.id != "ham, spam".ham_id"""
assert clean(compile_query(del_stmt)) == clean(expected)
开发者ID:solackerman,项目名称:sqlalchemy-redshift,代码行数:25,代码来源:test_delete_stmt.py
示例14: test_delete_stmt_on_alias
def test_delete_stmt_on_alias():
parent_ = sa.alias(product)
del_stmt = sa.delete(product).where(product.c.parent_id == parent_.c.id)
expected = """
DELETE FROM products
USING products AS products_1
WHERE products.parent_id = products_1.id"""
assert clean(compile_query(del_stmt)) == clean(expected)
开发者ID:solackerman,项目名称:sqlalchemy-redshift,代码行数:8,代码来源:test_delete_stmt.py
示例15: qmonosaccharide
def qmonosaccharide(cls, monosaccharide_name):
if monosaccharide_name in cls._qmonosaccharide_cache:
return cls._qmonosaccharide_cache[monosaccharide_name]
symbol = alias(cls.GlycanCompositionAssociation.__table__.select().where(
cls.GlycanCompositionAssociation.__table__.c.base_type == monosaccharide_name),
monosaccharide_name)
cls._qmonosaccharide_cache[monosaccharide_name] = symbol
return symbol
开发者ID:BostonUniversityCBMS,项目名称:glycresoft_sqlalchemy,代码行数:8,代码来源:glycomics.py
示例16: _weighted_neutral_mass_query
def _weighted_neutral_mass_query(self, session):
anode = alias(ChromatogramTreeNode.__table__)
bnode = alias(ChromatogramTreeNode.__table__)
apeak = alias(DeconvolutedPeak.__table__)
peak_join = apeak.join(
ChromatogramTreeNodeToDeconvolutedPeak,
ChromatogramTreeNodeToDeconvolutedPeak.c.peak_id == apeak.c.id)
root_peaks_join = peak_join.join(
anode,
ChromatogramTreeNodeToDeconvolutedPeak.c.node_id == anode.c.id).join(
ChromatogramToChromatogramTreeNode,
ChromatogramToChromatogramTreeNode.c.node_id == anode.c.id)
branch_peaks_join = peak_join.join(
anode,
ChromatogramTreeNodeToDeconvolutedPeak.c.node_id == anode.c.id).join(
ChromatogramTreeNodeBranch,
ChromatogramTreeNodeBranch.child_id == anode.c.id).join(
bnode, ChromatogramTreeNodeBranch.parent_id == bnode.c.id).join(
ChromatogramToChromatogramTreeNode,
ChromatogramToChromatogramTreeNode.c.node_id == bnode.c.id)
branch_intensities = select([apeak.c.intensity, apeak.c.neutral_mass, anode.c.node_type_id]).where(
ChromatogramToChromatogramTreeNode.c.chromatogram_id == self.id
).select_from(branch_peaks_join)
root_intensities = select([apeak.c.intensity, apeak.c.neutral_mass, anode.c.node_type_id]).where(
ChromatogramToChromatogramTreeNode.c.chromatogram_id == self.id
).select_from(root_peaks_join)
all_intensity_mass_q = root_intensities.union_all(branch_intensities)
all_intensity_mass = session.execute(all_intensity_mass_q).fetchall()
arr = np.array(all_intensity_mass)
mass = arr[:, 1]
shift_ids = arr[:, 2].astype(int)
distinct_shifts = set(shift_ids)
for i in distinct_shifts:
shift = session.query(CompoundMassShift).get(i)
mass[shift_ids == i] -= shift.convert().mass
intensity = arr[:, 0]
return mass.dot(intensity) / intensity.sum()
开发者ID:mobiusklein,项目名称:glycan_profiling,代码行数:45,代码来源:chromatogram.py
示例17: _build_median_query
def _build_median_query(self, median_id_table, median_table):
"""
SELECT tu.user_name, (tu.value + tl.value) / 2.0 as value
FROM temp_median_ids
LEFT JOIN temp_median tu ON tu.id = temp_median_ids.upper
LEFT JOIN temp_median tl ON tl.id = temp_median_ids.lower;
"""
t_upper = alias(median_table, name="tup")
t_lower = alias(median_table, name="tlo")
final_query = select(from_obj=median_id_table)
for group in self.group_by:
final_query.append_column(t_upper.c[group])
final_query.append_column(((t_upper.c[self.VAL_COL] + t_lower.c[self.VAL_COL]) / 2.0).label(self.alias))
final_query.append_whereclause(median_id_table.c["upper"] == t_upper.c[self.ID_COL])
final_query.append_whereclause(median_id_table.c["lower"] == t_lower.c[self.ID_COL])
return final_query
开发者ID:cumtjie,项目名称:commcarehq-venv,代码行数:18,代码来源:median.py
示例18: test_date
def test_date(session):
dates = (
date(2016, 1, 1),
date(2016, 1, 2),
)
selects = tuple(sa.select((MakeADate(d),)) for d in dates)
data = sa.alias(sa.union(*selects, use_labels=True), 'dates')
stmt = sa.select((data,))
result = session.execute(stmt).fetchall()
assert tuple(chain.from_iterable(result)) == dates
开发者ID:purpleP,项目名称:sqlalchemy-utils,代码行数:10,代码来源:test_compilers.py
示例19: latest_prices_by_codes
def latest_prices_by_codes(codes=[]):
p1 = models.Price
p2 = sql.alias(models.Price)
with models.session_scope() as s:
query = s.query(p1).outerjoin(p2, sql.and_(
p1.quandl_code == p2.c.quandl_code,
p1.date < p2.c.date,
)).filter(
p1.quandl_code.in_(codes) if codes else True,
p2.c.date.is_(None),
)
df = pd.read_sql(query.statement, query.session.bind, index_col="quandl_code")
return df
开发者ID:her0e1c1,项目名称:pystock,代码行数:13,代码来源:query.py
示例20: get_end_tx_column_query
def get_end_tx_column_query(
table,
end_tx_column_name='end_transaction_id',
tx_column_name='transaction_id'
):
v1 = sa.alias(table, name='v')
v2 = sa.alias(table, name='v2')
v3 = sa.alias(table, name='v3')
primary_keys = [c.name for c in table.c if c.primary_key]
tx_criterion = sa.select(
[sa.func.min(getattr(v3.c, tx_column_name))]
).where(
sa.and_(
getattr(v3.c, tx_column_name) > getattr(v1.c, tx_column_name),
*[
getattr(v3.c, pk) == getattr(v1.c, pk)
for pk in primary_keys
if pk != tx_column_name
]
)
)
return sa.select(
columns=[
getattr(v1.c, column)
for column in primary_keys
] + [
getattr(v2.c, tx_column_name).label(end_tx_column_name)
],
from_obj=v1.outerjoin(
v2,
sa.and_(
getattr(v2.c, tx_column_name) ==
tx_criterion
)
)
).order_by(getattr(v1.c, tx_column_name))
开发者ID:adamchainz,项目名称:sqlalchemy-continuum,代码行数:39,代码来源:schema.py
注:本文中的sqlalchemy.alias函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论