本文整理汇总了Python中sqlalchemy.sql.column函数的典型用法代码示例。如果您正苦于以下问题：Python column函数的具体用法？Python column怎么用？Python column使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了column函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_annotate_fromlist_preservation
def test_annotate_fromlist_preservation(self):
    """Check that a select's FROM list survives repeated copying.

    Each transform below produces copies of the same selectable; after
    applying it at every nesting level the rendered statement must equal
    the plain doubly-nested select (#2453, continued).
    """
    left = table('table1', column('x'))
    right = table('table2', column('y'))
    left_alias = left.alias()
    base = select([left_alias.c.x])
    s = base.select_from(
        left_alias.join(right, left_alias.c.x == right.c.y)
    )

    assert_s = select([select([s])])

    def annotate(stmt):
        return sql_util._deep_annotate(stmt, {'foo': 'bar'})

    def clone(stmt):
        return visitors.cloned_traverse(stmt, {}, {})

    def replace(stmt):
        return visitors.replacement_traverse(stmt, {}, lambda x: None)

    transforms = (sql_util._deep_deannotate, annotate, clone, replace)
    for transform in transforms:
        innermost = transform(s)
        middle = transform(select([innermost]))
        sel = transform(select([middle]))
        eq_(str(assert_s), str(sel))
开发者ID:sleepsonthefloor,项目名称:sqlalchemy,代码行数:25,代码来源:test_selectable.py
示例2: _insert_operation_operation_form
def _insert_operation_operation_form():
    """Bulk-insert operation/form links for operations 3022 and 3012."""
    link_table = table(
        'operation_operation_form',
        column('operation_id', Integer),
        column('operation_form_id', Integer),
    )

    links = [
        # normalize
        (3022, 39),
        (3022, 40),
        (3022, 41),
        (3022, 43),
        (3022, 110),
        (3022, 3022),
        # feature-indexer
        (3012, 39),
        (3012, 40),
        (3012, 41),
        (3012, 43),
        (3012, 110),
        (3012, 3012),
    ]
    records = [
        {'operation_id': operation_id, 'operation_form_id': form_id}
        for operation_id, form_id in links
    ]
    op.bulk_insert(link_table, records)
开发者ID:eubr-bigsea,项目名称:tahiti,代码行数:26,代码来源:bca9192ljsj4_moving_normalizer.py
示例3: _insert_operation_operation_form
def _insert_operation_operation_form():
    """Bulk-insert operation/form links for the regression operations.

    Each operation is linked to its own form (102-107) and to form 41.
    """
    link_table = table(
        'operation_operation_form',
        column('operation_id', Integer),
        column('operation_form_id', Integer),
    )
    # Keys come from the table definition so they cannot drift from it.
    field_names = tuple(col.name for col in link_table.columns)

    links = [
        [REGRESSION_MODEL, 102],
        [ISOTONIC_REGRESSION, 103],
        [AFT_SURVIVAL_REGRESSION, 104],
        [GBT_REGRESSOR, 105],
        [RANDOM_FOREST_REGRESSOR, 106],
        [GENERALIZED_LINEAR_REGRESSOR, 107],

        [REGRESSION_MODEL, 41],
        [ISOTONIC_REGRESSION, 41],
        [AFT_SURVIVAL_REGRESSION, 41],
        [GBT_REGRESSOR, 41],
        [RANDOM_FOREST_REGRESSOR, 41],
        [GENERALIZED_LINEAR_REGRESSOR, 41],
    ]
    records = []
    for link in links:
        records.append(dict(zip(field_names, link)))
    op.bulk_insert(link_table, records)
开发者ID:eubr-bigsea,项目名称:tahiti,代码行数:25,代码来源:70078039a87a_adding_regression_operations_metadata.py
示例4: _insert_operation_form_translation
def _insert_operation_form_translation():
    """Bulk-insert the 'en' and 'pt' names for forms 3024 through 3031."""
    translation_table = table(
        'operation_form_translation',
        column('id', Integer),
        column('locale', String),
        column('name', String),
    )

    localized_names = (('en', 'Execution'), ('pt', 'Execução'))
    # Same row order as an explicit listing: for each form id, 'en' then 'pt'.
    records = [
        {'id': form_id, 'locale': locale, 'name': name}
        for form_id in range(3024, 3032)
        for locale, name in localized_names
    ]
    op.bulk_insert(translation_table, records)
