• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python util.column_set函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 sqlalchemy.util.column_set 函数的典型用法代码示例。如果您正苦于以下问题:Python column_set 函数的具体用法?Python column_set 怎么用?Python column_set 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 column_set 函数的 17 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_reduce_aliased_join

    def test_reduce_aliased_join(self):
        """Check reduce_columns() against an aliased, labeled outer join.

        ``engineers.person_id`` and ``managers.person_id`` are both declared
        as foreign keys to ``people.person_id``; the test expects the three
        labeled ``person_id`` columns of the aliased select to reduce to
        ``pjoin.c.people_person_id`` alone.
        """
        meta = MetaData()

        people_tbl = Table(
            'people', meta,
            Column('person_id', Integer,
                   Sequence('person_id_seq', optional=True),
                   primary_key=True),
            Column('name', String(50)),
            Column('type', String(30)),
        )
        engineers_tbl = Table(
            'engineers', meta,
            Column('person_id', Integer,
                   ForeignKey('people.person_id'), primary_key=True),
            Column('status', String(30)),
            Column('engineer_name', String(50)),
            Column('primary_language', String(50)),
        )
        managers_tbl = Table(
            'managers', meta,
            Column('person_id', Integer,
                   ForeignKey('people.person_id'), primary_key=True),
            Column('status', String(30)),
            Column('manager_name', String(50)),
        )

        # people LEFT OUTER JOIN engineers LEFT OUTER JOIN managers, selected
        # with labels and wrapped in an alias.
        outer = people_tbl.outerjoin(engineers_tbl).outerjoin(managers_tbl)
        pjoin = outer.select(use_labels=True).alias('pjoin')

        candidates = [
            pjoin.c.people_person_id,
            pjoin.c.engineers_person_id,
            pjoin.c.managers_person_id,
        ]
        reduced = util.column_set(sql_util.reduce_columns(candidates))
        expected = util.column_set([pjoin.c.people_person_id])
        eq_(reduced, expected)
开发者ID:clones,项目名称:sqlalchemy,代码行数:25,代码来源:test_selectable.py


示例2: test_reduce_aliased_union_2

    def test_reduce_aliased_union_2(self):
        """Check reduce_columns() on an aliased UNION, in both select orders.

        The tables form the chain page <- magazine_page <- classified_page.
        One select exposes the real ``classified_page.magazine_page_id``; the
        other substitutes ``CAST(NULL AS INTEGER)`` labeled with the same
        name.  Whichever select comes first in the union, the three columns
        of the aliased union are expected to reduce to ``pjoin.c.id``.
        """
        meta = MetaData()

        page = Table(
            'page', meta,
            Column('id', Integer, primary_key=True),
        )
        magazine_page = Table(
            'magazine_page', meta,
            Column('page_id', Integer, ForeignKey('page.id'), primary_key=True),
        )
        classified_page = Table(
            'classified_page', meta,
            Column('magazine_page_id', Integer,
                   ForeignKey('magazine_page.page_id'), primary_key=True),
        )

        def select_with_real_column():
            # select carrying the "real" classified_page.magazine_page_id
            return select([
                page.c.id,
                magazine_page.c.page_id,
                classified_page.c.magazine_page_id,
            ]).select_from(page.join(magazine_page).join(classified_page))

        def select_with_placeholder():
            # select carrying a CAST(NULL) stand-in for magazine_page_id
            return select([
                page.c.id,
                magazine_page.c.page_id,
                cast(null(), Integer).label('magazine_page_id'),
            ]).select_from(page.join(magazine_page))

        def reduced_columns(selectable):
            return util.column_set(sql_util.reduce_columns([
                selectable.c.id,
                selectable.c.page_id,
                selectable.c.magazine_page_id,
            ]))

        # this is essentially the union formed by the ORM's polymorphic_union
        # function.  we define two versions with different ordering of selects.

        # the first selectable has the "real" column
        # classified_page.magazine_page_id
        pjoin = union(
            select_with_real_column(),
            select_with_placeholder(),
        ).alias('pjoin')
        eq_(reduced_columns(pjoin), util.column_set([pjoin.c.id]))

        # the first selectable has a CAST, which is a placeholder for
        # classified_page.magazine_page_id in the second selectable.
        # reduce_columns needs to take into account all foreign keys derived
        # from pjoin.c.magazine_page_id.  the UNION construct currently makes
        # the external column look like that of the first selectable only.
        pjoin = union(
            select_with_placeholder(),
            select_with_real_column(),
        ).alias('pjoin')
        eq_(reduced_columns(pjoin), util.column_set([pjoin.c.id]))
开发者ID:gajop,项目名称:springgrid,代码行数:60,代码来源:test_selectable.py


示例3: _configure_pks

    def _configure_pks(self):
        """Record this mapper's tables, per-table columns and primary key.

        Sets ``self.tables``, ``self._pks_by_table``, ``self._cols_by_table``,
        ``self.primary_key`` and ``self._readonly_props``, and adds the
        discovered tables to ``self._all_tables``.  The primary key consists
        of the mapped columns whose ``name`` is contained in
        ``self._primary_key_argument``.
        """
        self.tables = sql_util.find_tables(self.mapped_table)

        self._pks_by_table = {}
        self._cols_by_table = {}

        # every mapped column together with everything in its proxy_set
        all_cols = sqlalchemy_util.column_set(
            chain(*[col.proxy_set for col in self._columntoproperty])
        )

        tables = set(self.tables + [self.mapped_table])
        self._all_tables.update(tables)
        self._cols_by_table[self.mapped_table] = all_cols

        # identify primary key columns which are also mapped by this mapper;
        # they are selected by name rather than by the columns' own
        # ``primary_key`` flag.
        primary_key = [c for c in all_cols if c.name in self._primary_key_argument]
        self._pks_by_table[self.mapped_table] = primary_key

        self.primary_key = tuple(primary_key)
        self._log("Identified primary key columns: %s", primary_key)

        # determine cols that aren't expressed within our tables; mark these
        # as "read only" properties which are refreshed upon INSERT/UPDATE
        self._readonly_props = set(
            self._columntoproperty[col]
            for col in self._columntoproperty
            if self._columntoproperty[col] not in self._identity_key_props
            and (not hasattr(col, "table") or col.table not in self._cols_by_table)
        )
开发者ID:rockychen-dpaw,项目名称:oim-cms,代码行数:27,代码来源:views.py


示例4: test_reduce_selectable

 def test_reduce_selectable(self):
     """Check that a selectable passed as a clause contributes equivalences.

     The select equates ``engineer_name`` with ``manager_name`` in its
     WHERE clause; with the select itself given to reduce_columns() as a
     clause, the test expects ``manager_name`` to be dropped from the
     select's columns.
     """
     meta = MetaData()
     engineers = Table(
         'engineers', meta,
         Column('engineer_id', Integer, primary_key=True),
         Column('engineer_name', String(50)),
     )
     managers = Table(
         'managers', meta,
         Column('manager_id', Integer, primary_key=True),
         Column('manager_name', String(50)),
     )

     unfiltered = select([engineers, managers])
     same_name = engineers.c.engineer_name == managers.c.manager_name
     s = unfiltered.where(same_name)

     reduced = util.column_set(sql_util.reduce_columns(list(s.c), s))
     expected = util.column_set([
         s.c.engineer_id,
         s.c.engineer_name,
         s.c.manager_id,
     ])
     eq_(reduced, expected)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:14,代码来源:test_selectable.py


示例5: replacement_traverse

def replacement_traverse(obj, opts, replace):
    """Return a copy of ``obj`` in which elements may be swapped out.

    ``replace`` is called with each visited element; a non-None result is
    used in place of the element and is not traversed further.  Elements
    listed under ``opts['stop_on']``, and elements whose ``_annotations``
    contain ``'no_replacement_traverse'``, are returned as-is.  Every other
    element is cloned once and its internals are copied recursively.
    ``obj`` is returned unchanged when it is None.
    """

    cloned = util.column_dict()
    stop_on = util.column_set(opts.get('stop_on', []))

    def clone(elem, **kw):
        # elements we were told to stop on, or that opted out, pass through
        if elem in stop_on:
            return elem
        if 'no_replacement_traverse' in elem._annotations:
            return elem

        substitute = replace(elem)
        if substitute is not None:
            # remember the replacement so it is never traversed itself
            stop_on.add(substitute)
            return substitute

        if elem not in cloned:
            copied = elem._clone()
            cloned[elem] = copied
            copied._copy_internals(clone=clone, **kw)
        return cloned[elem]

    if obj is None:
        return obj
    return clone(obj, **opts)
