本文整理汇总了Python中sqlalchemy.sql.table函数的典型用法代码示例。如果您正苦于以下问题:Python table函数的具体用法?Python table怎么用?Python table使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了table函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: initialize_a10_slb_v1
def initialize_a10_slb_v1(conn, provider, a10):
"""Create a10_slb_v1 for existing vips"""
a10_slb_v1 = table(
'a10_slb_v1',
column('id'),
column('vip_id'))
select_vips = text(
"SELECT vips.id, pools.tenant_id "
"FROM vips, pools, providerresourceassociations p "
"WHERE vips.pool_id = pools.id "
"AND pools.id = p.resource_id "
"AND p.provider_name = :provider "
"AND vips.id NOT IN (SELECT vip_id FROM a10_slb_v1)")
select_vips = select_vips.bindparams(provider=provider)
vips = list(map(dict, conn.execute(select_vips).fetchall()))
tenant_ids = [v['tenant_id'] for v in vips]
tenant_appliance_lookup = initialize_a10_tenant_appliance(conn, a10, tenant_ids)
a10_slb = table(
'a10_slb',
column('id'),
column('type'),
column('a10_appliance_id'))
for vip in vips:
id = str(uuid.uuid4())
appliance = tenant_appliance_lookup[vip['tenant_id']]
insert_slb = a10_slb.insert().\
values(id=id, type=a10_slb_v1.name, a10_appliance_id=appliance)
conn.execute(insert_slb)
insert_vip = a10_slb_v1.insert().\
values(id=id, vip_id=vip['id'])
conn.execute(insert_vip)
开发者ID:mmdurrant,项目名称:a10-neutron-lbaas,代码行数:34,代码来源:step.py
示例2: test_union
def test_union(self):
t1 = table(
't1', column('col1'), column('col2'),
column('col3'), column('col4'))
t2 = table(
't2', column('col1'), column('col2'),
column('col3'), column('col4'))
s1, s2 = select(
[t1.c.col3.label('col3'), t1.c.col4.label('col4')],
t1.c.col2.in_(['t1col2r1', 't1col2r2'])), \
select([t2.c.col3.label('col3'), t2.c.col4.label('col4')],
t2.c.col2.in_(['t2col2r2', 't2col2r3']))
u = union(s1, s2, order_by=['col3', 'col4'])
self.assert_compile(u,
'SELECT t1.col3 AS col3, t1.col4 AS col4 '
'FROM t1 WHERE t1.col2 IN (:col2_1, '
':col2_2) UNION SELECT t2.col3 AS col3, '
't2.col4 AS col4 FROM t2 WHERE t2.col2 IN '
'(:col2_3, :col2_4) ORDER BY col3, col4')
self.assert_compile(u.alias('bar').select(),
'SELECT bar.col3, bar.col4 FROM (SELECT '
't1.col3 AS col3, t1.col4 AS col4 FROM t1 '
'WHERE t1.col2 IN (:col2_1, :col2_2) UNION '
'SELECT t2.col3 AS col3, t2.col4 AS col4 '
'FROM t2 WHERE t2.col2 IN (:col2_3, '
':col2_4)) AS bar')
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:26,代码来源:test_compiler.py
示例3: upgrade
def upgrade():
conn = op.get_bind()
op.add_column('external_identities', sa.Column('local_user_id',
sa.Integer()))
external_identities_t = table('external_identities',
sa.Column('local_user_name', sa.Unicode(50)),
sa.Column('local_user_id', sa.Integer))
users_t = table('users',
sa.Column('user_name', sa.Unicode(50)),
sa.Column('id', sa.Integer))
stmt = external_identities_t.update().values(local_user_id=users_t.c.id). \
where(users_t.c.user_name == external_identities_t.c.local_user_name)
conn.execute(stmt)
op.drop_constraint('pk_external_identities', 'external_identities',
type='primary')
op.drop_constraint('fk_external_identities_local_user_name_users',
'external_identities', type='foreignkey')
op.drop_column('external_identities', 'local_user_name')
op.create_primary_key('pk_external_identities', 'external_identities',
cols=['external_id', 'local_user_id',
'provider_name'])
op.create_foreign_key(None, 'external_identities', 'users',
remote_cols=['id'],
local_cols=['local_user_id'], onupdate='CASCADE',
ondelete='CASCADE')
开发者ID:RaHus,项目名称:ziggurat_foundations,代码行数:27,代码来源:57bbf0c387c_add_local_user_id.py
示例4: upgrade
def upgrade():
op.add_column('attributes',
sa.Column('node', sa.Integer, default=0))
op.add_column('attributes',
sa.Column('is_latest', sa.Boolean, default=True))
n = table('nodes',
column('node', sa.Integer),
column('latest_version', sa.Integer))
v = table('versions',
column('node', sa.Integer),
column('serial', sa.Integer))
a = table('attributes',
column('serial', sa.Integer),
column('node', sa.Integer),
column('is_latest', sa.Boolean))
s = sa.select([v.c.node]).where(v.c.serial == a.c.serial)
u = a.update().values({'node': s})
op.execute(u)
s = sa.select([v.c.serial == n.c.latest_version],
and_(a.c.node == n.c.node, a.c.serial == v.c.serial))
u = a.update().values({'is_latest': s})
op.execute(u)
op.alter_column('attributes', 'node', nullable=False)
op.alter_column('attributes', 'is_latest', nullable=False)
op.create_index('idx_attributes_serial_node', 'attributes',
['serial', 'node'])
开发者ID:AthinaB,项目名称:synnefo,代码行数:31,代码来源:3b62b3f1bf6c_add_attributes_node_.py
示例5: test_join_with_hint
def test_join_with_hint(self):
t1 = table("t1", column("a", Integer), column("b", String), column("c", String))
t2 = table("t2", column("a", Integer), column("b", Integer), column("c", Integer))
join = t1.join(t2, t1.c.a == t2.c.a).select().with_hint(t1, "WITH (NOLOCK)")
self.assert_compile(
join, "SELECT t1.a, t1.b, t1.c, t2.a, t2.b, t2.c " "FROM t1 WITH (NOLOCK) JOIN t2 ON t1.a = t2.a"
)
开发者ID:hunterfu,项目名称:LuoYunCloud,代码行数:7,代码来源:test_compiler.py
示例6: test_annotate_fromlist_preservation
def test_annotate_fromlist_preservation(self):
"""test the FROM list in select still works
even when multiple annotate runs have created
copies of the same selectable
#2453, continued
"""
table1 = table('table1', column('x'))
table2 = table('table2', column('y'))
a1 = table1.alias()
s = select([a1.c.x]).select_from(
a1.join(table2, a1.c.x==table2.c.y)
)
assert_s = select([select([s])])
for fn in (
sql_util._deep_deannotate,
lambda s: sql_util._deep_annotate(s, {'foo':'bar'}),
lambda s:visitors.cloned_traverse(s, {}, {}),
lambda s:visitors.replacement_traverse(s, {}, lambda x:None)
):
sel = fn(select([fn(select([fn(s)]))]))
eq_(str(assert_s), str(sel))
开发者ID:sleepsonthefloor,项目名称:sqlalchemy,代码行数:25,代码来源:test_selectable.py
示例7: test_alias_outer_join
def test_alias_outer_join(self):
address_types = table("address_types", column("id"), column("name"))
addresses = table(
"addresses",
column("id"),
column("user_id"),
column("address_type_id"),
column("email_address"),
)
at_alias = address_types.alias()
s = (
select([at_alias, addresses])
.select_from(
addresses.outerjoin(
at_alias, addresses.c.address_type_id == at_alias.c.id
)
)
.where(addresses.c.user_id == 7)
.order_by(addresses.c.id, address_types.c.id)
)
self.assert_compile(
s,
"SELECT address_types_1.id, "
"address_types_1.name, addresses.id, "
"addresses.user_id, addresses.address_type_"
"id, addresses.email_address FROM "
"addresses LEFT OUTER JOIN address_types "
"address_types_1 ON addresses.address_type_"
"id = address_types_1.id WHERE "
"addresses.user_id = :user_id_1 ORDER BY "
"addresses.id, address_types.id",
)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:32,代码来源:test_compiler.py
示例8: _insert_operation_port_interface
def _insert_operation_port_interface():
tb = table(
'operation_port_interface',
column('id', Integer),
column('color', String))
columns = [c.name for c in tb.columns]
data = [
(20, '#ED254E'),
]
rows = [dict(zip(columns, cat)) for cat in data]
op.bulk_insert(tb, rows)
tb = table(
'operation_port_interface_translation',
column('id', Integer),
column('locale', String),
column('name', String))
columns = [c.name for c in tb.columns]
data = [
(20, 'en', 'TransformationModel'),
(20, 'pt', 'TransformationModel'),
]
rows = [dict(zip(columns, cat)) for cat in data]
op.bulk_insert(tb, rows)
开发者ID:eubr-bigsea,项目名称:tahiti,代码行数:25,代码来源:bd294c13637b_feature_extraction_ops.py
示例9: upgrade
def upgrade():
op.add_column('lu_population_number',
sa.Column('name_ro', sa.UnicodeText, nullable=True))
op.add_column('lu_population_units_restricted',
sa.Column('name_ro', sa.UnicodeText, nullable=True))
lu_pop_codes = table('lu_population_number',
column('code', sa.String),
column('name_ro', sa.UnicodeText))
lu_pop_restrict_codes = table('lu_population_units_restricted',
column('code', sa.String),
column('name_ro', sa.UnicodeText))
for code, name_ro in DATA:
op.execute(
lu_pop_codes.update()
.where(lu_pop_codes.c.code == op.inline_literal(code))
.values({'name_ro': op.inline_literal(name_ro)}))
op.execute(
lu_pop_restrict_codes.update()
.where(lu_pop_restrict_codes.c.code == op.inline_literal(code))
.values({'name_ro': op.inline_literal(name_ro)}))
开发者ID:eaudeweb,项目名称:art17-consultation,代码行数:26,代码来源:236f754feafc_population_ro.py
示例10: test_nonansi_nested_right_join
def test_nonansi_nested_right_join(self):
a = table("a", column("a"))
b = table("b", column("b"))
c = table("c", column("c"))
j = a.join(b.join(c, b.c.b == c.c.c), a.c.a == b.c.b)
self.assert_compile(
select([j]),
"SELECT a.a, b.b, c.c FROM a, b, c "
"WHERE a.a = b.b AND b.b = c.c",
dialect=oracle.OracleDialect(use_ansi=False),
)
j = a.outerjoin(b.join(c, b.c.b == c.c.c), a.c.a == b.c.b)
self.assert_compile(
select([j]),
"SELECT a.a, b.b, c.c FROM a, b, c "
"WHERE a.a = b.b(+) AND b.b = c.c",
dialect=oracle.OracleDialect(use_ansi=False),
)
j = a.join(b.outerjoin(c, b.c.b == c.c.c), a.c.a == b.c.b)
self.assert_compile(
select([j]),
"SELECT a.a, b.b, c.c FROM a, b, c "
"WHERE a.a = b.b AND b.b = c.c(+)",
dialect=oracle.OracleDialect(use_ansi=False),
)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:31,代码来源:test_compiler.py
示例11: upgrade
def upgrade():
connection = op.get_bind()
plan_table = table(
'plan',
sa.Column('id', sa.GUID(), nullable=False),
sa.Column('project_id', sa.GUID(), nullable=False),
)
project_plan_table = table(
'project_plan',
sa.Column('plan_id', sa.GUID(), nullable=False),
sa.Column('project_id', sa.GUID(), nullable=False),
)
for project_plan in connection.execute(project_plan_table.select()):
print("Migrating ProjectPlan plan_id=%s project_id=%s" % (
project_plan.plan_id, project_plan.project_id))
connection.execute(
plan_table.update().where(
plan_table.c.id == project_plan.plan_id,
).values({
plan_table.c.project_id: project_plan.project_id,
})
)
开发者ID:DRP-Guy,项目名称:changes,代码行数:26,代码来源:4cd8cf6a0894_migrate_projectplan.py
示例12: upgrade
def upgrade():
### commands auto generated by Alembic - please adjust! ###
op.add_column('archived_services', sa.Column('status', sa.String(), nullable=True))
op.add_column('services', sa.Column('status', sa.String(), nullable=True))
op.create_check_constraint(
"ck_services_status",
"services",
"status in ('disabled', 'enabled', 'published')"
)
op.create_check_constraint(
"ck_archived_services_status",
"archived_services",
"status in ('disabled', 'enabled', 'published')"
)
services = table('services', column('status', String))
archived_services = table('archived_services', column('status', String))
op.execute(
services.update(). \
values({'status': op.inline_literal('enabled')})
)
op.execute(
archived_services.update(). \
values({'status': op.inline_literal('enabled')})
)
开发者ID:RichardKnop,项目名称:digitalmarketplace-api,代码行数:30,代码来源:597e346723ee_.py
示例13: upgrade
def upgrade():
ip_policy = table('quark_ip_policy',
column('id', sa.String(length=36)),
column('size', INET()))
ip_policy_cidrs = table('quark_ip_policy_cidrs',
column('ip_policy_id', sa.String(length=36)),
column('cidr', sa.String(length=64)))
connection = op.get_bind()
# 1. Retrieve all ip_policy_cidr rows.
results = connection.execute(
select([ip_policy_cidrs.c.ip_policy_id, ip_policy_cidrs.c.cidr])
).fetchall()
# 2. Determine IPSet for each IP Policy.
ipp = dict()
for ip_policy_id, cidr in results:
if ip_policy_id not in ipp:
ipp[ip_policy_id] = netaddr.IPSet()
ipp[ip_policy_id].add(cidr)
# 3. Populate size for each IP Policy.
for ip_policy_id in ipp:
connection.execute(ip_policy.update().values(
size=ipp[ip_policy_id].size).where(
ip_policy.c.id == ip_policy_id))
开发者ID:Anonymike,项目名称:quark,代码行数:26,代码来源:28e55acaf366_populate_ip_policy_size.py
示例14: upgrade
def upgrade():
op.create_table('question_match',
sa.Column('id', postgresql.UUID(), nullable=False),
sa.Column('data', sa.Text(), nullable=True),
sa.Column('score', sa.Float(), nullable=True),
sa.ForeignKeyConstraint(['id'], ['question.id']),
sa.PrimaryKeyConstraint('id')
)
question = sql.table('question',
sql.column('id'),
sql.column('match_data'),
sql.column('match_score'))
question_match = sql.table('question_match',
sql.column('id'),
sql.column('data'),
sql.column('score'))
op.execute(
question_match.insert().from_select(
['id', 'data', 'score'],
question.select().where(question.c.match_data != None)))
op.drop_column('question', 'match_data')
op.drop_column('question', 'match_score')
开发者ID:alexandrupetrescu94,项目名称:mptracker,代码行数:25,代码来源:4bc50a6cca9_question_match_table.py
示例15: test_pg_example_one
def test_pg_example_one(self):
products = table("products", column("id"), column("date"))
products_log = table("products_log", column("id"), column("date"))
moved_rows = (
products.delete()
.where(
and_(products.c.date >= "dateone", products.c.date < "datetwo")
)
.returning(*products.c)
.cte("moved_rows")
)
stmt = products_log.insert().from_select(
products_log.c, moved_rows.select()
)
self.assert_compile(
stmt,
"WITH moved_rows AS "
"(DELETE FROM products WHERE products.date >= :date_1 "
"AND products.date < :date_2 "
"RETURNING products.id, products.date) "
"INSERT INTO products_log (id, date) "
"SELECT moved_rows.id, moved_rows.date FROM moved_rows",
)
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:25,代码来源:test_cte.py
示例16: test_union
def test_union(self):
t1 = table("t1", column("col1"), column("col2"), column("col3"), column("col4"))
t2 = table("t2", column("col1"), column("col2"), column("col3"), column("col4"))
s1, s2 = (
select([t1.c.col3.label("col3"), t1.c.col4.label("col4")], t1.c.col2.in_(["t1col2r1", "t1col2r2"])),
select([t2.c.col3.label("col3"), t2.c.col4.label("col4")], t2.c.col2.in_(["t2col2r2", "t2col2r3"])),
)
u = union(s1, s2, order_by=["col3", "col4"])
self.assert_compile(
u,
"SELECT t1.col3 AS col3, t1.col4 AS col4 "
"FROM t1 WHERE t1.col2 IN (:col2_1, "
":col2_2) UNION SELECT t2.col3 AS col3, "
"t2.col4 AS col4 FROM t2 WHERE t2.col2 IN "
"(:col2_3, :col2_4) ORDER BY col3, col4",
)
self.assert_compile(
u.alias("bar").select(),
"SELECT bar.col3, bar.col4 FROM (SELECT "
"t1.col3 AS col3, t1.col4 AS col4 FROM t1 "
"WHERE t1.col2 IN (:col2_1, :col2_2) UNION "
"SELECT t2.col3 AS col3, t2.col4 AS col4 "
"FROM t2 WHERE t2.col2 IN (:col2_3, "
":col2_4)) AS bar",
)
开发者ID:hunterfu,项目名称:LuoYunCloud,代码行数:25,代码来源:test_compiler.py
示例17: test_cte_refers_to_aliased_cte_twice
def test_cte_refers_to_aliased_cte_twice(self):
# test issue #4204
a = table("a", column("id"))
b = table("b", column("id"), column("fid"))
c = table("c", column("id"), column("fid"))
cte1 = select([a.c.id]).cte(name="cte1")
aa = cte1.alias("aa")
cte2 = (
select([b.c.id])
.select_from(b.join(aa, b.c.fid == aa.c.id))
.cte(name="cte2")
)
cte3 = (
select([c.c.id])
.select_from(c.join(aa, c.c.fid == aa.c.id))
.cte(name="cte3")
)
stmt = select([cte3.c.id, cte2.c.id]).select_from(
cte2.join(cte3, cte2.c.id == cte3.c.id)
)
self.assert_compile(
stmt,
"WITH cte1 AS (SELECT a.id AS id FROM a), "
"cte2 AS (SELECT b.id AS id FROM b "
"JOIN cte1 AS aa ON b.fid = aa.id), "
"cte3 AS (SELECT c.id AS id FROM c "
"JOIN cte1 AS aa ON c.fid = aa.id) "
"SELECT cte3.id, cte2.id FROM cte2 JOIN cte3 ON cte2.id = cte3.id",
)
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:34,代码来源:test_cte.py
示例18: test_annotate_unique_traversal
def test_annotate_unique_traversal(self):
"""test that items are copied only once during
annotate, deannotate traversal
#2453
"""
table1 = table('table1', column('x'))
table2 = table('table2', column('y'))
a1 = table1.alias()
s = select([a1.c.x]).select_from(
a1.join(table2, a1.c.x==table2.c.y)
)
for sel in (
sql_util._deep_deannotate(s),
sql_util._deep_annotate(s, {'foo':'bar'}),
visitors.cloned_traverse(s, {}, {}),
visitors.replacement_traverse(s, {}, lambda x:None)
):
# the columns clause isn't changed at all
assert sel._raw_columns[0].table is a1
# the from objects are internally consistent,
# i.e. the Alias at position 0 is the same
# Alias in the Join object in position 1
assert sel._froms[0] is sel._froms[1].left
eq_(str(s), str(sel))
开发者ID:sleepsonthefloor,项目名称:sqlalchemy,代码行数:26,代码来源:test_selectable.py
示例19: downgrade
def downgrade():
op.add_column('queries', sa.Column('old_schedule', MutableDict.as_mutable(PseudoJSON), nullable=False, server_default=json.dumps({})))
queries = table(
'queries',
sa.Column('schedule', MutableDict.as_mutable(PseudoJSON)),
sa.Column('old_schedule', MutableDict.as_mutable(PseudoJSON)))
op.execute(
queries
.update()
.values({'old_schedule': queries.c.schedule}))
op.drop_column('queries', 'schedule')
op.add_column('queries', sa.Column('schedule', sa.String(length=10), nullable=True))
queries = table(
'queries',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('schedule', sa.String(length=10)),
sa.Column('old_schedule', MutableDict.as_mutable(PseudoJSON)))
conn = op.get_bind()
for query in conn.execute(queries.select()):
scheduleValue = query.old_schedule['interval']
if scheduleValue <= 86400:
scheduleValue = query.old_schedule['time']
conn.execute(
queries
.update()
.where(queries.c.id == query.id)
.values(schedule=scheduleValue))
op.drop_column('queries', 'old_schedule')
开发者ID:ariarijp,项目名称:redash,代码行数:35,代码来源:640888ce445d_.py
示例20: test_delete_extra_froms
def test_delete_extra_froms(self):
t1 = table("t1", column("c1"))
t2 = table("t2", column("c1"))
q = sql.delete(t1).where(t1.c.c1 == t2.c.c1)
self.assert_compile(
q, "DELETE FROM t1 FROM t1, t2 WHERE t1.c1 = t2.c1"
)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:7,代码来源:test_compiler.py
注:本文中的sqlalchemy.sql.table函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论