开发者ID:eubr-bigsea,项目名称:tahiti,代码行数:29,代码来源:abc0603ljsj2_adding_hdfs_feature_compss.py
示例5: test_limit_offset_with_correlated_order_by
def test_limit_offset_with_correlated_order_by(self):
    """Compile LIMIT/OFFSET ordered by a correlated scalar subquery.

    Checks the ROW_NUMBER() wrapping in the rendered SQL, the bound
    parameters, and that both t1 columns appear in the result map.
    """
    outer = table('t1', column('x', Integer), column('y', Integer))
    inner = table('t2', column('x', Integer), column('y', Integer))

    correlated = select([inner.c.y]).where(outer.c.x == inner.c.x)
    order_by = correlated.as_scalar()
    s = select([outer]).where(outer.c.x == 5).order_by(order_by)
    s = s.limit(10).offset(20)

    self.assert_compile(
        s,
        "SELECT anon_1.x, anon_1.y "
        "FROM (SELECT t1.x AS x, t1.y AS y, "
        "ROW_NUMBER() OVER (ORDER BY "
        "(SELECT t2.y FROM t2 WHERE t1.x = t2.x)"
        ") AS mssql_rn "
        "FROM t1 "
        "WHERE t1.x = :x_1) AS anon_1 "
        "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1",
        checkparams={'param_1': 20, 'param_2': 10, 'x_1': 5}
    )

    compiled = s.compile(dialect=mssql.MSDialect())
    eq_(len(compiled._result_columns), 2)
    for key, expected_col in (('x', outer.c.x), ('y', outer.c.y)):
        assert expected_col in set(compiled._create_result_map()[key][1])
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:25,代码来源:test_compiler.py
示例6: test_annotations
def test_annotations(self):
    """Annotated clause constructs must use the decorated class' compiler.

    A temporary @compiles override is registered for Select; both the
    plain and the annotated select have to render through it. The
    original dispatch is restored afterwards.
    """
    t1 = table('t1', column('c1'), column('c2'))

    saved_dispatch = Select._compiler_dispatch
    try:
        @compiles(Select)
        def _render_override(element, compiler, **kw):
            return "OVERRIDE"

        s1 = select([t1])
        self.assert_compile(s1, "OVERRIDE")
        self.assert_compile(s1._annotate({}), "OVERRIDE")
    finally:
        # Undo the override so other tests see the stock Select compiler.
        Select._compiler_dispatch = saved_dispatch
        if hasattr(Select, '_compiler_dispatcher'):
            del Select._compiler_dispatcher
开发者ID:onetera,项目名称:scandatatransfer,代码行数:26,代码来源:test_compiler.py
示例7: test_cube_operators
def test_cube_operators(self):
    """Render CUBE, ROLLUP and GROUPING SETS inside GROUP BY."""
    t = table(
        't',
        column('value'),
        column('x'),
        column('y'),
        column('z'),
        column('q'),
    )
    stmt = select([func.sum(t.c.value)])

    # (GROUP BY expression, expected rendered statement)
    cases = [
        (
            func.cube(t.c.x, t.c.y),
            "SELECT sum(t.value) AS sum_1 FROM t GROUP BY CUBE(t.x, t.y)",
        ),
        (
            func.rollup(t.c.x, t.c.y),
            "SELECT sum(t.value) AS sum_1 FROM t GROUP BY ROLLUP(t.x, t.y)",
        ),
        (
            func.grouping_sets(t.c.x, t.c.y),
            "SELECT sum(t.value) AS sum_1 FROM t "
            "GROUP BY GROUPING SETS(t.x, t.y)",
        ),
        (
            func.grouping_sets(
                sql.tuple_(t.c.x, t.c.y),
                sql.tuple_(t.c.z, t.c.q),
            ),
            "SELECT sum(t.value) AS sum_1 FROM t GROUP BY "
            "GROUPING SETS((t.x, t.y), (t.z, t.q))",
        ),
    ]
    for grouping, expected_sql in cases:
        self.assert_compile(stmt.group_by(grouping), expected_sql)
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:35,代码来源:test_functions.py
示例8: test_noorderby_parameters_insubquery
def test_noorderby_parameters_insubquery(self):
    """The MS-SQL dialect must omit ORDER BY positional parameters
    from subqueries."""
    mytable = table(
        "mytable",
        column("myid", Integer),
        column("name", String),
        column("description", String),
    )

    selected = [mytable.c.myid, sql.literal('bar').label('c1')]
    subquery = select(selected, order_by=[mytable.c.name + '-']).alias("foo")
    crit = subquery.c.myid == mytable.c.myid

    qmark_dialect = mssql.dialect()
    qmark_dialect.paramstyle = "qmark"
    qmark_dialect.positional = True

    self.assert_compile(
        select(["*"], crit),
        "SELECT * FROM (SELECT mytable.myid AS "
        "myid, ? AS c1 FROM mytable) AS foo, mytable WHERE "
        "foo.myid = mytable.myid",
        dialect=qmark_dialect,
        checkparams={'param_1': 'bar'},
        # if name_1 is included, too many parameters are passed to dbapi
        checkpositional=('bar', )
    )
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:29,代码来源:test_compiler.py
示例9: test_insert_returning
def test_insert_returning(self):
    """INSERT ... RETURNING renders as an OUTPUT clause on inserted.*."""
    table1 = table(
        "mytable",
        column("myid", Integer),
        column("name", String(128)),
        column("description", String(128)),
    )

    def new_insert():
        return insert(table1, values=dict(name="foo"))

    # Explicit columns.
    stmt = new_insert().returning(table1.c.myid, table1.c.name)
    self.assert_compile(
        stmt,
        "INSERT INTO mytable (name) OUTPUT "
        "inserted.myid, inserted.name VALUES "
        "(:name)",
    )

    # Whole table.
    stmt = new_insert().returning(table1)
    self.assert_compile(
        stmt,
        "INSERT INTO mytable (name) OUTPUT "
        "inserted.myid, inserted.name, "
        "inserted.description VALUES (:name)",
    )

    # SQL function over a column.
    stmt = new_insert().returning(func.length(table1.c.name))
    self.assert_compile(
        stmt,
        "INSERT INTO mytable (name) OUTPUT "
        "LEN(inserted.name) AS length_1 VALUES "
        "(:name)",
    )
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:32,代码来源:test_compiler.py
示例10: upgrade
def upgrade():
    """Create the employees_attendance table and seed new ability names."""
    ### commands auto generated by Alembic ###
    op.create_table(
        'employees_attendance',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('attendanceDate', sa.Date(), nullable=False),
        sa.Column('arriveTime', sa.Time(), nullable=False),
        sa.Column('leaveTime', sa.Time(), nullable=False),
        sa.Column('employee_id', sa.Integer(), nullable=False),
        sa.Column('createdBy_id', sa.Integer(), nullable=True),
        sa.Column('issueDateTime', sa.DateTime(), nullable=True),
        sa.ForeignKeyConstraint(
            ['createdBy_id'], ['users.id'], ondelete='CASCADE'),
        sa.ForeignKeyConstraint(
            ['employee_id'], ['users.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id')
    )

    # Ad-hoc table construct used only for the bulk insert below.
    abilities = table(
        'abilities',
        column('id', Integer),
        column('name', String),
    )
    ability_names = (
        "employeeAttendances.list",
        "employeeAttendances.show",
        "employeeAttendances.delete",
        "employeeAttendances.update",
        "employeeAttendances.create",
        "feedbacks.list",
        "feedbacks.show",
    )
    op.bulk_insert(
        abilities,
        [{'name': ability_name} for ability_name in ability_names]
    )
开发者ID:yehiaa,项目名称:clinic,代码行数:35,代码来源:634da8ab3d5c_employees_attendance_and_new_abilities.py
示例11: test_delete_extra_froms
def test_delete_extra_froms(self):
    """A DELETE whose WHERE references a second table renders extra FROMs."""
    target = table("t1", column("c1"))
    other = table("t2", column("c1"))
    stmt = sql.delete(target).where(target.c.c1 == other.c.c1)
    expected_sql = "DELETE FROM t1 FROM t1, t2 WHERE t1.c1 = t2.c1"
    self.assert_compile(stmt, expected_sql)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:7,代码来源:test_compiler.py
示例12: genericize_thread
def genericize_thread():
    """Move thread.g_thrid into a new imapthread table.

    Reads (id, g_thrid) from thread, adds thread.type, creates imapthread,
    copies the saved values into it and finally drops thread.g_thrid.
    """
    class Thread_(Base):
        __table__ = Base.metadata.tables['thread']

    # Get data from columns-to-be-dropped
    with session_scope() as db_session:
        results = db_session.query(Thread_.id, Thread_.g_thrid).all()

    to_insert = [
        {'id': thread_id, 'g_thrid': g_thrid}
        for thread_id, g_thrid in results
    ]

    # Add new columns
    op.add_column('thread', sa.Column('type', sa.String(16)))

    # Create new table, insert data
    # The table
    op.create_table(
        'imapthread',
        sa.Column('g_thrid', sa.BigInteger(), nullable=True, index=True),
        sa.Column('id', sa.Integer()),
        sa.ForeignKeyConstraint(['id'], ['thread.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id'))

    # The ad-hoc table for insert
    imapthread = table(
        'imapthread',
        column('g_thrid', sa.BigInteger),
        column('id', sa.Integer))
    if to_insert:
        op.bulk_insert(imapthread, to_insert)

    # Drop columns now
    op.drop_column('thread', 'g_thrid')
开发者ID:caitp,项目名称:inbox,代码行数:32,代码来源:007_per_provider_table_split.py
示例13: genericize_imapaccount
def genericize_imapaccount():
    """Split imapaccount into a generic account table plus imapaccount.

    Reads (id, imap_host), renames imapaccount to account and adds
    account.type, creates a fresh imapaccount table, copies the saved
    values into it and finally drops account.imap_host.
    """
    class ImapAccount_(Base):
        __table__ = Base.metadata.tables['imapaccount']

    # Get data from columns-to-be-dropped
    with session_scope() as db_session:
        results = db_session.query(
            ImapAccount_.id, ImapAccount_.imap_host).all()

    to_insert = [
        {'id': account_id, 'imap_host': imap_host}
        for account_id, imap_host in results
    ]

    # Rename table, add new columns.
    op.rename_table('imapaccount', 'account')
    op.add_column('account', sa.Column('type', sa.String(16)))

    # Create new table, insert data
    # The table
    op.create_table(
        'imapaccount',
        sa.Column('imap_host', sa.String(512)),
        sa.Column('id', sa.Integer()),
        sa.ForeignKeyConstraint(['id'], ['account.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id'))

    # The ad-hoc table for insert
    imapaccount = table(
        'imapaccount',
        column('imap_host', sa.String()),
        column('id', sa.Integer))
    if to_insert:
        op.bulk_insert(imapaccount, to_insert)

    # Drop columns now
    op.drop_column('account', 'imap_host')
开发者ID:caitp,项目名称:inbox,代码行数:33,代码来源:007_per_provider_table_split.py
示例14: downgrade_imapthread
def downgrade_imapthread():
    """Fold imapthread.g_thrid back into thread and drop imapthread."""
    class ImapThread_(Base):
        __table__ = Base.metadata.tables['imapthread']

    # Get data from table-to-be-dropped
    with session_scope() as db_session:
        results = db_session.query(ImapThread_.id, ImapThread_.g_thrid).all()

    to_insert = [
        {'id': thread_id, 'g_thrid': g_thrid}
        for thread_id, g_thrid in results
    ]

    # Drop columns, add new columns + insert data
    op.drop_column('thread', 'type')
    g_thrid_column = sa.Column(
        'g_thrid', sa.BigInteger(), nullable=True, index=True)
    op.add_column('thread', g_thrid_column)

    thread = table(
        'thread',
        column('g_thrid', sa.BigInteger),
        column('id', sa.Integer))

    # One UPDATE per saved row, keyed on the thread id.
    for saved in to_insert:
        restore = thread.update().where(thread.c.id == saved['id'])
        restore = restore.values({'g_thrid': saved['g_thrid']})
        op.execute(restore)

    # Drop table
    op.drop_table('imapthread')
开发者ID:caitp,项目名称:inbox,代码行数:26,代码来源:007_per_provider_table_split.py
示例15: _by_search_tsearch
def _by_search_tsearch(self, query, operand, maybe_negate):
    # type: (Query, str, ConditionTransform) -> Query
    """Add full-text search columns and conditions for ``operand``.

    Adds ``content_matches`` and ``subject_matches`` columns computed by
    ``ts_match_locs_array``, an ILIKE condition for every double-quoted
    phrase in ``operand``, and a final ``search_tsvector @@ tsquery``
    condition. Every condition is passed through ``maybe_negate`` before
    being applied.

    :param query: the select being built.
    :param operand: the raw search text.
    :param maybe_negate: callable applied to each condition.
    :return: the extended query.
    """
    tsquery = func.plainto_tsquery(literal("zulip.english_us_search"), literal(operand))
    ts_locs_array = func.ts_match_locs_array
    query = query.column(ts_locs_array(literal("zulip.english_us_search"),
                                       column("rendered_content"),
                                       tsquery).label("content_matches"))
    # We HTML-escape the subject in Postgres to avoid doing a server round-trip
    query = query.column(ts_locs_array(literal("zulip.english_us_search"),
                                       func.escape_html(column("subject")),
                                       tsquery).label("subject_matches"))

    # Do quoted string matching.  We really want phrase
    # search here so we can ignore punctuation and do
    # stemming, but there isn't a standard phrase search
    # mechanism in Postgres
    #
    # The pattern is a raw string: "\S" in a normal string literal is an
    # invalid escape sequence and triggers a warning on modern Python.
    for term in re.findall(r'"[^"]+"|\S+', operand):
        # NOTE(review): a lone '"' or '""' token also passes this check and
        # yields the pattern '%%' -- confirm whether that is intended.
        if term[0] == '"' and term[-1] == '"':
            term = term[1:-1]
            term = '%' + connection.ops.prep_for_like_query(term) + '%'
            cond = or_(column("content").ilike(term),
                       column("subject").ilike(term))
            query = query.where(maybe_negate(cond))

    cond = column("search_tsvector").op("@@")(tsquery)
    return query.where(maybe_negate(cond))
开发者ID:souravbadami,项目名称:zulip,代码行数:26,代码来源:messages.py
示例16: upgrade
def upgrade():
    """Add userexternalid.last_used_at, backfill it, then make it NOT NULL.

    The column is created nullable so existing rows can be filled from
    updated_at before the NOT NULL constraint is applied.
    """
    ext_ids = table(
        'userexternalid',
        column('updated_at', sa.DateTime),
        column('last_used_at', sa.DateTime),
    )
    new_column = sa.Column('last_used_at', sa.DateTime(), nullable=True)
    op.add_column('userexternalid', new_column)

    backfill = ext_ids.update().values(last_used_at=ext_ids.c.updated_at)
    op.execute(backfill)

    op.alter_column('userexternalid', 'last_used_at', nullable=False)
开发者ID:hasgeek,项目名称:lastuser,代码行数:7,代码来源:f324b0ecd05c_external_id_last_used_at.py
示例17: get
def get(self, database, query):
    """Render aggregated statistics for one query of one database.

    Renders "xhr.html" with "No data" when the executed statement reports
    fewer than one row; otherwise renders the detail template with the
    first row.
    """
    bs = block_size.c.block_size

    detailed = powa_getstatdata_detailed_db()
    target_filter = (
        (column("datname") == bindparam("database")) &
        (column("queryid") == bindparam("query")))
    detailed = detailed.where(target_filter)
    detailed = detailed.alias()

    join_condition = and_(
        powa_statements.c.queryid == detailed.c.queryid,
        powa_statements.c.dbid == detailed.c.dbid)
    from_clause = outerjoin(powa_statements, detailed, join_condition)

    c = detailed.c
    # NOTE(review): `sum` is applied to column expressions, so it is
    # presumably an SQL aggregate imported by this module, not the builtin.
    rblk = mulblock(sum(c.shared_blks_read).label("shared_blks_read"))
    wblk = mulblock(sum(c.shared_blks_hit).label("shared_blks_hit"))

    selected = [
        column("query"),
        sum(c.calls).label("calls"),
        sum(c.runtime).label("runtime"),
        rblk,
        wblk,
        (rblk + wblk).label("total_blks"),
    ]
    stmt = select(selected).select_from(from_clause)
    stmt = stmt.where(powa_statements.c.queryid == bindparam("query"))
    stmt = stmt.group_by(column("query"), bs)

    bound_params = {
        "query": query,
        "database": database,
        "from": self.get_argument("from"),
        "to": self.get_argument("to")
    }
    value = self.execute(stmt, params=bound_params)

    if value.rowcount < 1:
        self.render("xhr.html", content="No data")
    else:
        self.render("database/query/detail.html", stats=value.first())
开发者ID:champeric,项目名称:powa-web,代码行数:33,代码来源:query.py
示例18: test_recursive
def test_recursive(self):
    """Compile a recursive CTE on the default and the MSSQL dialect.

    The two expected statements differ only in the leading keyword:
    "WITH RECURSIVE" by default, plain "WITH" for MSSQL.
    """
    parts = table(
        'parts',
        column('part'),
        column('sub_part'),
        column('quantity'),
    )

    # Anchor member of the CTE.
    anchor = select([
        parts.c.sub_part,
        parts.c.part,
        parts.c.quantity,
    ]).where(parts.c.part == 'our part')
    included_parts = anchor.cte(recursive=True)

    incl_alias = included_parts.alias()
    parts_alias = parts.alias()

    # Recursive member, joined back onto the CTE through its alias.
    recursive_member = select([
        parts_alias.c.part,
        parts_alias.c.sub_part,
        parts_alias.c.quantity,
    ]).where(parts_alias.c.part == incl_alias.c.sub_part)
    included_parts = included_parts.union(recursive_member)

    total = func.sum(included_parts.c.quantity).label('total_quantity')
    s = select([included_parts.c.sub_part, total])
    s = s.select_from(
        included_parts.join(parts, included_parts.c.part == parts.c.part))
    s = s.group_by(included_parts.c.sub_part)

    # Everything after the leading WITH keyword(s) is dialect-independent.
    statement_tail = (
        "anon_1(sub_part, part, quantity) "
        "AS (SELECT parts.sub_part AS sub_part, parts.part "
        "AS part, parts.quantity AS quantity FROM parts "
        "WHERE parts.part = :part_1 UNION SELECT parts_1.part "
        "AS part, parts_1.sub_part AS sub_part, parts_1.quantity "
        "AS quantity FROM parts AS parts_1, anon_1 AS anon_2 "
        "WHERE parts_1.part = anon_2.sub_part) "
        "SELECT anon_1.sub_part, "
        "sum(anon_1.quantity) AS total_quantity FROM anon_1 "
        "JOIN parts ON anon_1.part = parts.part "
        "GROUP BY anon_1.sub_part"
    )

    self.assert_compile(s, "WITH RECURSIVE " + statement_tail)

    # quick check that the "WITH RECURSIVE" varies per
    # dialect
    self.assert_compile(
        s,
        "WITH " + statement_tail,
        dialect=mssql.dialect()
    )
开发者ID:biner,项目名称:sqlalchemy,代码行数:60,代码来源:test_cte.py
示例19: upgrade
def upgrade():
    """Fill users.bcrypt_pwd with a bcrypt hash of each user's password.

    Runs with search_path set to "mineturer" and resets it to "public"
    at the end.
    """
    op.execute('SET search_path TO mineturer')

    conn = op.get_bind()
    results = conn.execute('select userid, password from users').fetchall()

    users = table(
        'users',
        column('bcrypt_pwd', sa.String),
        column('userid', sa.Integer)
    )

    for userid, pwd in results:
        # A fresh salt is generated for every user.
        # NOTE(review): on Python 3 bcrypt.hashpw returns bytes while the
        # column is declared as String -- confirm the rendered literal.
        bcrypt_pwd = bcrypt.hashpw(pwd.encode('utf-8'), bcrypt.gensalt())

        same_user = users.c.userid == op.inline_literal(userid)
        new_values = {'bcrypt_pwd': op.inline_literal(bcrypt_pwd)}
        op.execute(users.update().where(same_user).values(new_values))

    op.execute('SET search_path TO public')
开发者ID:atlefren,项目名称:mineturer2,代码行数:28,代码来源:4f7733ee071_fix_passwords.py
示例20: test_update_returning
def test_update_returning(self):
    """UPDATE ... RETURNING on the PostgreSQL dialect.

    Covers explicit columns, a whole table, and an SQL function.
    """
    dialect = postgresql.dialect()
    table1 = table(
        'mytable',
        column('myid', Integer),
        column('name', String(128)),
        column('description', String(128)),
    )

    def new_update():
        return update(table1, values=dict(name='foo'))

    # Explicit columns.
    stmt = new_update().returning(table1.c.myid, table1.c.name)
    self.assert_compile(
        stmt,
        'UPDATE mytable SET name=%(name)s '
        'RETURNING mytable.myid, mytable.name',
        dialect=dialect)

    # Whole table.
    stmt = new_update().returning(table1)
    self.assert_compile(
        stmt,
        'UPDATE mytable SET name=%(name)s '
        'RETURNING mytable.myid, mytable.name, '
        'mytable.description',
        dialect=dialect)

    # SQL function over a column.
    stmt = new_update().returning(func.length(table1.c.name))
    self.assert_compile(
        stmt,
        'UPDATE mytable SET name=%(name)s '
        'RETURNING length(mytable.name) AS length_1',
        dialect=dialect)
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:32,代码来源:test_compiler.py
注:本文中的sqlalchemy.sql.column函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论