本文整理汇总了Python中sqlalchemy.literal_column函数的典型用法代码示例。如果您正苦于以下问题:Python literal_column函数的具体用法?Python literal_column怎么用?Python literal_column使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了literal_column函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_group_concat_sqlite_one_arg
def test_group_concat_sqlite_one_arg(db_session):
"""
It should use SQLite's deafult arguments (comma delimiter)
"""
from sqlalchemy import literal_column
from occams_datastore.utils.sql import group_concat
if db_session.bind.url.drivername != 'sqlite':
pytest.skip('Not using SQLite')
data = (
db_session.query(
literal_column("'myitem'").label('name'),
literal_column("'foo'").label('value'))
.union(
db_session.query(
literal_column("'myitem'").label('name'),
literal_column("'bar'").label('value')))
.subquery())
query = (
db_session.query(group_concat(data.c.value))
.select_from(data)
.group_by(data.c.name))
result, = query.one()
assert sorted(['foo', 'bar']) == sorted(result.split(','))
开发者ID:jkrooskos,项目名称:occams_datastore,代码行数:27,代码来源:test_sql.py
示例2: test_tuple_containment
def test_tuple_containment(self):
for test, exp in [
([("a", "b")], True),
([("a", "c")], False),
([("f", "q"), ("a", "b")], True),
([("f", "q"), ("a", "c")], False),
]:
eq_(
testing.db.execute(
select(
[
tuple_(
literal_column("'a'"), literal_column("'b'")
).in_(
[
tuple_(
*[
literal_column("'%s'" % letter)
for letter in elem
]
)
for elem in test
]
)
]
)
).scalar(),
exp,
)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:30,代码来源:test_query.py
示例3: execute
def execute(self, connection, filter_values):
max_date_query = sqlalchemy.select([
sqlalchemy.func.max(sqlalchemy.column('completed_on')).label('completed_on'),
sqlalchemy.column('case_id').label('case_id')
]).select_from(sqlalchemy.table(self.table_name))
if self.filters:
for filter in self.filters:
max_date_query.append_whereclause(filter.build_expression())
max_date_query.append_group_by(
sqlalchemy.column('case_id')
)
max_date_subquery = sqlalchemy.alias(max_date_query, 'max_date')
asha_table = self.get_asha_table_name()
checklist_query = sqlalchemy.select()
for column in self.columns:
checklist_query.append_column(column.build_column())
checklist_query = checklist_query.where(
sqlalchemy.literal_column('"{}".case_id'.format(asha_table)) == max_date_subquery.c.case_id
).where(
sqlalchemy.literal_column('"{}".completed_on'.format(asha_table)) == max_date_subquery.c.completed_on
).select_from(sqlalchemy.table(asha_table))
return connection.execute(checklist_query, **filter_values).fetchall()
开发者ID:kkrampa,项目名称:commcare-hq,代码行数:28,代码来源:sql_data.py
示例4: test_row_case_sensitive_unoptimized
def test_row_case_sensitive_unoptimized(self):
ins_db = engines.testing_engine(options={"case_sensitive": True})
row = ins_db.execute(
select([
literal_column("1").label("case_insensitive"),
literal_column("2").label("CaseSensitive"),
text("3 AS screw_up_the_cols")
])
).first()
eq_(
list(row.keys()),
["case_insensitive", "CaseSensitive", "screw_up_the_cols"])
in_("case_insensitive", row._keymap)
in_("CaseSensitive", row._keymap)
not_in_("casesensitive", row._keymap)
eq_(row["case_insensitive"], 1)
eq_(row["CaseSensitive"], 2)
eq_(row["screw_up_the_cols"], 3)
assert_raises(KeyError, lambda: row["Case_insensitive"])
assert_raises(KeyError, lambda: row["casesensitive"])
assert_raises(KeyError, lambda: row["screw_UP_the_cols"])
开发者ID:mattastica,项目名称:sqlalchemy,代码行数:25,代码来源:test_resultset.py
示例5: test_group_concat_postgresql_invalid_args
def test_group_concat_postgresql_invalid_args(db_session):
"""
It should only support at least two arguments in PostgreSQL
"""
from sqlalchemy import literal_column
from occams_datastore.utils.sql import group_concat
if db_session.bind.url.drivername != 'postgresql':
pytest.skip('Not using PostgreSQL')
data = (
db_session.query(
literal_column("'myitem'").label('name'),
literal_column("'foo'").label('value'))
.union(
db_session.query(
literal_column("'myitem'").label('name'),
literal_column("'bar'").label('value')))
.subquery())
query = (
db_session.query(group_concat(data.c.value))
.select_from(data)
.group_by(data.c.name))
with pytest.raises(TypeError):
result, = query.one()
开发者ID:jkrooskos,项目名称:occams_datastore,代码行数:27,代码来源:test_sql.py
示例6: test_tuple_containment
def test_tuple_containment(self):
for test, exp in [
([('a', 'b')], True),
([('a', 'c')], False),
([('f', 'q'), ('a', 'b')], True),
([('f', 'q'), ('a', 'c')], False)
]:
eq_(
testing.db.execute(
select([
tuple_(
literal_column("'a'"),
literal_column("'b'")
).
in_([
tuple_(*[
literal_column("'%s'" % letter)
for letter in elem
]) for elem in test
])
])
).scalar(),
exp
)
开发者ID:anti-social,项目名称:sqlalchemy,代码行数:25,代码来源:test_query.py
示例7: test_row_case_sensitive
def test_row_case_sensitive(self):
row = testing.db.execute(
select([
literal_column("1").label("case_insensitive"),
literal_column("2").label("CaseSensitive")
])
).first()
eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])
in_("case_insensitive", row._keymap)
in_("CaseSensitive", row._keymap)
not_in_("casesensitive", row._keymap)
eq_(row["case_insensitive"], 1)
eq_(row["CaseSensitive"], 2)
assert_raises(
KeyError,
lambda: row["Case_insensitive"]
)
assert_raises(
KeyError,
lambda: row["casesensitive"]
)
开发者ID:mattastica,项目名称:sqlalchemy,代码行数:25,代码来源:test_resultset.py
示例8: test_percent_sign_round_trip
def test_percent_sign_round_trip(self):
"""test that the DBAPI accommodates for escaped / nonescaped
percent signs in a way that matches the compiler
"""
m = self.metadata
t = Table('t', m, Column('data', String(50)))
t.create(config.db)
with config.db.begin() as conn:
conn.execute(t.insert(), dict(data="some % value"))
conn.execute(t.insert(), dict(data="some %% other value"))
eq_(
conn.scalar(
select([t.c.data]).where(
t.c.data == literal_column("'some % value'"))
),
"some % value"
)
eq_(
conn.scalar(
select([t.c.data]).where(
t.c.data == literal_column("'some %% other value'"))
), "some %% other value"
)
开发者ID:ArthurGarnier,项目名称:SickRage,代码行数:26,代码来源:test_dialect.py
示例9: test_text_doesnt_explode
def test_text_doesnt_explode(self):
for s in [
select(
[
case(
[
(
info_table.c.info == 'pk_4_data',
text("'yes'"))],
else_=text("'no'"))
]).order_by(info_table.c.info),
select(
[
case(
[
(
info_table.c.info == 'pk_4_data',
literal_column("'yes'"))],
else_=literal_column("'no'")
)]
).order_by(info_table.c.info),
]:
if testing.against("firebird"):
eq_(s.execute().fetchall(), [
('no ', ), ('no ', ), ('no ', ), ('yes', ),
('no ', ), ('no ', ),
])
else:
eq_(s.execute().fetchall(), [
('no', ), ('no', ), ('no', ), ('yes', ),
('no', ), ('no', ),
])
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:35,代码来源:test_case_statement.py
示例10: test_select_composition_seven
def test_select_composition_seven(self):
self.assert_compile(
select([
literal_column('col1'),
literal_column('col2')
], from_obj=table('tablename')).alias('myalias'),
"SELECT col1, col2 FROM tablename"
)
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:8,代码来源:test_text.py
示例11: test_select_composition_seven
def test_select_composition_seven(self):
self.assert_compile(
select(
[literal_column("col1"), literal_column("col2")],
from_obj=table("tablename"),
).alias("myalias"),
"SELECT col1, col2 FROM tablename",
)
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:8,代码来源:test_text.py
示例12: on_insert_also
def on_insert_also(self):
Foo2 = sql.t.Foo2
n_column = sqlalchemy.literal_column('1').label('n')
return (Foo2.insert().values(n=sqlalchemy.literal_column('new.id'),
foo=sqlalchemy.literal_column('new.description')),
sql.InsertFromSelect(Foo2, sqlalchemy.select([n_column])),
"select 42",
)
开发者ID:nicLucian,项目名称:pytis,代码行数:8,代码来源:demo.py
示例13: _update_current_rev
def _update_current_rev(self, old, new):
if old == new:
return
if new is None:
self.impl._exec(self._version.delete())
elif old is None:
self.impl._exec(self._version.insert().values(version_num=literal_column("'%s'" % new)))
else:
self.impl._exec(self._version.update().values(version_num=literal_column("'%s'" % new)))
开发者ID:leslyKay,项目名称:me,代码行数:9,代码来源:migration.py
示例14: data
def data(self,
use_choice_labels=False,
expand_collections=False,
ignore_private=True):
session = self.db_session
query = (
session.query(
models.Patient.id.label('id'),
models.Site.name.label('site'),
models.Patient.pid.label('pid'))
.join(models.Site))
# BBB 2014-02-20 (Marco): AEH needs Early Test
EarlyTest = aliased(models.Enrollment)
subquery = (
session.query(EarlyTest.patient_id, EarlyTest.reference_number)
.filter(EarlyTest.study.has(
models.Study.code.in_([literal_column("'ET'"),
literal_column("'LTW'"),
literal_column("'CVCT'")])))
.subquery())
query = (
query
.outerjoin(subquery, subquery.c.patient_id == models.Patient.id)
.add_column(subquery.c.reference_number.label('early_id')))
# Add every known reference number
for reftype in self.reftypes:
query = query.add_column(
session.query(
group_concat(
models.PatientReference.reference_number, ';'))
.filter(
models.PatientReference.patient_id == models.Patient.id)
.filter(
models.PatientReference.reference_type_id == reftype.id)
.group_by(models.PatientReference.patient_id)
.correlate(models.Patient)
.as_scalar()
.label(reftype.name))
CreateUser = aliased(datastore.User)
ModifyUser = aliased(datastore.User)
query = (
query
.join(CreateUser, models.Patient.create_user)
.join(ModifyUser, models.Patient.modify_user)
.add_columns(
models.Patient.create_date,
CreateUser.key.label('create_user'),
models.Patient.modify_date,
ModifyUser.key.label('modify_user'))
.order_by(models.Patient.id))
return query
开发者ID:davidmote,项目名称:occams_studies,代码行数:56,代码来源:pid.py
示例15: search_query
def search_query(cls, tokens, weight_func=None, include_misses=False, ordered=True):
# Read the searchable columns from the table (strings)
columns = cls.__searchable_columns__
# Convert the columns from strings into column objects
columns = [getattr(cls, c) for c in columns]
# The model name that can be used to match search result to model
cls_name = literal_column("'{}'".format(cls.__name__))
# Filter out id: tokens for later
ids, tokens = process_id_option(tokens)
# If there are still tokens left after id: token filtering
if tokens:
# Generate the search weight expression from the
# searchable columns, tokens and patterns
if not weight_func:
weight_func = weight_expression
weight = weight_func(columns, tokens)
# If the search expression only included "special" tokens like id:
else:
weight = literal_column(str(1))
# Create an array of stringified detail columns
details = getattr(cls, "__search_detail_columns__", None)
if details:
details = [cast(getattr(cls, d), Unicode) for d in details]
else:
details = [literal_column("NULL")]
# Create a query object
query = db.session.query(
cls_name.label("model"),
cls.id.label("id"),
cls.name.label("name"),
array(details).label("details"),
weight.label("weight"),
)
# Filter out specific ids (optional)
if ids:
query = query.filter(cls.id.in_(ids))
# Filter out results that don't match the patterns at all (optional)
if not include_misses:
query = query.filter(weight > 0)
# Order by weight (optional)
if ordered:
query = query.order_by(desc(weight))
return query
开发者ID:skylines-project,项目名称:skylines,代码行数:56,代码来源:search.py
示例16: get_next_bus
def get_next_bus(mc, db, stop_id):
now_datetime = datetime.datetime.utcnow().replace(tzinfo=UTC)
now_datetime = LONDON.normalize(now_datetime.astimezone(LONDON))
now_day = now_datetime.weekday()
now_time = now_datetime.time()
mc_key = "V6:USERSTOP:" + str(stop_id) + "USERTIME:" + now_datetime.strftime("%w%H%M")
bus = mc.get(mc_key)
if not bus:
today_query = db.query(DepartureTimeDeref, literal_column("0").label("days_future")).\
filter_by(bus_stop_id=stop_id).\
filter(DepartureTimeDeref.time >= now_time).\
filter(DepartureTimeDeref.valid_days.contains(cast([DAYS[now_day]], postgresql.ARRAY(String)))).\
join(DepartureTimeDeref.timetable).\
filter(Timetable.valid_from <= now_datetime.date()).\
filter(Timetable.valid_to >= now_datetime.date())
single_day_delta = datetime.timedelta(days=1)
tomorrow_query = db.query(DepartureTimeDeref, literal_column("1").label("days_future")).\
filter_by(bus_stop_id=stop_id).\
filter(DepartureTimeDeref.valid_days.contains(cast([DAYS[now_day + 1]], postgresql.ARRAY(String)))).\
join(DepartureTimeDeref.timetable).\
filter(Timetable.valid_from <= now_datetime.date() + single_day_delta).\
filter(Timetable.valid_to >= now_datetime.date() + single_day_delta)
bus = today_query.union_all(tomorrow_query).\
options(joinedload(DepartureTimeDeref.timetable, Timetable.route)).\
order_by("days_future").\
order_by(DepartureTimeDeref.time).\
first()
if bus:
bus = {'departure': bus[0].to_JSON(), 'days_future': int(bus[1])}
mc.set(mc_key, bus, 30*24*60*60)
if bus:
departure = bus['departure']
# Create a day delta, is this departure time today or tomorrow?
day_delta = datetime.timedelta(days=bus['days_future'])
departure_day = now_datetime.date() + day_delta
departure_dt = datetime.datetime.combine(departure_day, departure['time'])
# Add timezone infomation. pytz will handle DST correctly
departure_dt = LONDON.localize(departure_dt)
departure['time'] = departure_dt
bus = departure
return bus
开发者ID:thomaspurchas,项目名称:Wheres-the-Damn-U1,代码行数:53,代码来源:bus.py
示例17: timeseries
def timeseries(self, agg_unit, start, end, geom=None, column_filters=None):
# Reading this blog post
# http://no0p.github.io/postgresql/2014/05/08/timeseries-tips-pg.html
# inspired this implementation.
t = self.point_table
# Special case for the 'quarter' unit of aggregation.
step = '3 months' if agg_unit == 'quarter' else '1 ' + agg_unit
# Create a CTE to represent every time bucket in the timeseries
# with a default count of 0
day_generator = func.generate_series(func.date_trunc(agg_unit, start),
func.date_trunc(agg_unit, end),
step)
defaults = select([sa.literal_column("0").label('count'),
day_generator.label('time_bucket')])\
.alias('defaults')
where_filters = [t.c.point_date >= start, t.c.point_date <= end]
if column_filters is not None:
# Column filters has to be iterable here, because the '+' operator
# behaves differently for SQLAlchemy conditions. Instead of
# combining the conditions together, it would try to build
# something like :param1 + <column_filters> as a new condition.
where_filters += [column_filters]
# Create a CTE that grabs the number of records contained in each time
# bucket. Will only have rows for buckets with records.
actuals = select([func.count(t.c.hash).label('count'),
func.date_trunc(agg_unit, t.c.point_date).
label('time_bucket')])\
.where(sa.and_(*where_filters))\
.group_by('time_bucket')
# Also filter by geometry if requested
if geom:
contains = func.ST_Within(t.c.geom, func.ST_GeomFromGeoJSON(geom))
actuals = actuals.where(contains)
# Need to alias to make it usable in a subexpression
actuals = actuals.alias('actuals')
# Outer join the default and observed values
# to create the timeseries select statement.
# If no observed value in a bucket, use the default.
name = sa.literal_column("'{}'".format(self.dataset_name))\
.label('dataset_name')
bucket = defaults.c.time_bucket.label('time_bucket')
count = func.coalesce(actuals.c.count, defaults.c.count).label('count')
ts = select([name, bucket, count]).\
select_from(defaults.outerjoin(actuals, actuals.c.time_bucket == defaults.c.time_bucket))
return ts
开发者ID:gitter-badger,项目名称:plenario,代码行数:53,代码来源:models.py
示例18: timeseries
def timeseries(self, agg_unit, start, end, geom=None, column_filters=None):
# Reading this blog post
# http://no0p.github.io/postgresql/2014/05/08/timeseries-tips-pg.html
# inspired this implementation.
t = self.point_table
if agg_unit == 'quarter':
step = '3 months'
else:
step = '1 ' + agg_unit
# Create a CTE to represent every time bucket in the timeseries
# with a default count of 0
day_generator = func.generate_series(func.date_trunc(agg_unit, start),
func.date_trunc(agg_unit, end),
step)
defaults = select([sa.literal_column("0").label('count'),
day_generator.label('time_bucket')])\
.alias('defaults')
# Create a CTE that grabs the number of records
# contained in each time bucket.
# Will only have rows for buckets with records.
where_filters = [t.c.point_date >= start,
t.c.point_date <= end]
if column_filters:
where_filters += column_filters
actuals = select([func.count(t.c.hash).label('count'),
func.date_trunc(agg_unit, t.c.point_date).
label('time_bucket')])\
.where(sa.and_(*where_filters))\
.group_by('time_bucket')
# Also filter by geometry if requested
if geom:
contains = func.ST_Within(t.c.geom, func.ST_GeomFromGeoJSON(geom))
actuals = actuals.where(contains)
# Need to alias to make it usable in a subexpression
actuals = actuals.alias('actuals')
# Outer join the default and observed values
# to create the timeseries select statement.
# If no observed value in a bucket, use the default.
name = sa.literal_column("'{}'".format(self.dataset_name))\
.label('dataset_name')
bucket = defaults.c.time_bucket.label('time_bucket')
count = func.coalesce(actuals.c.count, defaults.c.count).label('count')
ts = select([name, bucket, count]).\
select_from(defaults.outerjoin(actuals, actuals.c.time_bucket == defaults.c.time_bucket))
return ts
开发者ID:carhart,项目名称:plenario,代码行数:52,代码来源:models.py
示例19: test_select_composition_six
def test_select_composition_six(self):
# test that "auto-labeling of subquery columns"
# doesn't interfere with literal columns,
# exported columns don't get quoted
self.assert_compile(
select([
literal_column("column1 AS foobar"),
literal_column("column2 AS hoho"), table1.c.myid],
from_obj=[table1]).select(),
"SELECT column1 AS foobar, column2 AS hoho, myid FROM "
"(SELECT column1 AS foobar, column2 AS hoho, "
"mytable.myid AS myid FROM mytable)"
)
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:13,代码来源:test_text.py
示例20: _update_current_rev
def _update_current_rev(self, old, new):
if old == new: # pragma: no cover
return
if new is None: # pragma: no cover
self.impl._exec(Version.__table__.delete().where(package=self.pkg_name))
elif old is None:
self.impl._exec(
Version.__table__.insert().values(package=self.pkg_name, version_num=sqla.literal_column("'%s'" % new))
)
else:
self.impl._exec(
Version.__table__.update().values(package=self.pkg_name, version_num=sqla.literal_column("'%s'" % new))
)
开发者ID:webmaven,项目名称:ptah,代码行数:13,代码来源:migrate.py
注:本文中的sqlalchemy.literal_column函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论