开发者ID:EchelonFour,项目名称:CouchPotatoServer,代码行数:25,代码来源:visitors.py


示例6: replacement_traverse

def replacement_traverse(obj, opts, replace):
    """Return a copy of ``obj`` in which elements may be swapped out.

    ``replace`` is called with each element; a non-None result is used in
    place of the element and added to the stop set so that it is not
    descended into.  Other elements are cloned once and cached.  The
    structure is walked iteratively with an explicit stack, using
    ``get_children(**opts)`` to find the next elements to process.
    """

    cloned = util.column_dict()
    stop_on = util.column_set(opts.get('stop_on', []))

    def clone(element):
        replacement = replace(element)
        if replacement is None:
            if element not in cloned:
                cloned[element] = element._clone()
            return cloned[element]
        stop_on.add(replacement)
        return replacement

    obj = clone(obj)
    pending = [obj]
    while pending:
        current = pending.pop()
        if current not in stop_on:
            current._copy_internals(clone=clone)
            pending.extend(current.get_children(**opts))
    return obj
开发者ID:BwRy,项目名称:rcs-db-ext,代码行数:26,代码来源:visitors.py


示例7: find_columns

def find_columns(clause):
    """Collect into a column_set every element visited as a 'column'."""

    found = util.column_set()
    # the set's own ``add`` serves directly as the 'column' visitor
    visitors.traverse(clause, {}, {'column': found.add})
    return found
开发者ID:greghaynes,项目名称:xsbs,代码行数:8,代码来源:util.py


示例8: test_reduce

 def test_reduce(self):
     """Check reduce_columns() on a chain of foreign-key-linked tables.

     ``t2.t2id`` references ``t1.t1id`` and ``t3.t3id`` references
     ``t2.t2id``; the test expects both referencing columns to be dropped
     while all other columns are kept.
     """
     metadata = MetaData()
     first = Table(
         't1', metadata,
         Column('t1id', Integer, primary_key=True),
         Column('t1data', String(30)),
     )
     second = Table(
         't2', metadata,
         Column('t2id', Integer, ForeignKey('t1.t1id'), primary_key=True),
         Column('t2data', String(30)),
     )
     third = Table(
         't3', metadata,
         Column('t3id', Integer, ForeignKey('t2.t2id'), primary_key=True),
         Column('t3data', String(30)),
     )

     every_column = [
         first.c.t1id, first.c.t1data,
         second.c.t2id, second.c.t2data,
         third.c.t3id, third.c.t3data,
     ]
     reduced = util.column_set(sql_util.reduce_columns(every_column))
     expected = util.column_set([
         first.c.t1id, first.c.t1data,
         second.c.t2data,
         third.c.t3data,
     ])
     eq_(reduced, expected)
开发者ID:clones,项目名称:sqlalchemy,代码行数:17,代码来源:test_selectable.py


示例9: visit_binary

 def visit_binary(binary):
     # Visitor for binary expressions; reads ``columns`` and mutates
     # ``omit`` from the enclosing scope.  For an equality whose two sides
     # both belong to the proxy sets of the not-yet-omitted columns, the
     # first column sharing lineage with the right-hand side is omitted.
     if binary.operator == operators.eq:
         remaining = columns.difference(omit)
         candidates = util.column_set(
             chain.from_iterable(col.proxy_set for col in remaining)
         )
         if binary.left not in candidates or binary.right not in candidates:
             return
         for col in columns:
             if col.shares_lineage(binary.right):
                 omit.add(col)
                 break
开发者ID:Kellel,项目名称:items,代码行数:8,代码来源:util.py


示例10: test_onclause_direction

    def test_onclause_direction(self):
        """Check a join's primary key is the same for either ON-clause order.

        ``Engineer.id`` is a foreign key to ``Employee.id``.  Whether the ON
        clause is written ``employee == engineer`` or ``engineer ==
        employee``, the join's primary key is expected to be ``Employee.id``.
        """
        meta = MetaData()

        employee = Table(
            'Employee', meta,
            Column('name', String(100)),
            Column('id', Integer, primary_key=True),
        )
        engineer = Table(
            'Engineer', meta,
            Column('id', Integer, ForeignKey('Employee.id'), primary_key=True),
        )

        # ON Employee.id = Engineer.id
        forward = employee.join(engineer, employee.c.id == engineer.c.id)
        eq_(
            util.column_set(forward.primary_key),
            util.column_set([employee.c.id]),
        )

        # ON Engineer.id = Employee.id
        backward = employee.join(engineer, engineer.c.id == employee.c.id)
        eq_(
            util.column_set(backward.primary_key),
            util.column_set([employee.c.id]),
        )
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:19,代码来源:test_selectable.py


示例11: reduce_columns

def reduce_columns(columns, *clauses, **kw):
    r"""given a list of columns, return a 'reduced' set based on natural equivalents.

    the set is reduced to the smallest list of columns which have no natural
    equivalent present in the list.  A "natural equivalent" means that two columns
    will ultimately represent the same value because they are related by a foreign key.

    \*clauses is an optional list of join clauses which will be traversed
    to further identify columns that are "equivalent".

    \**kw may specify 'ignore_nonexistent_tables' to ignore foreign keys
    whose tables are not yet configured.

    This function is primarily used to determine the most minimal "primary key"
    from a selectable, by reducing the set of primary key columns present
    in the selectable to just those that are not repeated.

    """
    # NOTE: the docstring is a raw string so that the reST escapes ``\*`` and
    # ``\**`` are not parsed as (invalid) string escape sequences.
    ignore_nonexistent_tables = kw.pop("ignore_nonexistent_tables", False)

    columns = util.ordered_column_set(columns)

    # columns found to be equivalent to another column in ``columns``
    omit = util.column_set()
    for col in columns:
        # consider the foreign keys of every column in col's proxy_set
        for fk in chain(*[c.foreign_keys for c in col.proxy_set]):
            for c in columns:
                if c is col:
                    continue
                # fk.column is resolved only once another column is available
                # to compare against, so an unresolvable foreign key cannot
                # raise when there is nothing to compare.
                try:
                    fk_col = fk.column
                except exc.NoReferencedTableError:
                    if ignore_nonexistent_tables:
                        continue
                    raise
                if fk_col.shares_lineage(c):
                    # col references a column already present: drop col
                    omit.add(col)
                    break

    if clauses:

        def visit_binary(binary):
            # an equality between two still-present columns makes the first
            # column sharing lineage with the right-hand side redundant
            if binary.operator == operators.eq:
                cols = util.column_set(chain(*[c.proxy_set for c in columns.difference(omit)]))
                if binary.left in cols and binary.right in cols:
                    for c in columns:
                        if c.shares_lineage(binary.right):
                            omit.add(c)
                            break

        for clause in clauses:
            visitors.traverse(clause, {}, {"binary": visit_binary})

    return expression.ColumnSet(columns.difference(omit))
开发者ID:pszafer,项目名称:dlna_upnp_invention,代码行数:54,代码来源:util.py


示例12: test_reduce_aliased_union

    def test_reduce_aliased_union(self):
        """Check reduce_columns() keeps all columns of a polymorphic union.

        ``item.id`` is a foreign key to ``base_item.id``.  The union of the
        filtered base table and the base/item join is built with
        ``polymorphic_union``; its ``id``, ``dummy`` and ``child_name``
        columns are expected to come back from reduce_columns() unreduced.
        """
        meta = MetaData()
        item_table = Table(
            'item', meta,
            Column('id', Integer, ForeignKey('base_item.id'), primary_key=True),
            Column('dummy', Integer, default=0),
        )
        base_item_table = Table(
            'base_item', meta,
            Column('id', Integer, primary_key=True),
            Column('child_name', String(255), default=None),
        )

        from sqlalchemy.orm.util import polymorphic_union

        base_only = base_item_table.select(
            base_item_table.c.child_name == 'BaseItem')
        base_with_item = base_item_table.join(item_table)
        item_join = polymorphic_union(
            {'BaseItem': base_only, 'Item': base_with_item},
            None,
            'item_join',
        )

        union_cols = (
            item_join.c.id,
            item_join.c.dummy,
            item_join.c.child_name,
        )
        reduced = util.column_set(sql_util.reduce_columns(list(union_cols)))
        expected = util.column_set(list(union_cols))
        eq_(reduced, expected)
开发者ID:clones,项目名称:sqlalchemy,代码行数:23,代码来源:test_selectable.py


示例13: unwrap_order_by

def unwrap_order_by(clause):
    """Break up an 'order by' expression into individual column-expressions,
    without DESC/ASC/NULLS FIRST/NULLS LAST.

    The clause is walked breadth-first.  A ColumnElement is collected unless
    it is a unary expression whose modifier is an ordering modifier; such
    wrappers, and any non-ColumnElement, are descended into via
    ``get_children()`` instead.
    """

    found = util.column_set()
    queue = deque([clause])
    while queue:
        node = queue.popleft()
        if not isinstance(node, expression.ColumnElement):
            descend = True
        else:
            # only ordering wrappers (per is_ordering_modifier) are unwrapped
            descend = (
                isinstance(node, expression._UnaryExpression)
                and operators.is_ordering_modifier(node.modifier)
            )
        if descend:
            queue.extend(node.get_children())
        else:
            found.add(node)
    return found
开发者ID:pszafer,项目名称:dlna_upnp_invention,代码行数:16,代码来源:util.py


示例14: cloned_traverse

def cloned_traverse(obj, opts, visitors):
    """Return a copy of ``obj`` whose cloned elements were shown to visitors.

    Elements listed under ``opts['stop_on']`` are returned as-is.  Every
    other element is cloned once; its internals are copied recursively and
    then the visitor registered under the clone's ``__visit_name__``, if
    any, is called with the clone.  ``obj`` is returned unchanged when it
    is None.
    """

    cloned = util.column_dict()
    stop_on = util.column_set(opts.get('stop_on', []))

    def clone(elem):
        if elem in stop_on:
            return elem
        if elem in cloned:
            return cloned[elem]

        copied = elem._clone()
        cloned[elem] = copied
        copied._copy_internals(clone=clone)
        # children are copied first, so the visitor sees a complete clone
        visit = visitors.get(copied.__visit_name__, None)
        if visit:
            visit(copied)
        return cloned[elem]

    if obj is None:
        return obj
    return clone(obj)
开发者ID:EchelonFour,项目名称:CouchPotatoServer,代码行数:22,代码来源:visitors.py


示例15: _determine_targets

    def _determine_targets(self):
        """Resolve the target mapper and normalize this relation's arguments.

        Steps, in order:

        * set ``self.mapper`` from ``self.argument`` (a class, a Mapper, or a
          callable returning a class);
        * call any callable-valued configuration attributes and store their
          results;
        * type-check and de-annotate ``primaryjoin`` / ``secondaryjoin``;
        * coerce ``order_by``, ``_foreign_keys`` and ``remote_side`` to
          column expressions;
        * warn when the relation overrides one of the same key on an
          inherited mapper (non-concrete parents only);
        * set ``self.target`` / ``self.table`` and register delete-orphan
          handling on the target's primary mapper.

        Raises:
            sa_exc.ArgumentError: if ``self.argument`` is not a class, mapper
                or callable, or if a delete-orphan cascade is configured on a
                self-referential relationship.
        """
        if isinstance(self.argument, type):
            self.mapper = mapper.class_mapper(self.argument, compile=False)
        elif isinstance(self.argument, mapper.Mapper):
            self.mapper = self.argument
        elif util.callable(self.argument):
            # accept a callable to suit various deferred-configurational schemes
            self.mapper = mapper.class_mapper(self.argument(), compile=False)
        else:
            raise sa_exc.ArgumentError(
                "relation '%s' expects a class or a mapper argument (received: %s)" % (self.key, type(self.argument))
            )
        assert isinstance(self.mapper, mapper.Mapper), self.mapper

        # accept callables for other attributes which may require deferred initialization
        for attr in ("order_by", "primaryjoin", "secondaryjoin", "secondary", "_foreign_keys", "remote_side"):
            if util.callable(getattr(self, attr)):
                setattr(self, attr, getattr(self, attr)())

        # in the case that InstrumentedAttributes were used to construct
        # primaryjoin or secondaryjoin, remove the "_orm_adapt" annotation so these
        # interact with Query in the same way as the original Table-bound Column objects
        for attr in ("primaryjoin", "secondaryjoin"):
            val = getattr(self, attr)
            if val is not None:
                util.assert_arg_type(val, sql.ClauseElement, attr)
                setattr(self, attr, _orm_deannotate(val))

        # a truthy order_by is normalized to a list of column expressions;
        # a falsy value is left untouched
        if self.order_by:
            self.order_by = [expression._literal_as_column(x) for x in util.to_list(self.order_by)]

        # both collections are rebuilt as column_sets of column expressions
        self._foreign_keys = util.column_set(
            expression._literal_as_column(x) for x in util.to_column_set(self._foreign_keys)
        )
        self.remote_side = util.column_set(
            expression._literal_as_column(x) for x in util.to_column_set(self.remote_side)
        )

        # for non-concrete parents, warn about every other mapper in the
        # parent's iterate_to_root() chain that already has a property
        # under the same key
        if not self.parent.concrete:
            for inheriting in self.parent.iterate_to_root():
                if inheriting is not self.parent and inheriting._get_property(self.key, raiseerr=False):
                    util.warn(
                        (
                            "Warning: relation '%s' on mapper '%s' supercedes "
                            "the same relation on inherited mapper '%s'; this "
                            "can cause dependency issues during flush"
                        )
                        % (self.key, self.parent, inheriting)
                    )

        # TODO: remove 'self.table'
        self.target = self.table = self.mapper.mapped_table

        if self.cascade.delete_orphan:
            # delete-orphan is rejected when parent and target map the same class
            if self.parent.class_ is self.mapper.class_:
                raise sa_exc.ArgumentError(
                    "In relationship '%s', can't establish 'delete-orphan' cascade "
                    "rule on a self-referential relationship.  "
                    "You probably want cascade='all', which includes delete cascading but not orphan detection."
                    % (str(self))
                )
            # record (relation key, parent class) on the target's primary mapper
            self.mapper.primary_mapper().delete_orphans.append((self.key, self.parent.class_))
开发者ID:AntonNguyen,项目名称:easy_api,代码行数:62,代码来源:properties.py


示例16: find_columns

def find_columns(clause):
    """Return a column_set of the elements visited as 'column' in ``clause``."""

    found = util.column_set()

    def collect(column):
        found.add(column)

    visitors.traverse(clause, {}, {"column": collect})
    return found
开发者ID:pszafer,项目名称:dlna_upnp_invention,代码行数:6,代码来源:util.py


示例17: _determine_synchronize_pairs

    def _determine_synchronize_pairs(self):
        """Compute the column pairs to synchronize for this relation.

        Sets ``self.synchronize_pairs`` (from ``self.local_remote_pairs`` when
        given, otherwise from ``self.primaryjoin``) and
        ``self.secondary_synchronize_pairs`` (from ``self.secondaryjoin``, or
        None when there is no secondary join), then rebuilds
        ``self._foreign_keys`` from the second element of every pair.

        Raises:
            sa_exc.ArgumentError: if ``local_remote_pairs`` is given without
                foreign keys, or if no usable column pairs can be derived
                from ``primaryjoin`` / ``secondaryjoin``.
        """

        if self.local_remote_pairs:
            if not self._foreign_keys:
                raise sa_exc.ArgumentError("foreign_keys argument is required with _local_remote_pairs argument")

            self.synchronize_pairs = []

            # orient each explicit pair so that the column found in
            # _foreign_keys comes second; pairs with no such column are dropped
            for l, r in self.local_remote_pairs:
                if r in self._foreign_keys:
                    self.synchronize_pairs.append((l, r))
                elif l in self._foreign_keys:
                    self.synchronize_pairs.append((r, l))
        else:
            # derive candidate pairs from the primary join condition
            eq_pairs = criterion_as_pairs(
                self.primaryjoin, consider_as_foreign_keys=self._foreign_keys, any_operator=self.viewonly
            )
            # keep a pair when both columns are part of the mappings, or when
            # the relation is viewonly AND its second column is a foreign key
            # (``and`` binds tighter than ``or`` in the condition below)
            eq_pairs = [
                (l, r)
                for l, r in eq_pairs
                if (self._col_is_part_of_mappings(l) and self._col_is_part_of_mappings(r))
                or self.viewonly
                and r in self._foreign_keys
            ]

            if not eq_pairs:
                # re-run with any_operator=True to pick the error message:
                # if that finds pairs for a non-viewonly relation, suggest
                # viewonly=True; otherwise report an undetermined direction
                if not self.viewonly and criterion_as_pairs(
                    self.primaryjoin, consider_as_foreign_keys=self._foreign_keys, any_operator=True
                ):
                    raise sa_exc.ArgumentError(
                        "Could not locate any equated, locally "
                        "mapped column pairs for primaryjoin condition '%s' on relation %s. "
                        "For more relaxed rules on join conditions, the relation may be "
                        "marked as viewonly=True." % (self.primaryjoin, self)
                    )
                else:
                    if self._foreign_keys:
                        raise sa_exc.ArgumentError(
                            "Could not determine relation direction for "
                            "primaryjoin condition '%s', on relation %s. "
                            "Are the columns in 'foreign_keys' present within the given "
                            "join condition ?" % (self.primaryjoin, self)
                        )
                    else:
                        raise sa_exc.ArgumentError(
                            "Could not determine relation direction for "
                            "primaryjoin condition '%s', on relation %s. "
                            "Specify the 'foreign_keys' argument to indicate which columns "
                            "on the relation are foreign." % (self.primaryjoin, self)
                        )

            self.synchronize_pairs = eq_pairs

        if self.secondaryjoin:
            # same derivation for the secondary join condition
            sq_pairs = criterion_as_pairs(
                self.secondaryjoin, consider_as_foreign_keys=self._foreign_keys, any_operator=self.viewonly
            )
            # unlike the primaryjoin filter above, the foreign-key alternative
            # here does not depend on self.viewonly
            sq_pairs = [
                (l, r)
                for l, r in sq_pairs
                if (self._col_is_part_of_mappings(l) and self._col_is_part_of_mappings(r)) or r in self._foreign_keys
            ]

            if not sq_pairs:
                if not self.viewonly and criterion_as_pairs(
                    self.secondaryjoin, consider_as_foreign_keys=self._foreign_keys, any_operator=True
                ):
                    raise sa_exc.ArgumentError(
                        "Could not locate any equated, locally mapped "
                        "column pairs for secondaryjoin condition '%s' on relation %s. "
                        "For more relaxed rules on join conditions, the "
                        "relation may be marked as viewonly=True." % (self.secondaryjoin, self)
                    )
                else:
                    raise sa_exc.ArgumentError(
                        "Could not determine relation direction "
                        "for secondaryjoin condition '%s', on relation %s. "
                        "Specify the foreign_keys argument to indicate which "
                        "columns on the relation are foreign." % (self.secondaryjoin, self)
                    )

            self.secondary_synchronize_pairs = sq_pairs
        else:
            self.secondary_synchronize_pairs = None

        # replace _foreign_keys with the second column of every pair found
        self._foreign_keys = util.column_set(r for l, r in self.synchronize_pairs)
        if self.secondary_synchronize_pairs:
            self._foreign_keys.update(r for l, r in self.secondary_synchronize_pairs)
开发者ID:AntonNguyen,项目名称:easy_api,代码行数:88,代码来源:properties.py



注:本文中的 sqlalchemy.util.column_set 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python util.defaultdict函数代码示例发布时间:2022-05-27
下一篇:
Python util.column_dict函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap