• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sql.column函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 sqlalchemy.sql.column 函数的典型用法代码示例。如果您正想了解以下问题:Python column 函数的具体用法是什么?Python column 怎么用?有哪些 Python column 的使用示例?那么这里精选的函数代码示例或许可以为您提供帮助。



下文一共展示了 column 函数的 20 个代码示例,这些示例默认按受欢迎程度排序。您可以为喜欢或者觉得有用的代码点赞,您的评价将有助于我们的系统推荐出更好的 Python 代码示例。

示例1: test_annotate_fromlist_preservation

    def test_annotate_fromlist_preservation(self):
        """Check that a select's FROM list survives repeated annotate/clone passes.

        Every transform below is applied at three nesting levels, so several
        copies of the same selectable exist; the stringified result must
        still equal the stringified, untransformed nested select.

        #2453, continued

        """
        x_table = table('table1', column('x'))
        y_table = table('table2', column('y'))
        x_alias = x_table.alias()
        inner = select([x_alias.c.x])
        s = inner.select_from(
            x_alias.join(y_table, x_alias.c.x == y_table.c.y))

        expected = select([select([s])])

        def annotate(element):
            return sql_util._deep_annotate(element, {'foo': 'bar'})

        def clone(element):
            return visitors.cloned_traverse(element, {}, {})

        def replace_nothing(element):
            return visitors.replacement_traverse(
                element, {}, lambda candidate: None)

        transforms = (
            sql_util._deep_deannotate,
            annotate,
            clone,
            replace_nothing,
        )
        for transform in transforms:
            # Apply the same transform at every level of the nested select.
            innermost = transform(s)
            middle = transform(select([innermost]))
            outermost = transform(select([middle]))
            eq_(str(expected), str(outermost))
开发者ID:sleepsonthefloor,项目名称:sqlalchemy,代码行数:25,代码来源:test_selectable.py


示例2: _insert_operation_operation_form

def _insert_operation_operation_form():
    """Bulk-insert the operation/form links for operations 3022 and 3012."""
    link_table = table(
        'operation_operation_form',
        column('operation_id', Integer),
        column('operation_form_id', Integer),
    )

    # Each operation id is paired with every form id listed next to it.
    forms_by_operation = (
        (3022, (39, 40, 41, 43, 110, 3022)),  # normalize
        (3012, (39, 40, 41, 43, 110, 3012)),  # feature-indexer
    )

    rows = [
        {'operation_id': operation_id, 'operation_form_id': form_id}
        for operation_id, form_ids in forms_by_operation
        for form_id in form_ids
    ]
    op.bulk_insert(link_table, rows)
开发者ID:eubr-bigsea,项目名称:tahiti,代码行数:26,代码来源:bca9192ljsj4_moving_normalizer.py


示例3: _insert_operation_operation_form

def _insert_operation_operation_form():
    """Bulk-insert operation/form links for the regression operations.

    Every operation is linked to one form of its own (102 through 107)
    and, in a second group of rows, to form 41.
    """
    link_table = table(
        'operation_operation_form',
        column('operation_id', Integer),
        column('operation_form_id', Integer),
    )

    operations = (
        REGRESSION_MODEL,
        ISOTONIC_REGRESSION,
        AFT_SURVIVAL_REGRESSION,
        GBT_REGRESSOR,
        RANDOM_FOREST_REGRESSOR,
        GENERALIZED_LINEAR_REGRESSOR,
    )
    first_form_ids = (102, 103, 104, 105, 106, 107)
    common_form_id = 41

    pairs = list(zip(operations, first_form_ids))
    pairs.extend((operation, common_form_id) for operation in operations)

    # Dictionary keys follow the column names of the ad-hoc table.
    column_names = [col.name for col in link_table.columns]
    rows = [dict(zip(column_names, pair)) for pair in pairs]

    op.bulk_insert(link_table, rows)
开发者ID:eubr-bigsea,项目名称:tahiti,代码行数:25,代码来源:70078039a87a_adding_regression_operations_metadata.py


示例4: _insert_operation_form_translation

def _insert_operation_form_translation():
    """Bulk-insert the 'en' and 'pt' names for forms 3024 through 3031."""
    translation_table = table(
        'operation_form_translation',
        column('id', Integer),
        column('locale', String),
        column('name', String),
    )

    # Every form receives the same pair of translations, 'en' first.
    translations = (
        ('en', 'Execution'),
        ('pt', 'Execução'),
    )
    rows = [
        {'id': form_id, 'locale': locale, 'name': name}
        for form_id in range(3024, 3032)
        for locale, name in translations
    ]

    op.bulk_insert(translation_table, rows)
开发者ID:eubr-bigsea,项目名称:tahiti,代码行数:29,代码来源:abc0603ljsj2_adding_hdfs_feature_compss.py


示例5: test_limit_offset_with_correlated_order_by

    def test_limit_offset_with_correlated_order_by(self):
        """Check LIMIT/OFFSET rendering when ORDER BY is a correlated subquery.

        The expected SQL wraps the statement in a ROW_NUMBER() OVER (...)
        subquery; afterwards the statement is compiled with MSDialect and
        its result columns and result map are checked.
        """
        t1 = table('t1', column('x', Integer), column('y', Integer))
        t2 = table('t2', column('x', Integer), column('y', Integer))

        correlated = select([t2.c.y]).where(t1.c.x == t2.c.x).as_scalar()
        filtered = select([t1]).where(t1.c.x == 5)
        stmt = filtered.order_by(correlated).limit(10).offset(20)

        expected_sql = (
            "SELECT anon_1.x, anon_1.y "
            "FROM (SELECT t1.x AS x, t1.y AS y, "
            "ROW_NUMBER() OVER (ORDER BY "
            "(SELECT t2.y FROM t2 WHERE t1.x = t2.x)"
            ") AS mssql_rn "
            "FROM t1 "
            "WHERE t1.x = :x_1) AS anon_1 "
            "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1"
        )
        expected_params = {'param_1': 20, 'param_2': 10, 'x_1': 5}
        self.assert_compile(stmt, expected_sql, checkparams=expected_params)

        compiled = stmt.compile(dialect=mssql.MSDialect())
        eq_(len(compiled._result_columns), 2)
        # Each original column must appear in the result map entry of its key.
        for key, col in (('x', t1.c.x), ('y', t1.c.y)):
            assert col in set(compiled._create_result_map()[key][1])
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:25,代码来源:test_compiler.py


示例6: test_annotations

    def test_annotations(self):
        """Check that an annotated Select compiles through the handler
        registered for ``Select`` with ``@compiles``.

        """
        t1 = table('t1', column('c1'), column('c2'))

        # Remember the current dispatcher so it can be put back afterwards.
        original_dispatch = Select._compiler_dispatch
        try:
            @compiles(Select)
            def _render_override(element, compiler, **kw):
                return "OVERRIDE"

            plain = select([t1])
            self.assert_compile(plain, "OVERRIDE")

            annotated = plain._annotate({})
            self.assert_compile(annotated, "OVERRIDE")
        finally:
            # Undo the registration regardless of the outcome above.
            Select._compiler_dispatch = original_dispatch
            if hasattr(Select, '_compiler_dispatcher'):
                del Select._compiler_dispatcher
开发者ID:onetera,项目名称:scandatatransfer,代码行数:26,代码来源:test_compiler.py


示例7: test_cube_operators

    def test_cube_operators(self):
        """Check GROUP BY rendering of cube, rollup and grouping_sets."""
        t = table('t', column('value'),
                  column('x'), column('y'), column('z'), column('q'))

        total = select([func.sum(t.c.value)])
        # All expected statements share this leading text.
        prefix = "SELECT sum(t.value) AS sum_1 FROM t GROUP BY "

        cases = (
            (func.cube(t.c.x, t.c.y), "CUBE(t.x, t.y)"),
            (func.rollup(t.c.x, t.c.y), "ROLLUP(t.x, t.y)"),
            (func.grouping_sets(t.c.x, t.c.y), "GROUPING SETS(t.x, t.y)"),
            (
                func.grouping_sets(
                    sql.tuple_(t.c.x, t.c.y),
                    sql.tuple_(t.c.z, t.c.q),
                ),
                "GROUPING SETS((t.x, t.y), (t.z, t.q))",
            ),
        )
        for grouping, rendered in cases:
            self.assert_compile(total.group_by(grouping), prefix + rendered)
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:35,代码来源:test_functions.py


示例8: test_noorderby_parameters_insubquery

    def test_noorderby_parameters_insubquery(self):
        """Check that an ORDER BY inside an aliased subquery contributes
        neither SQL text nor positional parameters under the mssql dialect."""
        mytable = table(
            "mytable",
            column("myid", Integer),
            column("name", String),
            column("description", String),
        )

        ordered = select(
            [mytable.c.myid, sql.literal('bar').label('c1')],
            order_by=[mytable.c.name + '-']
        )
        foo = ordered.alias("foo")
        join_criterion = foo.c.myid == mytable.c.myid

        positional_dialect = mssql.dialect()
        positional_dialect.paramstyle = "qmark"
        positional_dialect.positional = True

        expected_sql = (
            "SELECT * FROM (SELECT mytable.myid AS "
            "myid, ? AS c1 FROM mytable) AS foo, mytable WHERE "
            "foo.myid = mytable.myid"
        )
        self.assert_compile(
            select(["*"], join_criterion),
            expected_sql,
            dialect=positional_dialect,
            checkparams={'param_1': 'bar'},
            # if name_1 is included, too many parameters are passed to dbapi
            checkpositional=('bar', )
        )
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:29,代码来源:test_compiler.py


示例9: test_insert_returning

 def test_insert_returning(self):
     """Check that INSERT ... returning() renders as an OUTPUT clause."""
     table1 = table(
         "mytable",
         column("myid", Integer),
         column("name", String(128)),
         column("description", String(128)),
     )

     # (arguments passed to returning(), expected SQL) pairs.
     cases = (
         (
             (table1.c.myid, table1.c.name),
             "INSERT INTO mytable (name) OUTPUT "
             "inserted.myid, inserted.name VALUES "
             "(:name)",
         ),
         (
             (table1,),
             "INSERT INTO mytable (name) OUTPUT "
             "inserted.myid, inserted.name, "
             "inserted.description VALUES (:name)",
         ),
         (
             (func.length(table1.c.name),),
             "INSERT INTO mytable (name) OUTPUT "
             "LEN(inserted.name) AS length_1 VALUES "
             "(:name)",
         ),
     )
     for returning_args, expected_sql in cases:
         stmt = insert(table1, values=dict(name="foo"))
         stmt = stmt.returning(*returning_args)
         self.assert_compile(stmt, expected_sql)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:32,代码来源:test_compiler.py


示例10: upgrade

def upgrade():
    """Create ``employees_attendance`` and insert the new ability names."""
    # Table definition; both references to users.id cascade on delete.
    attendance_schema = [
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('attendanceDate', sa.Date(), nullable=False),
        sa.Column('arriveTime', sa.Time(), nullable=False),
        sa.Column('leaveTime', sa.Time(), nullable=False),
        sa.Column('employee_id', sa.Integer(), nullable=False),
        sa.Column('createdBy_id', sa.Integer(), nullable=True),
        sa.Column('issueDateTime', sa.DateTime(), nullable=True),
        sa.ForeignKeyConstraint(
            ['createdBy_id'], ['users.id'], ondelete='CASCADE'),
        sa.ForeignKeyConstraint(
            ['employee_id'], ['users.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id'),
    ]
    op.create_table('employees_attendance', *attendance_schema)

    # Ad-hoc table used only as the target of the bulk insert.
    abilities_table = table(
        'abilities',
        column('id', Integer),
        column('name', String),
    )

    ability_names = (
        "employeeAttendances.list",
        "employeeAttendances.show",
        "employeeAttendances.delete",
        "employeeAttendances.update",
        "employeeAttendances.create",
        "feedbacks.list",
        "feedbacks.show",
    )
    op.bulk_insert(
        abilities_table,
        [{'name': ability_name} for ability_name in ability_names],
    )
开发者ID:yehiaa,项目名称:clinic,代码行数:35,代码来源:634da8ab3d5c_employees_attendance_and_new_abilities.py


示例11: test_delete_extra_froms

 def test_delete_extra_froms(self):
     """Check DELETE rendering when the WHERE clause refers to a second table."""
     target = table("t1", column("c1"))
     other = table("t2", column("c1"))
     stmt = sql.delete(target).where(target.c.c1 == other.c.c1)
     expected_sql = "DELETE FROM t1 FROM t1, t2 WHERE t1.c1 = t2.c1"
     self.assert_compile(stmt, expected_sql)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:7,代码来源:test_compiler.py


示例12: genericize_thread

def genericize_thread():
    """Move ``thread.g_thrid`` into a new ``imapthread`` table.

    The existing (id, g_thrid) pairs are read first, ``thread`` gains a
    ``type`` column, ``imapthread`` is created and filled with the saved
    pairs, and finally ``thread.g_thrid`` is dropped.
    """
    class Thread_(Base):
        __table__ = Base.metadata.tables['thread']

    # Read the values before the column that holds them goes away.
    with session_scope() as db_session:
        id_thrid_pairs = db_session.query(
            Thread_.id, Thread_.g_thrid).all()

    to_insert = [
        {'id': thread_id, 'g_thrid': g_thrid}
        for thread_id, g_thrid in id_thrid_pairs
    ]

    op.add_column('thread', sa.Column('type', sa.String(16)))

    # New table keyed by the thread id.
    op.create_table(
        'imapthread',
        sa.Column('g_thrid', sa.BigInteger(), nullable=True, index=True),
        sa.Column('id', sa.Integer()),
        sa.ForeignKeyConstraint(['id'], ['thread.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id'),
    )

    # Ad-hoc table used only as the target of the bulk insert.
    table_ = table(
        'imapthread',
        column('g_thrid', sa.BigInteger),
        column('id', sa.Integer),
    )
    if to_insert:
        op.bulk_insert(table_, to_insert)

    # The data now lives in imapthread; remove the old column.
    op.drop_column('thread', 'g_thrid')
开发者ID:caitp,项目名称:inbox,代码行数:32,代码来源:007_per_provider_table_split.py


示例13: genericize_imapaccount

def genericize_imapaccount():
    """Rename ``imapaccount`` to ``account`` and split out ``imap_host``.

    The existing (id, imap_host) pairs are read first, the table is
    renamed and gains a ``type`` column, a new ``imapaccount`` table is
    created and filled with the saved pairs, and finally
    ``account.imap_host`` is dropped.
    """
    class ImapAccount_(Base):
        __table__ = Base.metadata.tables['imapaccount']

    # Read the values before the column that holds them goes away.
    with session_scope() as db_session:
        id_host_pairs = db_session.query(
            ImapAccount_.id, ImapAccount_.imap_host).all()

    to_insert = [
        {'id': account_id, 'imap_host': imap_host}
        for account_id, imap_host in id_host_pairs
    ]

    op.rename_table('imapaccount', 'account')
    op.add_column('account', sa.Column('type', sa.String(16)))

    # New table keyed by the account id.
    op.create_table(
        'imapaccount',
        sa.Column('imap_host', sa.String(512)),
        sa.Column('id', sa.Integer()),
        sa.ForeignKeyConstraint(['id'], ['account.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id'),
    )

    # Ad-hoc table used only as the target of the bulk insert.
    table_ = table(
        'imapaccount',
        column('imap_host', sa.String()),
        column('id', sa.Integer),
    )
    if to_insert:
        op.bulk_insert(table_, to_insert)

    # The data now lives in imapaccount; remove the old column.
    op.drop_column('account', 'imap_host')
开发者ID:caitp,项目名称:inbox,代码行数:33,代码来源:007_per_provider_table_split.py


示例14: downgrade_imapthread

def downgrade_imapthread():
    """Copy ``imapthread.g_thrid`` back onto ``thread`` and drop ``imapthread``."""
    class ImapThread_(Base):
        __table__ = Base.metadata.tables['imapthread']

    # Read the values before the table that holds them is dropped.
    with session_scope() as db_session:
        id_thrid_pairs = db_session.query(
            ImapThread_.id, ImapThread_.g_thrid).all()
    to_insert = [
        {'id': thread_id, 'g_thrid': g_thrid}
        for thread_id, g_thrid in id_thrid_pairs
    ]

    op.drop_column('thread', 'type')
    op.add_column(
        'thread',
        sa.Column('g_thrid', sa.BigInteger(), nullable=True, index=True),
    )

    # Ad-hoc table used only to build the UPDATE statements.
    table_ = table(
        'thread',
        column('g_thrid', sa.BigInteger),
        column('id', sa.Integer),
    )

    # One UPDATE per saved row, matched on the thread id.
    for saved in to_insert:
        update_stmt = table_.update().where(table_.c.id == saved['id'])
        op.execute(update_stmt.values({'g_thrid': saved['g_thrid']}))

    op.drop_table('imapthread')
开发者ID:caitp,项目名称:inbox,代码行数:26,代码来源:007_per_provider_table_split.py


示例15: _by_search_tsearch

    def _by_search_tsearch(self, query, operand, maybe_negate):
        # type: (Query, str, ConditionTransform) -> Query
        """Extend ``query`` with full-text search on ``operand``.

        Two labelled columns, ``content_matches`` and ``subject_matches``,
        are added from ``ts_match_locs_array``. Every double-quoted term in
        ``operand`` adds an ILIKE condition on ``content`` or ``subject``,
        and a final ``search_tsvector @@ tsquery`` condition is appended.
        Each condition is passed through ``maybe_negate`` before use.

        :param query: the query to extend.
        :param operand: the raw search text.
        :param maybe_negate: callable applied to every condition.
        :return: the extended query.
        """
        tsquery = func.plainto_tsquery(literal("zulip.english_us_search"), literal(operand))
        ts_locs_array = func.ts_match_locs_array
        query = query.column(ts_locs_array(literal("zulip.english_us_search"),
                                           column("rendered_content"),
                                           tsquery).label("content_matches"))
        # We HTML-escape the subject in Postgres to avoid doing a server round-trip
        query = query.column(ts_locs_array(literal("zulip.english_us_search"),
                                           func.escape_html(column("subject")),
                                           tsquery).label("subject_matches"))

        # Do quoted string matching.  We really want phrase
        # search here so we can ignore punctuation and do
        # stemming, but there isn't a standard phrase search
        # mechanism in Postgres
        #
        # The pattern is a raw string: "\S" is not a valid escape in a
        # normal string literal and triggers a warning on recent Pythons.
        for term in re.findall(r'"[^"]+"|\S+', operand):
            if term[0] == '"' and term[-1] == '"':
                # Strip the surrounding quotes and match as a substring.
                term = term[1:-1]
                term = '%' + connection.ops.prep_for_like_query(term) + '%'
                cond = or_(column("content").ilike(term),
                           column("subject").ilike(term))
                query = query.where(maybe_negate(cond))

        cond = column("search_tsvector").op("@@")(tsquery)
        return query.where(maybe_negate(cond))
开发者ID:souravbadami,项目名称:zulip,代码行数:26,代码来源:messages.py


示例16: upgrade

def upgrade():
    """Add ``userexternalid.last_used_at``, seeded from ``updated_at``.

    The column is added as nullable, filled from ``updated_at`` for all
    rows, and then altered to be non-nullable.
    """
    op.add_column(
        'userexternalid',
        sa.Column('last_used_at', sa.DateTime(), nullable=True),
    )

    # Ad-hoc table used only to build the UPDATE statement.
    userexternalid = table(
        'userexternalid',
        column('updated_at', sa.DateTime),
        column('last_used_at', sa.DateTime),
    )
    seed_stmt = userexternalid.update().values(
        last_used_at=userexternalid.c.updated_at)
    op.execute(seed_stmt)

    op.alter_column('userexternalid', 'last_used_at', nullable=False)
开发者ID:hasgeek,项目名称:lastuser,代码行数:7,代码来源:f324b0ecd05c_external_id_last_used_at.py


示例17: get

    def get(self, database, query):
        """Render aggregated statistics for one query of one database.

        Builds a statement joining ``powa_statements`` to the detailed
        statistics filtered on the ``database`` and ``query`` bind
        parameters, executes it, and renders ``database/query/detail.html``
        with the first row, or ``xhr.html`` with "No data" when the result
        reports fewer than one row.
        """
        block_size_col = block_size.c.block_size

        # Detailed statistics restricted to the requested database and query.
        detail = powa_getstatdata_detailed_db()
        same_database = column("datname") == bindparam("database")
        same_query = column("queryid") == bindparam("query")
        detail = detail.where(same_database & same_query)
        detail = detail.alias()

        join_condition = and_(
            powa_statements.c.queryid == detail.c.queryid,
            powa_statements.c.dbid == detail.c.dbid)
        from_clause = outerjoin(powa_statements, detail, join_condition)

        stats = detail.c
        rblk = mulblock(sum(stats.shared_blks_read).label("shared_blks_read"))
        wblk = mulblock(sum(stats.shared_blks_hit).label("shared_blks_hit"))

        selected = [
            column("query"),
            sum(stats.calls).label("calls"),
            sum(stats.runtime).label("runtime"),
            rblk,
            wblk,
            (rblk + wblk).label("total_blks"),
        ]
        stmt = select(selected).select_from(from_clause)
        stmt = stmt.where(powa_statements.c.queryid == bindparam("query"))
        stmt = stmt.group_by(column("query"), block_size_col)

        params = {
            "query": query,
            "database": database,
            "from": self.get_argument("from"),
            "to": self.get_argument("to")
        }
        value = self.execute(stmt, params=params)

        if value.rowcount >= 1:
            self.render("database/query/detail.html", stats=value.first())
        else:
            self.render("xhr.html", content="No data")
开发者ID:champeric,项目名称:powa-web,代码行数:33,代码来源:query.py


示例18: test_recursive

    def test_recursive(self):
        """Check rendering of a recursive CTE unioned with a second select.

        The statement is compiled twice: once expecting "WITH RECURSIVE"
        and once, with the mssql dialect, expecting plain "WITH".
        """
        parts = table(
            'parts',
            column('part'),
            column('sub_part'),
            column('quantity'),
        )

        first_member = select([
            parts.c.sub_part,
            parts.c.part,
            parts.c.quantity,
        ]).where(parts.c.part == 'our part')
        included_parts = first_member.cte(recursive=True)

        incl_alias = included_parts.alias()
        parts_alias = parts.alias()
        second_member = select([
            parts_alias.c.part,
            parts_alias.c.sub_part,
            parts_alias.c.quantity,
        ]).where(parts_alias.c.part == incl_alias.c.sub_part)
        included_parts = included_parts.union(second_member)

        total_quantity = func.sum(
            included_parts.c.quantity).label('total_quantity')
        joined = included_parts.join(
            parts, included_parts.c.part == parts.c.part)
        s = (
            select([included_parts.c.sub_part, total_quantity])
            .select_from(joined)
            .group_by(included_parts.c.sub_part)
        )

        # Everything after the leading WITH keyword(s) is the same for
        # both expected statements.
        after_with = (
            "anon_1(sub_part, part, quantity) "
            "AS (SELECT parts.sub_part AS sub_part, parts.part "
            "AS part, parts.quantity AS quantity FROM parts "
            "WHERE parts.part = :part_1 UNION SELECT parts_1.part "
            "AS part, parts_1.sub_part AS sub_part, parts_1.quantity "
            "AS quantity FROM parts AS parts_1, anon_1 AS anon_2 "
            "WHERE parts_1.part = anon_2.sub_part) "
            "SELECT anon_1.sub_part, "
            "sum(anon_1.quantity) AS total_quantity FROM anon_1 "
            "JOIN parts ON anon_1.part = parts.part "
            "GROUP BY anon_1.sub_part"
        )

        self.assert_compile(s, "WITH RECURSIVE " + after_with)

        # quick check that the "WITH RECURSIVE" varies per
        # dialect
        self.assert_compile(
            s, "WITH " + after_with, dialect=mssql.dialect())
开发者ID:biner,项目名称:sqlalchemy,代码行数:60,代码来源:test_cte.py


示例19: upgrade

def upgrade():
    """Store a bcrypt hash of every user's password in ``users.bcrypt_pwd``.

    Runs with the search path set to ``mineturer`` and sets it to
    ``public`` at the end.
    """
    op.execute('SET search_path TO mineturer')

    connection = op.get_bind()
    existing = connection.execute('select userid, password from users')
    credentials = existing.fetchall()

    # Ad-hoc table used only to build the UPDATE statements.
    users = table(
        'users',
        column('bcrypt_pwd', sa.String),
        column('userid', sa.Integer)
    )

    for userid, pwd in credentials:
        # A fresh salt is generated for every password.
        hashed = bcrypt.hashpw(pwd.encode('utf-8'), bcrypt.gensalt())
        update_stmt = users.update().where(
            users.c.userid == op.inline_literal(userid))
        update_stmt = update_stmt.values(
            {'bcrypt_pwd': op.inline_literal(hashed)})
        op.execute(update_stmt)

    op.execute('SET search_path TO public')
开发者ID:atlefren,项目名称:mineturer2,代码行数:28,代码来源:4f7733ee071_fix_passwords.py


示例20: test_update_returning

 def test_update_returning(self):
     """Check UPDATE ... RETURNING rendering with the postgresql dialect."""
     dialect = postgresql.dialect()
     table1 = table(
         'mytable',
         column('myid', Integer),
         column('name', String(128)),
         column('description', String(128)),
     )

     # (arguments passed to returning(), expected SQL) pairs.
     cases = (
         (
             (table1.c.myid, table1.c.name),
             'UPDATE mytable SET name=%(name)s '
             'RETURNING mytable.myid, mytable.name',
         ),
         (
             (table1,),
             'UPDATE mytable SET name=%(name)s '
             'RETURNING mytable.myid, mytable.name, '
             'mytable.description',
         ),
         (
             (func.length(table1.c.name),),
             'UPDATE mytable SET name=%(name)s '
             'RETURNING length(mytable.name) AS length_1',
         ),
     )
     for returning_args, expected_sql in cases:
         stmt = update(table1, values=dict(name='foo'))
         stmt = stmt.returning(*returning_args)
         self.assert_compile(stmt, expected_sql, dialect=dialect)
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:32,代码来源:test_compiler.py



注:本文中的 sqlalchemy.sql.column 函数示例由纯净天空整理自 GitHub、MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sql.delete函数代码示例发布时间:2022-05-27
下一篇:
Python sql.cast函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap