• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sql.table函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.sql.table函数的典型用法代码示例。如果您正苦于以下问题:Python table函数的具体用法?Python table怎么用?Python table使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了table函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: initialize_a10_slb_v1

def initialize_a10_slb_v1(conn, provider, a10):
    """Create a10_slb_v1 for existing vips"""

    a10_slb_v1 = table(
        'a10_slb_v1',
        column('id'),
        column('vip_id'))
    select_vips = text(
        "SELECT vips.id, pools.tenant_id "
        "FROM vips, pools, providerresourceassociations p "
        "WHERE vips.pool_id = pools.id "
        "AND pools.id = p.resource_id "
        "AND p.provider_name = :provider "
        "AND vips.id NOT IN (SELECT vip_id FROM a10_slb_v1)")
    select_vips = select_vips.bindparams(provider=provider)
    vips = list(map(dict, conn.execute(select_vips).fetchall()))

    tenant_ids = [v['tenant_id'] for v in vips]
    tenant_appliance_lookup = initialize_a10_tenant_appliance(conn, a10, tenant_ids)

    a10_slb = table(
        'a10_slb',
        column('id'),
        column('type'),
        column('a10_appliance_id'))
    for vip in vips:
        id = str(uuid.uuid4())
        appliance = tenant_appliance_lookup[vip['tenant_id']]
        insert_slb = a10_slb.insert().\
            values(id=id, type=a10_slb_v1.name, a10_appliance_id=appliance)
        conn.execute(insert_slb)
        insert_vip = a10_slb_v1.insert().\
            values(id=id, vip_id=vip['id'])
        conn.execute(insert_vip)
开发者ID:mmdurrant,项目名称:a10-neutron-lbaas,代码行数:34,代码来源:step.py


示例2: test_union

 def test_union(self):
     t1 = table(
         't1', column('col1'), column('col2'),
         column('col3'), column('col4'))
     t2 = table(
         't2', column('col1'), column('col2'),
         column('col3'), column('col4'))
     s1, s2 = select(
         [t1.c.col3.label('col3'), t1.c.col4.label('col4')],
         t1.c.col2.in_(['t1col2r1', 't1col2r2'])), \
         select([t2.c.col3.label('col3'), t2.c.col4.label('col4')],
                t2.c.col2.in_(['t2col2r2', 't2col2r3']))
     u = union(s1, s2, order_by=['col3', 'col4'])
     self.assert_compile(u,
                         'SELECT t1.col3 AS col3, t1.col4 AS col4 '
                         'FROM t1 WHERE t1.col2 IN (:col2_1, '
                         ':col2_2) UNION SELECT t2.col3 AS col3, '
                         't2.col4 AS col4 FROM t2 WHERE t2.col2 IN '
                         '(:col2_3, :col2_4) ORDER BY col3, col4')
     self.assert_compile(u.alias('bar').select(),
                         'SELECT bar.col3, bar.col4 FROM (SELECT '
                         't1.col3 AS col3, t1.col4 AS col4 FROM t1 '
                         'WHERE t1.col2 IN (:col2_1, :col2_2) UNION '
                         'SELECT t2.col3 AS col3, t2.col4 AS col4 '
                         'FROM t2 WHERE t2.col2 IN (:col2_3, '
                         ':col2_4)) AS bar')
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:26,代码来源:test_compiler.py


示例3: upgrade

def upgrade():
    conn = op.get_bind()
    op.add_column('external_identities', sa.Column('local_user_id',
                                                   sa.Integer()))

    external_identities_t = table('external_identities',
                                  sa.Column('local_user_name', sa.Unicode(50)),
                                  sa.Column('local_user_id', sa.Integer))
    users_t = table('users',
                    sa.Column('user_name', sa.Unicode(50)),
                    sa.Column('id', sa.Integer))

    stmt = external_identities_t.update().values(local_user_id=users_t.c.id). \
        where(users_t.c.user_name == external_identities_t.c.local_user_name)
    conn.execute(stmt)
    op.drop_constraint('pk_external_identities', 'external_identities',
                       type='primary')
    op.drop_constraint('fk_external_identities_local_user_name_users',
                       'external_identities', type='foreignkey')
    op.drop_column('external_identities', 'local_user_name')
    op.create_primary_key('pk_external_identities', 'external_identities',
                          cols=['external_id', 'local_user_id',
                                'provider_name'])
    op.create_foreign_key(None, 'external_identities', 'users',
                          remote_cols=['id'],
                          local_cols=['local_user_id'], onupdate='CASCADE',
                          ondelete='CASCADE')
开发者ID:RaHus,项目名称:ziggurat_foundations,代码行数:27,代码来源:57bbf0c387c_add_local_user_id.py


示例4: upgrade

def upgrade():
    op.add_column('attributes',
                  sa.Column('node', sa.Integer, default=0))
    op.add_column('attributes',
                  sa.Column('is_latest', sa.Boolean, default=True))

    n = table('nodes',
              column('node', sa.Integer),
              column('latest_version', sa.Integer))
    v = table('versions',
              column('node', sa.Integer),
              column('serial', sa.Integer))
    a = table('attributes',
              column('serial', sa.Integer),
              column('node', sa.Integer),
              column('is_latest', sa.Boolean))

    s = sa.select([v.c.node]).where(v.c.serial == a.c.serial)
    u = a.update().values({'node': s})
    op.execute(u)

    s = sa.select([v.c.serial == n.c.latest_version],
                  and_(a.c.node == n.c.node, a.c.serial == v.c.serial))
    u = a.update().values({'is_latest': s})
    op.execute(u)

    op.alter_column('attributes', 'node', nullable=False)
    op.alter_column('attributes', 'is_latest', nullable=False)

    op.create_index('idx_attributes_serial_node', 'attributes',
                    ['serial', 'node'])
开发者ID:AthinaB,项目名称:synnefo,代码行数:31,代码来源:3b62b3f1bf6c_add_attributes_node_.py


示例5: test_join_with_hint

 def test_join_with_hint(self):
     t1 = table("t1", column("a", Integer), column("b", String), column("c", String))
     t2 = table("t2", column("a", Integer), column("b", Integer), column("c", Integer))
     join = t1.join(t2, t1.c.a == t2.c.a).select().with_hint(t1, "WITH (NOLOCK)")
     self.assert_compile(
         join, "SELECT t1.a, t1.b, t1.c, t2.a, t2.b, t2.c " "FROM t1 WITH (NOLOCK) JOIN t2 ON t1.a = t2.a"
     )
开发者ID:hunterfu,项目名称:LuoYunCloud,代码行数:7,代码来源:test_compiler.py


示例6: test_annotate_fromlist_preservation

    def test_annotate_fromlist_preservation(self):
        """test the FROM list in select still works
        even when multiple annotate runs have created
        copies of the same selectable

        #2453, continued

        """
        table1 = table('table1', column('x'))
        table2 = table('table2', column('y'))
        a1 = table1.alias()
        s = select([a1.c.x]).select_from(
                a1.join(table2, a1.c.x==table2.c.y)
            )

        assert_s = select([select([s])])
        for fn in (
            sql_util._deep_deannotate,
            lambda s: sql_util._deep_annotate(s, {'foo':'bar'}),
            lambda s:visitors.cloned_traverse(s, {}, {}),
            lambda s:visitors.replacement_traverse(s, {}, lambda x:None)
        ):

            sel = fn(select([fn(select([fn(s)]))]))
            eq_(str(assert_s), str(sel))
开发者ID:sleepsonthefloor,项目名称:sqlalchemy,代码行数:25,代码来源:test_selectable.py


示例7: test_alias_outer_join

 def test_alias_outer_join(self):
     address_types = table("address_types", column("id"), column("name"))
     addresses = table(
         "addresses",
         column("id"),
         column("user_id"),
         column("address_type_id"),
         column("email_address"),
     )
     at_alias = address_types.alias()
     s = (
         select([at_alias, addresses])
         .select_from(
             addresses.outerjoin(
                 at_alias, addresses.c.address_type_id == at_alias.c.id
             )
         )
         .where(addresses.c.user_id == 7)
         .order_by(addresses.c.id, address_types.c.id)
     )
     self.assert_compile(
         s,
         "SELECT address_types_1.id, "
         "address_types_1.name, addresses.id, "
         "addresses.user_id, addresses.address_type_"
         "id, addresses.email_address FROM "
         "addresses LEFT OUTER JOIN address_types "
         "address_types_1 ON addresses.address_type_"
         "id = address_types_1.id WHERE "
         "addresses.user_id = :user_id_1 ORDER BY "
         "addresses.id, address_types.id",
     )
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:32,代码来源:test_compiler.py


示例8: _insert_operation_port_interface

def _insert_operation_port_interface():
    tb = table(
        'operation_port_interface',
        column('id', Integer),
        column('color', String))
    columns = [c.name for c in tb.columns]
    data = [
        (20, '#ED254E'),
    ]
    rows = [dict(zip(columns, cat)) for cat in data]
    op.bulk_insert(tb, rows)

    tb = table(
        'operation_port_interface_translation',
        column('id', Integer),
        column('locale', String),
        column('name', String))

    columns = [c.name for c in tb.columns]
    data = [
        (20, 'en', 'TransformationModel'),
        (20, 'pt', 'TransformationModel'),
    ]
    rows = [dict(zip(columns, cat)) for cat in data]
    op.bulk_insert(tb, rows)
开发者ID:eubr-bigsea,项目名称:tahiti,代码行数:25,代码来源:bd294c13637b_feature_extraction_ops.py


示例9: upgrade

def upgrade():
    op.add_column('lu_population_number',
        sa.Column('name_ro', sa.UnicodeText, nullable=True))

    op.add_column('lu_population_units_restricted',
        sa.Column('name_ro', sa.UnicodeText, nullable=True))

    lu_pop_codes = table('lu_population_number',
        column('code', sa.String),
        column('name_ro', sa.UnicodeText))

    lu_pop_restrict_codes = table('lu_population_units_restricted',
        column('code', sa.String),
        column('name_ro', sa.UnicodeText))

    for code, name_ro in DATA:

        op.execute(
            lu_pop_codes.update()
                .where(lu_pop_codes.c.code == op.inline_literal(code))
                .values({'name_ro': op.inline_literal(name_ro)}))

        op.execute(
            lu_pop_restrict_codes.update()
                .where(lu_pop_restrict_codes.c.code == op.inline_literal(code))
                .values({'name_ro': op.inline_literal(name_ro)}))
开发者ID:eaudeweb,项目名称:art17-consultation,代码行数:26,代码来源:236f754feafc_population_ro.py


示例10: test_nonansi_nested_right_join

    def test_nonansi_nested_right_join(self):
        a = table("a", column("a"))
        b = table("b", column("b"))
        c = table("c", column("c"))

        j = a.join(b.join(c, b.c.b == c.c.c), a.c.a == b.c.b)

        self.assert_compile(
            select([j]),
            "SELECT a.a, b.b, c.c FROM a, b, c "
            "WHERE a.a = b.b AND b.b = c.c",
            dialect=oracle.OracleDialect(use_ansi=False),
        )

        j = a.outerjoin(b.join(c, b.c.b == c.c.c), a.c.a == b.c.b)

        self.assert_compile(
            select([j]),
            "SELECT a.a, b.b, c.c FROM a, b, c "
            "WHERE a.a = b.b(+) AND b.b = c.c",
            dialect=oracle.OracleDialect(use_ansi=False),
        )

        j = a.join(b.outerjoin(c, b.c.b == c.c.c), a.c.a == b.c.b)

        self.assert_compile(
            select([j]),
            "SELECT a.a, b.b, c.c FROM a, b, c "
            "WHERE a.a = b.b AND b.b = c.c(+)",
            dialect=oracle.OracleDialect(use_ansi=False),
        )
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:31,代码来源:test_compiler.py


示例11: upgrade

def upgrade():
    connection = op.get_bind()

    plan_table = table(
        'plan',
        sa.Column('id', sa.GUID(), nullable=False),
        sa.Column('project_id', sa.GUID(), nullable=False),
    )

    project_plan_table = table(
        'project_plan',
        sa.Column('plan_id', sa.GUID(), nullable=False),
        sa.Column('project_id', sa.GUID(), nullable=False),
    )

    for project_plan in connection.execute(project_plan_table.select()):
        print("Migrating ProjectPlan plan_id=%s project_id=%s" % (
            project_plan.plan_id, project_plan.project_id))

        connection.execute(
            plan_table.update().where(
                plan_table.c.id == project_plan.plan_id,
            ).values({
                plan_table.c.project_id: project_plan.project_id,
            })
        )
开发者ID:DRP-Guy,项目名称:changes,代码行数:26,代码来源:4cd8cf6a0894_migrate_projectplan.py


示例12: upgrade

def upgrade():
    ### commands auto generated by Alembic - please adjust! ###
    op.add_column('archived_services', sa.Column('status', sa.String(), nullable=True))
    op.add_column('services', sa.Column('status', sa.String(), nullable=True))

    op.create_check_constraint(
        "ck_services_status",
        "services",
        "status in ('disabled', 'enabled', 'published')"
    )

    op.create_check_constraint(
        "ck_archived_services_status",
        "archived_services",
        "status in ('disabled', 'enabled', 'published')"
    )

    services = table('services', column('status', String))

    archived_services = table('archived_services', column('status', String))

    op.execute(
        services.update(). \
        values({'status': op.inline_literal('enabled')})
    )

    op.execute(
        archived_services.update(). \
        values({'status': op.inline_literal('enabled')})
    )
开发者ID:RichardKnop,项目名称:digitalmarketplace-api,代码行数:30,代码来源:597e346723ee_.py


示例13: upgrade

def upgrade():
    ip_policy = table('quark_ip_policy',
                      column('id', sa.String(length=36)),
                      column('size', INET()))
    ip_policy_cidrs = table('quark_ip_policy_cidrs',
                            column('ip_policy_id', sa.String(length=36)),
                            column('cidr', sa.String(length=64)))
    connection = op.get_bind()

    # 1. Retrieve all ip_policy_cidr rows.
    results = connection.execute(
        select([ip_policy_cidrs.c.ip_policy_id, ip_policy_cidrs.c.cidr])
    ).fetchall()

    # 2. Determine IPSet for each IP Policy.
    ipp = dict()
    for ip_policy_id, cidr in results:
        if ip_policy_id not in ipp:
            ipp[ip_policy_id] = netaddr.IPSet()
        ipp[ip_policy_id].add(cidr)

    # 3. Populate size for each IP Policy.
    for ip_policy_id in ipp:
        connection.execute(ip_policy.update().values(
            size=ipp[ip_policy_id].size).where(
                ip_policy.c.id == ip_policy_id))
开发者ID:Anonymike,项目名称:quark,代码行数:26,代码来源:28e55acaf366_populate_ip_policy_size.py


示例14: upgrade

def upgrade():
    op.create_table('question_match',
        sa.Column('id', postgresql.UUID(), nullable=False),
        sa.Column('data', sa.Text(), nullable=True),
        sa.Column('score', sa.Float(), nullable=True),
        sa.ForeignKeyConstraint(['id'], ['question.id']),
        sa.PrimaryKeyConstraint('id')
    )

    question = sql.table('question',
        sql.column('id'),
        sql.column('match_data'),
        sql.column('match_score'))
    question_match = sql.table('question_match',
        sql.column('id'),
        sql.column('data'),
        sql.column('score'))

    op.execute(
        question_match.insert().from_select(
            ['id', 'data', 'score'],
            question.select().where(question.c.match_data != None)))

    op.drop_column('question', 'match_data')
    op.drop_column('question', 'match_score')
开发者ID:alexandrupetrescu94,项目名称:mptracker,代码行数:25,代码来源:4bc50a6cca9_question_match_table.py


示例15: test_pg_example_one

    def test_pg_example_one(self):
        products = table("products", column("id"), column("date"))
        products_log = table("products_log", column("id"), column("date"))

        moved_rows = (
            products.delete()
            .where(
                and_(products.c.date >= "dateone", products.c.date < "datetwo")
            )
            .returning(*products.c)
            .cte("moved_rows")
        )

        stmt = products_log.insert().from_select(
            products_log.c, moved_rows.select()
        )
        self.assert_compile(
            stmt,
            "WITH moved_rows AS "
            "(DELETE FROM products WHERE products.date >= :date_1 "
            "AND products.date < :date_2 "
            "RETURNING products.id, products.date) "
            "INSERT INTO products_log (id, date) "
            "SELECT moved_rows.id, moved_rows.date FROM moved_rows",
        )
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:25,代码来源:test_cte.py


示例16: test_union

 def test_union(self):
     t1 = table("t1", column("col1"), column("col2"), column("col3"), column("col4"))
     t2 = table("t2", column("col1"), column("col2"), column("col3"), column("col4"))
     s1, s2 = (
         select([t1.c.col3.label("col3"), t1.c.col4.label("col4")], t1.c.col2.in_(["t1col2r1", "t1col2r2"])),
         select([t2.c.col3.label("col3"), t2.c.col4.label("col4")], t2.c.col2.in_(["t2col2r2", "t2col2r3"])),
     )
     u = union(s1, s2, order_by=["col3", "col4"])
     self.assert_compile(
         u,
         "SELECT t1.col3 AS col3, t1.col4 AS col4 "
         "FROM t1 WHERE t1.col2 IN (:col2_1, "
         ":col2_2) UNION SELECT t2.col3 AS col3, "
         "t2.col4 AS col4 FROM t2 WHERE t2.col2 IN "
         "(:col2_3, :col2_4) ORDER BY col3, col4",
     )
     self.assert_compile(
         u.alias("bar").select(),
         "SELECT bar.col3, bar.col4 FROM (SELECT "
         "t1.col3 AS col3, t1.col4 AS col4 FROM t1 "
         "WHERE t1.col2 IN (:col2_1, :col2_2) UNION "
         "SELECT t2.col3 AS col3, t2.col4 AS col4 "
         "FROM t2 WHERE t2.col2 IN (:col2_3, "
         ":col2_4)) AS bar",
     )
开发者ID:hunterfu,项目名称:LuoYunCloud,代码行数:25,代码来源:test_compiler.py


示例17: test_cte_refers_to_aliased_cte_twice

    def test_cte_refers_to_aliased_cte_twice(self):
        # test issue #4204
        a = table("a", column("id"))
        b = table("b", column("id"), column("fid"))
        c = table("c", column("id"), column("fid"))

        cte1 = select([a.c.id]).cte(name="cte1")

        aa = cte1.alias("aa")

        cte2 = (
            select([b.c.id])
            .select_from(b.join(aa, b.c.fid == aa.c.id))
            .cte(name="cte2")
        )

        cte3 = (
            select([c.c.id])
            .select_from(c.join(aa, c.c.fid == aa.c.id))
            .cte(name="cte3")
        )

        stmt = select([cte3.c.id, cte2.c.id]).select_from(
            cte2.join(cte3, cte2.c.id == cte3.c.id)
        )
        self.assert_compile(
            stmt,
            "WITH cte1 AS (SELECT a.id AS id FROM a), "
            "cte2 AS (SELECT b.id AS id FROM b "
            "JOIN cte1 AS aa ON b.fid = aa.id), "
            "cte3 AS (SELECT c.id AS id FROM c "
            "JOIN cte1 AS aa ON c.fid = aa.id) "
            "SELECT cte3.id, cte2.id FROM cte2 JOIN cte3 ON cte2.id = cte3.id",
        )
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:34,代码来源:test_cte.py


示例18: test_annotate_unique_traversal

    def test_annotate_unique_traversal(self):
        """test that items are copied only once during
        annotate, deannotate traversal

        #2453
        """
        table1 = table('table1', column('x'))
        table2 = table('table2', column('y'))
        a1 = table1.alias()
        s = select([a1.c.x]).select_from(
                a1.join(table2, a1.c.x==table2.c.y)
            )

        for sel in (
            sql_util._deep_deannotate(s),
            sql_util._deep_annotate(s, {'foo':'bar'}),
            visitors.cloned_traverse(s, {}, {}),
            visitors.replacement_traverse(s, {}, lambda x:None)
        ):
            # the columns clause isn't changed at all
            assert sel._raw_columns[0].table is a1
            # the from objects are internally consistent,
            # i.e. the Alias at position 0 is the same
            # Alias in the Join object in position 1
            assert sel._froms[0] is sel._froms[1].left
            eq_(str(s), str(sel))
开发者ID:sleepsonthefloor,项目名称:sqlalchemy,代码行数:26,代码来源:test_selectable.py


示例19: downgrade

def downgrade():
    op.add_column('queries', sa.Column('old_schedule', MutableDict.as_mutable(PseudoJSON), nullable=False, server_default=json.dumps({})))

    queries = table(
        'queries',
        sa.Column('schedule', MutableDict.as_mutable(PseudoJSON)),
        sa.Column('old_schedule', MutableDict.as_mutable(PseudoJSON)))

    op.execute(
        queries
            .update()
            .values({'old_schedule': queries.c.schedule}))

    op.drop_column('queries', 'schedule')
    op.add_column('queries', sa.Column('schedule', sa.String(length=10), nullable=True))

    queries = table(
        'queries',
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('schedule', sa.String(length=10)),
        sa.Column('old_schedule', MutableDict.as_mutable(PseudoJSON)))

    conn = op.get_bind()
    for query in conn.execute(queries.select()):
        scheduleValue = query.old_schedule['interval']
        if scheduleValue <= 86400:
            scheduleValue = query.old_schedule['time']

        conn.execute(
            queries
                .update()
                .where(queries.c.id == query.id)
                .values(schedule=scheduleValue))

    op.drop_column('queries', 'old_schedule')
开发者ID:ariarijp,项目名称:redash,代码行数:35,代码来源:640888ce445d_.py


示例20: test_delete_extra_froms

 def test_delete_extra_froms(self):
     t1 = table("t1", column("c1"))
     t2 = table("t2", column("c1"))
     q = sql.delete(t1).where(t1.c.c1 == t2.c.c1)
     self.assert_compile(
         q, "DELETE FROM t1 FROM t1, t2 WHERE t1.c1 = t2.c1"
     )
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:7,代码来源:test_compiler.py



注:本文中的sqlalchemy.sql.table函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sql.text函数代码示例发布时间:2022-05-27
下一篇:
Python sql.sqltext函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap