本文整理汇总了Python中sqlalchemy.util.column_set函数的典型用法代码示例。如果您正苦于以下问题:Python column_set函数的具体用法?Python column_set怎么用?Python column_set使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了column_set函数的17个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_reduce_aliased_join
def test_reduce_aliased_join(self):
    # Three tables whose person_id columns are chained together by foreign
    # keys; after aliasing the labelled outer join, reduce_columns() is
    # expected to keep only the people.person_id column.
    meta = MetaData()
    people_tbl = Table(
        'people', meta,
        Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
        Column('name', String(50)),
        Column('type', String(30)),
    )
    engineers_tbl = Table(
        'engineers', meta,
        Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
        Column('status', String(30)),
        Column('engineer_name', String(50)),
        Column('primary_language', String(50)),
    )
    managers_tbl = Table(
        'managers', meta,
        Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
        Column('status', String(30)),
        Column('manager_name', String(50)),
    )

    joined = people_tbl.outerjoin(engineers_tbl).outerjoin(managers_tbl)
    pjoin = joined.select(use_labels=True).alias('pjoin')

    candidates = [
        pjoin.c.people_person_id,
        pjoin.c.engineers_person_id,
        pjoin.c.managers_person_id,
    ]
    reduced = util.column_set(sql_util.reduce_columns(candidates))
    eq_(reduced, util.column_set([pjoin.c.people_person_id]))
开发者ID:clones,项目名称:sqlalchemy,代码行数:25,代码来源:test_selectable.py
示例2: test_reduce_aliased_union_2
def test_reduce_aliased_union_2(self):
    meta = MetaData()
    page_table = Table(
        'page', meta,
        Column('id', Integer, primary_key=True),
    )
    magazine_page_table = Table(
        'magazine_page', meta,
        Column('page_id', Integer, ForeignKey('page.id'), primary_key=True),
    )
    classified_page_table = Table(
        'classified_page', meta,
        Column('magazine_page_id', Integer, ForeignKey('magazine_page.page_id'), primary_key=True),
    )

    # The unions below are essentially what the ORM's polymorphic_union
    # function forms.  Two versions are built, differing only in the order
    # of their selects.  Each builder returns a brand new select so the two
    # unions never share a selectable.
    def select_with_real_column():
        # carries the "real" column classified_page.magazine_page_id
        three_way_join = page_table.join(magazine_page_table).join(classified_page_table)
        return select([
            page_table.c.id,
            magazine_page_table.c.page_id,
            classified_page_table.c.magazine_page_id,
        ]).select_from(three_way_join)

    def select_with_cast_placeholder():
        # carries a CAST(NULL) labelled as a stand-in for magazine_page_id
        two_way_join = page_table.join(magazine_page_table)
        return select([
            page_table.c.id,
            magazine_page_table.c.page_id,
            cast(null(), Integer).label('magazine_page_id'),
        ]).select_from(two_way_join)

    # Version 1: the first selectable has the real column.
    pjoin = union(
        select_with_real_column(),
        select_with_cast_placeholder(),
    ).alias('pjoin')
    reduced = sql_util.reduce_columns([pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])
    eq_(util.column_set(reduced), util.column_set([pjoin.c.id]))

    # Version 2: the first selectable has the CAST, a placeholder for
    # classified_page.magazine_page_id in the second selectable.
    # reduce_columns needs to take into account all foreign keys derived
    # from pjoin.c.magazine_page_id, while the UNION construct currently
    # makes the external column look like that of the first selectable only.
    pjoin = union(
        select_with_cast_placeholder(),
        select_with_real_column(),
    ).alias('pjoin')
    reduced = sql_util.reduce_columns([pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])
    eq_(util.column_set(reduced), util.column_set([pjoin.c.id]))
开发者ID:gajop,项目名称:springgrid,代码行数:60,代码来源:test_selectable.py
示例3: _configure_pks
def _configure_pks(self):
    """Populate the table, column and primary-key bookkeeping attributes.

    Assigns ``self.tables``, ``self._pks_by_table``, ``self._cols_by_table``,
    ``self.primary_key`` and ``self._readonly_props``, and adds the located
    tables (plus the mapped table) to ``self._all_tables``.
    """
    self.tables = sql_util.find_tables(self.mapped_table)
    self._pks_by_table = {}
    self._cols_by_table = {}

    # Union of the proxy sets of every mapped column.  No separate set of
    # primary-key columns is built here: nothing in this method reads one.
    all_cols = sqlalchemy_util.column_set(
        chain(*[col.proxy_set for col in self._columntoproperty])
    )

    tables = set(self.tables + [self.mapped_table])
    self._all_tables.update(tables)
    self._cols_by_table[self.mapped_table] = all_cols

    # Primary key columns are selected by name from the configured argument.
    primary_key = [c for c in all_cols if c.name in self._primary_key_argument]
    self._pks_by_table[self.mapped_table] = primary_key
    self.primary_key = tuple(primary_key)
    self._log("Identified primary key columns: %s", primary_key)

    # determine cols that aren't expressed within our tables; mark these
    # as "read only" properties which are refreshed upon INSERT/UPDATE
    self._readonly_props = set(
        self._columntoproperty[col]
        for col in self._columntoproperty
        if self._columntoproperty[col] not in self._identity_key_props
        and (not hasattr(col, "table") or col.table not in self._cols_by_table)
    )
开发者ID:rockychen-dpaw,项目名称:oim-cms,代码行数:27,代码来源:views.py
示例4: test_reduce_selectable
def test_reduce_selectable(self):
    # Two unrelated tables selected together and equated on their name
    # columns; passing the select itself as a clause to reduce_columns()
    # is expected to drop manager_name from the result.
    meta = MetaData()
    engineers = Table(
        'engineers', meta,
        Column('engineer_id', Integer, primary_key=True),
        Column('engineer_name', String(50)),
    )
    managers = Table(
        'managers', meta,
        Column('manager_id', Integer, primary_key=True),
        Column('manager_name', String(50)),
    )

    same_name = engineers.c.engineer_name == managers.c.manager_name
    s = select([engineers, managers]).where(same_name)

    reduced = util.column_set(sql_util.reduce_columns(list(s.c), s))
    expected = util.column_set([s.c.engineer_id, s.c.engineer_name, s.c.manager_id])
    eq_(reduced, expected)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:14,代码来源:test_selectable.py
示例5: replacement_traverse
def replacement_traverse(obj, opts, replace):
    """Copy an expression structure, substituting elements via ``replace``.

    ``replace`` is called for each element visited; a non-None return value
    is used in place of that element and is not traversed further.
    ``opts`` may carry a ``'stop_on'`` collection of elements that are
    returned as-is, and is also forwarded as keyword arguments to the
    cloning callable.  Returns the copied structure, or ``obj`` unchanged
    when it is None.
    """
    copies = util.column_dict()
    frozen = util.column_set(opts.get('stop_on', []))

    def clone(elem, **kw):
        # Elements that must not be touched are handed back untouched.
        if elem in frozen or 'no_replacement_traverse' in elem._annotations:
            return elem

        substitute = replace(elem)
        if substitute is not None:
            # A replacement is final: never clone or replace it again.
            frozen.add(substitute)
            return substitute

        if elem not in copies:
            copied = elem._clone()
            # Register the copy before descending into its internals.
            copies[elem] = copied
            copied._copy_internals(clone=clone, **kw)
        return copies[elem]

    if obj is None:
        return obj
    return clone(obj, **opts)
开发者ID:EchelonFour,项目名称:CouchPotatoServer,代码行数:25,代码来源:visitors.py
示例6: replacement_traverse
def replacement_traverse(obj, opts, replace):
    """Copy an expression structure, substituting elements via ``replace``.

    ``replace`` is called for each element; a non-None return value is used
    in place of that element and added to the stop set so it is not
    descended into.  ``opts`` may carry a ``'stop_on'`` collection and is
    passed as keyword arguments to ``get_children()``.
    """
    copies = util.column_dict()
    frozen = util.column_set(opts.get('stop_on', []))

    def clone(element):
        substitute = replace(element)
        if substitute is not None:
            frozen.add(substitute)
            return substitute
        if element not in copies:
            copies[element] = element._clone()
        return copies[element]

    obj = clone(obj)

    # Walk the copied tree with an explicit stack (last in, first out).
    pending = [obj]
    while pending:
        current = pending.pop()
        if current in frozen:
            continue
        current._copy_internals(clone=clone)
        pending.extend(current.get_children(**opts))
    return obj
开发者ID:BwRy,项目名称:rcs-db-ext,代码行数:26,代码来源:visitors.py
示例7: find_columns
def find_columns(clause):
    """Return a column set of everything visited as a 'column' in ``clause``."""
    found = util.column_set()
    # The set's own add() serves directly as the 'column' visitor.
    visitors.traverse(clause, {}, {'column': found.add})
    return found
开发者ID:greghaynes,项目名称:xsbs,代码行数:8,代码来源:util.py
示例8: test_reduce
def test_reduce(self):
    # t3.t3id -> t2.t2id -> t1.t1id form a foreign key chain, so only
    # t1id is expected to survive among the three id columns.
    meta = MetaData()
    t1 = Table(
        't1', meta,
        Column('t1id', Integer, primary_key=True),
        Column('t1data', String(30)),
    )
    t2 = Table(
        't2', meta,
        Column('t2id', Integer, ForeignKey('t1.t1id'), primary_key=True),
        Column('t2data', String(30)),
    )
    t3 = Table(
        't3', meta,
        Column('t3id', Integer, ForeignKey('t2.t2id'), primary_key=True),
        Column('t3data', String(30)),
    )

    every_column = [
        t1.c.t1id, t1.c.t1data,
        t2.c.t2id, t2.c.t2data,
        t3.c.t3id, t3.c.t3data,
    ]
    reduced = util.column_set(sql_util.reduce_columns(every_column))
    expected = util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data, t3.c.t3data])
    eq_(reduced, expected)
开发者ID:clones,项目名称:sqlalchemy,代码行数:17,代码来源:test_selectable.py
示例9: visit_binary
def visit_binary(binary):
    # Visitor for binary expressions.  ``columns`` and ``omit`` are free
    # variables supplied by the enclosing scope; only equality comparisons
    # are considered.
    if not (binary.operator == operators.eq):
        return

    still_kept = columns.difference(omit)
    cols = util.column_set(chain(*[c.proxy_set for c in still_kept]))
    if not (binary.left in cols and binary.right in cols):
        return

    # Omit the first column that shares lineage with the right-hand side.
    for candidate in columns:
        if candidate.shares_lineage(binary.right):
            omit.add(candidate)
            break
开发者ID:Kellel,项目名称:items,代码行数:8,代码来源:util.py
示例10: test_onclause_direction
def test_onclause_direction(self):
    # The primary key of the join is expected to be Employee.id whichever
    # way round the ON clause is written.
    meta = MetaData()
    employee = Table(
        'Employee', meta,
        Column('name', String(100)),
        Column('id', Integer, primary_key=True),
    )
    engineer = Table(
        'Engineer', meta,
        Column('id', Integer, ForeignKey('Employee.id'), primary_key=True),
    )

    # ON Employee.id = Engineer.id
    forward = employee.join(engineer, employee.c.id == engineer.c.id)
    eq_(util.column_set(forward.primary_key), util.column_set([employee.c.id]))

    # ON Engineer.id = Employee.id
    backward = employee.join(engineer, engineer.c.id == employee.c.id)
    eq_(util.column_set(backward.primary_key), util.column_set([employee.c.id]))
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:19,代码来源:test_selectable.py
示例11: reduce_columns
def reduce_columns(columns, *clauses, **kw):
    """Return ``columns`` with naturally equivalent columns removed.

    Two columns are "naturally equivalent" when they will ultimately
    represent the same value because a foreign key relates them.  The
    result keeps only columns that have no such equivalent elsewhere in
    the given list.

    ``clauses`` is an optional series of join clauses; they are traversed
    for equality comparisons that identify further equivalent columns.

    The keyword argument ``ignore_nonexistent_tables`` (default False)
    makes foreign keys whose referenced table is not yet configured be
    skipped rather than raising.

    The main use is to work out the most minimal "primary key" of a
    selectable, by reducing its primary key columns to just those that
    are not repeated.
    """
    skip_unresolved = kw.pop("ignore_nonexistent_tables", False)

    ordered = util.ordered_column_set(columns)
    omit = util.column_set()

    for col in ordered:
        proxied_fks = chain.from_iterable([c.foreign_keys for c in col.proxy_set])
        for fk in proxied_fks:
            for other in ordered:
                if other is col:
                    continue
                # Resolved per comparison, exactly where a missing table
                # would surface as NoReferencedTableError.
                try:
                    referenced = fk.column
                except exc.NoReferencedTableError:
                    if not skip_unresolved:
                        raise
                    continue
                if referenced.shares_lineage(other):
                    omit.add(col)
                    break

    if clauses:
        def visit_binary(binary):
            if not (binary.operator == operators.eq):
                return
            still_kept = ordered.difference(omit)
            cols = util.column_set(chain(*[c.proxy_set for c in still_kept]))
            if not (binary.left in cols and binary.right in cols):
                return
            for candidate in ordered:
                if candidate.shares_lineage(binary.right):
                    omit.add(candidate)
                    break

        for clause in clauses:
            visitors.traverse(clause, {}, {"binary": visit_binary})

    return expression.ColumnSet(ordered.difference(omit))
开发者ID:pszafer,项目名称:dlna_upnp_invention,代码行数:54,代码来源:util.py
示例12: test_reduce_aliased_union
def test_reduce_aliased_union(self):
    # Columns of a polymorphic union over base_item and its joined item
    # subtable: none of id, dummy, child_name is expected to be dropped.
    meta = MetaData()
    item_table = Table(
        'item', meta,
        Column('id', Integer, ForeignKey('base_item.id'), primary_key=True),
        Column('dummy', Integer, default=0),
    )
    base_item_table = Table(
        'base_item', meta,
        Column('id', Integer, primary_key=True),
        Column('child_name', String(255), default=None),
    )

    from sqlalchemy.orm.util import polymorphic_union

    selectables = {
        'BaseItem': base_item_table.select(base_item_table.c.child_name == 'BaseItem'),
        'Item': base_item_table.join(item_table),
    }
    item_join = polymorphic_union(selectables, None, 'item_join')

    reduced = util.column_set(
        sql_util.reduce_columns([item_join.c.id, item_join.c.dummy, item_join.c.child_name])
    )
    expected = util.column_set([item_join.c.id, item_join.c.dummy, item_join.c.child_name])
    eq_(reduced, expected)
开发者ID:clones,项目名称:sqlalchemy,代码行数:23,代码来源:test_selectable.py
示例13: unwrap_order_by
def unwrap_order_by(clause):
    """Split an ORDER BY expression into its individual column expressions.

    Unary expressions whose modifier is an ordering modifier
    (DESC/ASC/NULLS FIRST/NULLS LAST) are not collected themselves; their
    children are examined instead.  Returns a column set.
    """
    found = util.column_set()
    queue = deque([clause])
    while queue:
        item = queue.popleft()
        if not isinstance(item, expression.ColumnElement):
            # Not a column expression: look inside it.
            queue.extend(item.get_children())
        elif isinstance(item, expression._UnaryExpression) and \
                operators.is_ordering_modifier(item.modifier):
            # An ordering wrapper: look at what it wraps.
            queue.extend(item.get_children())
        else:
            found.add(item)
    return found
开发者ID:pszafer,项目名称:dlna_upnp_invention,代码行数:16,代码来源:util.py
示例14: cloned_traverse
def cloned_traverse(obj, opts, visitors):
    """Copy an expression structure, letting visitors modify the copies.

    ``visitors`` maps a ``__visit_name__`` to a callable, which is invoked
    once on each newly made copy carrying that name.  ``opts`` may carry a
    ``'stop_on'`` collection of elements that are returned as-is.  Returns
    the copied structure, or ``obj`` unchanged when it is None.
    """
    copies = util.column_dict()
    frozen = util.column_set(opts.get('stop_on', []))

    def clone(elem):
        if elem in frozen:
            return elem
        if elem not in copies:
            copied = elem._clone()
            # Register the copy before descending into its internals.
            copies[elem] = copied
            copied._copy_internals(clone=clone)
            visit = visitors.get(copied.__visit_name__, None)
            if visit:
                visit(copied)
        return copies[elem]

    if obj is None:
        return obj
    return clone(obj)
开发者ID:EchelonFour,项目名称:CouchPotatoServer,代码行数:22,代码来源:visitors.py
示例15: _determine_targets
def _determine_targets(self):
    """Resolve ``self.argument`` to a mapper and normalise related attributes.

    Sets ``self.mapper``, ``self.target`` and ``self.table``, evaluates any
    attributes given as callables, converts ``order_by``, ``_foreign_keys``
    and ``remote_side`` into column collections, warns when this relation
    overrides one of the same key on an inherited mapper, and registers
    delete-orphan handling.

    Raises ``sa_exc.ArgumentError`` when the argument is neither a class, a
    mapper nor a callable, or when delete-orphan cascade is requested on a
    self-referential relationship.
    """
    if isinstance(self.argument, type):
        self.mapper = mapper.class_mapper(self.argument, compile=False)
    elif isinstance(self.argument, mapper.Mapper):
        self.mapper = self.argument
    elif util.callable(self.argument):
        # accept a callable to suit various deferred-configurational schemes
        self.mapper = mapper.class_mapper(self.argument(), compile=False)
    else:
        bad_argument = "relation '%s' expects a class or a mapper argument (received: %s)" % (
            self.key,
            type(self.argument),
        )
        raise sa_exc.ArgumentError(bad_argument)
    assert isinstance(self.mapper, mapper.Mapper), self.mapper

    # accept callables for other attributes which may require deferred initialization
    deferrable = ("order_by", "primaryjoin", "secondaryjoin", "secondary", "_foreign_keys", "remote_side")
    for attr in deferrable:
        if not util.callable(getattr(self, attr)):
            continue
        setattr(self, attr, getattr(self, attr)())

    # in the case that InstrumentedAttributes were used to construct
    # primaryjoin or secondaryjoin, remove the "_orm_adapt" annotation so these
    # interact with Query in the same way as the original Table-bound Column objects
    for attr in ("primaryjoin", "secondaryjoin"):
        val = getattr(self, attr)
        if val is None:
            continue
        util.assert_arg_type(val, sql.ClauseElement, attr)
        setattr(self, attr, _orm_deannotate(val))

    if self.order_by:
        self.order_by = [expression._literal_as_column(x) for x in util.to_list(self.order_by)]

    self._foreign_keys = util.column_set(
        expression._literal_as_column(x) for x in util.to_column_set(self._foreign_keys)
    )
    self.remote_side = util.column_set(
        expression._literal_as_column(x) for x in util.to_column_set(self.remote_side)
    )

    if not self.parent.concrete:
        for inheriting in self.parent.iterate_to_root():
            if inheriting is self.parent:
                continue
            if inheriting._get_property(self.key, raiseerr=False):
                util.warn(
                    (
                        "Warning: relation '%s' on mapper '%s' supercedes "
                        "the same relation on inherited mapper '%s'; this "
                        "can cause dependency issues during flush"
                    )
                    % (self.key, self.parent, inheriting)
                )

    # TODO: remove 'self.table'
    self.target = self.table = self.mapper.mapped_table

    if self.cascade.delete_orphan:
        if self.parent.class_ is self.mapper.class_:
            raise sa_exc.ArgumentError(
                "In relationship '%s', can't establish 'delete-orphan' cascade "
                "rule on a self-referential relationship. "
                "You probably want cascade='all', which includes delete cascading but not orphan detection."
                % (str(self))
            )
        orphan_entry = (self.key, self.parent.class_)
        self.mapper.primary_mapper().delete_orphans.append(orphan_entry)
开发者ID:AntonNguyen,项目名称:easy_api,代码行数:62,代码来源:properties.py
示例16: find_columns
def find_columns(clause):
    """Return a column set of everything visited as a 'column' in ``clause``."""
    found = util.column_set()
    collect = found.add
    visitors.traverse(clause, {}, {"column": collect})
    return found
开发者ID:pszafer,项目名称:dlna_upnp_invention,代码行数:6,代码来源:util.py
示例17: _determine_synchronize_pairs
def _determine_synchronize_pairs(self):
    """Work out the column pairs used for synchronization.

    Sets ``self.synchronize_pairs`` (from ``local_remote_pairs`` when given,
    otherwise from the primaryjoin), sets ``self.secondary_synchronize_pairs``
    (from the secondaryjoin, or None when there is none), and finally
    rebuilds ``self._foreign_keys`` from the second member of every pair.

    Raises ``sa_exc.ArgumentError`` when no usable pairs can be derived.
    """
    if self.local_remote_pairs:
        if not self._foreign_keys:
            raise sa_exc.ArgumentError("foreign_keys argument is required with _local_remote_pairs argument")

        self.synchronize_pairs = []
        for local_col, remote_col in self.local_remote_pairs:
            # The member found in foreign_keys always goes second.
            if remote_col in self._foreign_keys:
                self.synchronize_pairs.append((local_col, remote_col))
            elif local_col in self._foreign_keys:
                self.synchronize_pairs.append((remote_col, local_col))
    else:
        candidates = criterion_as_pairs(
            self.primaryjoin, consider_as_foreign_keys=self._foreign_keys, any_operator=self.viewonly
        )
        eq_pairs = []
        for left, right in candidates:
            both_mapped = self._col_is_part_of_mappings(left) and self._col_is_part_of_mappings(right)
            if both_mapped or (self.viewonly and right in self._foreign_keys):
                eq_pairs.append((left, right))

        if not eq_pairs:
            # Pick the error by what a relaxed (any-operator) search finds.
            if not self.viewonly and criterion_as_pairs(
                self.primaryjoin, consider_as_foreign_keys=self._foreign_keys, any_operator=True
            ):
                raise sa_exc.ArgumentError(
                    "Could not locate any equated, locally "
                    "mapped column pairs for primaryjoin condition '%s' on relation %s. "
                    "For more relaxed rules on join conditions, the relation may be "
                    "marked as viewonly=True." % (self.primaryjoin, self)
                )
            elif self._foreign_keys:
                raise sa_exc.ArgumentError(
                    "Could not determine relation direction for "
                    "primaryjoin condition '%s', on relation %s. "
                    "Are the columns in 'foreign_keys' present within the given "
                    "join condition ?" % (self.primaryjoin, self)
                )
            else:
                raise sa_exc.ArgumentError(
                    "Could not determine relation direction for "
                    "primaryjoin condition '%s', on relation %s. "
                    "Specify the 'foreign_keys' argument to indicate which columns "
                    "on the relation are foreign." % (self.primaryjoin, self)
                )

        self.synchronize_pairs = eq_pairs

    if not self.secondaryjoin:
        self.secondary_synchronize_pairs = None
    else:
        candidates = criterion_as_pairs(
            self.secondaryjoin, consider_as_foreign_keys=self._foreign_keys, any_operator=self.viewonly
        )
        sq_pairs = []
        for left, right in candidates:
            both_mapped = self._col_is_part_of_mappings(left) and self._col_is_part_of_mappings(right)
            if both_mapped or right in self._foreign_keys:
                sq_pairs.append((left, right))

        if not sq_pairs:
            if not self.viewonly and criterion_as_pairs(
                self.secondaryjoin, consider_as_foreign_keys=self._foreign_keys, any_operator=True
            ):
                raise sa_exc.ArgumentError(
                    "Could not locate any equated, locally mapped "
                    "column pairs for secondaryjoin condition '%s' on relation %s. "
                    "For more relaxed rules on join conditions, the "
                    "relation may be marked as viewonly=True." % (self.secondaryjoin, self)
                )
            else:
                raise sa_exc.ArgumentError(
                    "Could not determine relation direction "
                    "for secondaryjoin condition '%s', on relation %s. "
                    "Specify the foreign_keys argument to indicate which "
                    "columns on the relation are foreign." % (self.secondaryjoin, self)
                )

        self.secondary_synchronize_pairs = sq_pairs

    # Foreign keys become exactly the second member of every pair found.
    self._foreign_keys = util.column_set(r for l, r in self.synchronize_pairs)
    if self.secondary_synchronize_pairs:
        self._foreign_keys.update(r for l, r in self.secondary_synchronize_pairs)
开发者ID:AntonNguyen,项目名称:easy_api,代码行数:88,代码来源:properties.py
注:本文中的sqlalchemy.util.column_set函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论