本文整理汇总了Python中sqlalchemy.sql.literal函数的典型用法代码示例。如果您正苦于以下问题:Python literal函数的具体用法?Python literal怎么用?Python literal使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了literal函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_positional_binds_2
def test_positional_binds_2(self):
orders = table('orders',
column('order'),
)
s = select([orders.c.order, literal("x")]).cte("regional_sales")
s = select([s.c.order, literal("y")])
dialect = default.DefaultDialect()
dialect.positional = True
dialect.paramstyle = 'numeric'
s1 = select([orders.c.order]).where(orders.c.order == 'x').\
cte("regional_sales_1")
s1a = s1.alias()
s2 = select([orders.c.order == 'y', s1a.c.order,
orders.c.order, s1.c.order]).\
where(orders.c.order == 'z').\
cte("regional_sales_2")
s3 = select([s2])
self.assert_compile(
s3,
'WITH regional_sales_1 AS (SELECT orders."order" AS "order" '
'FROM orders WHERE orders."order" = :1), regional_sales_2 AS '
'(SELECT orders."order" = :2 AS anon_1, '
'anon_2."order" AS "order", '
'orders."order" AS "order", '
'regional_sales_1."order" AS "order" FROM orders, '
'regional_sales_1 '
'AS anon_2, regional_sales_1 '
'WHERE orders."order" = :3) SELECT regional_sales_2.anon_1, '
'regional_sales_2."order" FROM regional_sales_2',
checkpositional=('x', 'y', 'z'), dialect=dialect)
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:34,代码来源:test_cte.py
示例2: create_mapper
def create_mapper(rack_specs_tbl):
"Mapper factory."
rs = rack_specs_tbl
polymorphic_select = select([
rs,
(case([(rs.c.has_movable_subitems,
literal(RACK_SPECS_TYPES.TUBE_RACK_SPECS))],
else_=literal(RACK_SPECS_TYPES.PLATE_SPECS))).label(
'rackspecs_type')
],
).alias('rackspecs')
m = mapper(RackSpecs, polymorphic_select,
id_attribute='rack_specs_id',
slug_expression=lambda cls: as_slug_expression(cls.name),
properties=dict(
manufacturer=relationship(Organization),
shape=relationship(RackShape, uselist=False,
back_populates='specs'),
rack_specs_type=
column_property(polymorphic_select.c.rackspecs_type),
),
polymorphic_on=polymorphic_select.c.rackspecs_type,
polymorphic_identity=RACK_SPECS_TYPES.RACK_SPECS,
)
RackSpecs.has_tubes = synonym('has_movable_subitems')
return m
开发者ID:helixyte,项目名称:TheLMA,代码行数:26,代码来源:rackspecs.py
示例3: _by_search_tsearch
def _by_search_tsearch(self, query, operand, maybe_negate):
# type: (Query, str, ConditionTransform) -> Query
tsquery = func.plainto_tsquery(literal("zulip.english_us_search"), literal(operand))
ts_locs_array = func.ts_match_locs_array
query = query.column(ts_locs_array(literal("zulip.english_us_search"),
column("rendered_content"),
tsquery).label("content_matches"))
# We HTML-escape the subject in Postgres to avoid doing a server round-trip
query = query.column(ts_locs_array(literal("zulip.english_us_search"),
func.escape_html(column("subject")),
tsquery).label("subject_matches"))
# Do quoted string matching. We really want phrase
# search here so we can ignore punctuation and do
# stemming, but there isn't a standard phrase search
# mechanism in Postgres
for term in re.findall('"[^"]+"|\S+', operand):
if term[0] == '"' and term[-1] == '"':
term = term[1:-1]
term = '%' + connection.ops.prep_for_like_query(term) + '%'
cond = or_(column("content").ilike(term),
column("subject").ilike(term))
query = query.where(maybe_negate(cond))
cond = column("search_tsvector").op("@@")(tsquery)
return query.where(maybe_negate(cond))
开发者ID:souravbadami,项目名称:zulip,代码行数:26,代码来源:messages.py
示例4: limit_clause
def limit_clause(self, select):
text = ""
if select._limit is not None:
text += " \n LIMIT " + self.process(sql.literal(select._limit))
if select._offset is not None:
text += " OFFSET " + self.process(sql.literal(select._offset))
if select._limit is None:
text += " ROWS" # OFFSET n ROW[S]
return text
开发者ID:zzzeek,项目名称:sqlalchemy_akiban,代码行数:9,代码来源:base.py
示例5: user_requests_aggregate
def user_requests_aggregate(session, user):
"""Returns all pending requests for this user to approve across groups."""
members = session.query(
label("type", literal(1)),
label("id", Group.id),
label("name", Group.groupname),
).union(session.query(
label("type", literal(0)),
label("id", User.id),
label("name", User.username),
)).subquery()
now = datetime.utcnow()
groups = session.query(
label("id", Group.id),
label("name", Group.groupname),
).filter(
GroupEdge.group_id == Group.id,
GroupEdge.member_pk == user.id,
GroupEdge.active == True,
GroupEdge._role.in_(APPROVER_ROLE_INDICES),
user.enabled == True,
Group.enabled == True,
or_(
GroupEdge.expiration > now,
GroupEdge.expiration == None,
)
).subquery()
requests = session.query(
Request.id,
Request.requested_at,
GroupEdge.expiration,
label("role", GroupEdge._role),
Request.status,
label("requester", User.username),
label("type", members.c.type),
label("requesting", members.c.name),
label("reason", Comment.comment),
label("group_id", groups.c.id),
label("groupname", groups.c.name),
).filter(
Request.on_behalf_obj_pk == members.c.id,
Request.on_behalf_obj_type == members.c.type,
Request.requesting_id == groups.c.id,
Request.requester_id == User.id,
Request.status == "pending",
Request.id == RequestStatusChange.request_id,
RequestStatusChange.from_status == None,
GroupEdge.id == Request.edge_id,
Comment.obj_type == 3,
Comment.obj_pk == RequestStatusChange.id,
)
return requests
开发者ID:santoshankr,项目名称:grouper,代码行数:55,代码来源:user.py
示例6: limit_clause
def limit_clause(self, select):
# CUBRID supports: LIMIT <limit> and LIMIT <offset>, <limit>
limit, offset = select._limit, select._offset
if (limit, offset) == (None, None):
return ""
elif limit is None and offset is not None:
return " \n LIMIT %s, 1073741823" % (self.process(sql.literal(offset)))
elif offset is not None:
return " \n LIMIT %s, %s" % (self.process(sql.literal(offset)), self.process(sql.literal(limit)))
else:
return " \n LIMIT %s" % (self.process(sql.literal(limit)),)
开发者ID:zzzeek,项目名称:sqlalchemy_cubrid,代码行数:11,代码来源:base.py
示例7: limit_clause
def limit_clause(self, select):
text = ""
if select._limit is not None:
text += "\n LIMIT " + self.process(sql.literal(select._limit))
if select._offset is not None:
if select._limit is None:
text += "\n LIMIT " + self.process(sql.literal(-1))
text += " OFFSET " + self.process(sql.literal(select._offset))
else:
text += " OFFSET " + self.process(sql.literal(0))
return text
开发者ID:BeegorMif,项目名称:maraschino,代码行数:11,代码来源:base.py
示例8: get
def get(self):
query = self.get_argument("query", "")
offset = int(self.get_argument("offset", 0))
limit = int(self.get_argument("limit", 100))
if limit > 9000:
limit = 9000
groups = (
self.session.query(
label("type", literal("Group")),
label("id", Group.id),
label("name", Group.groupname),
)
.filter(Group.enabled == True, Group.groupname.like("%{}%".format(query)))
.subquery()
)
permissions = (
self.session.query(
label("type", literal("Permission")),
label("id", Permission.id),
label("name", Permission.name),
)
.filter(Permission.enabled == True, Permission.name.like("%{}%".format(query)))
.subquery()
)
users = (
self.session.query(
label("type", literal("User")), label("id", User.id), label("name", User.username)
)
.filter(User.enabled == True, User.username.like("%{}%".format(query)))
.subquery()
)
results_query = self.session.query("type", "id", "name").select_entity_from(
union_all(users.select(), permissions.select(), groups.select())
)
total = results_query.count()
results = results_query.offset(offset).limit(limit).all()
if len(results) == 1:
result = results[0]
return self.redirect("/{}s/{}".format(result.type.lower(), result.name))
self.render(
"search.html",
results=results,
search_query=query,
offset=offset,
limit=limit,
total=total,
)
开发者ID:dropbox,项目名称:grouper,代码行数:53,代码来源:search.py
示例9: test_notin
def test_notin(self):
left = column('left')
assert left.comparator.operate(operators.notin_op, [1, 2, 3]).compare(
BinaryExpression(
left,
Grouping(ClauseList(
literal(1), literal(2), literal(3)
)),
operators.notin_op
)
)
self._loop_test(operators.notin_op, [1, 2, 3])
开发者ID:e0ne,项目名称:sqlalchemy,代码行数:12,代码来源:test_operators.py
示例10: _get_nodes_from_db
def _get_nodes_from_db(session):
return session.query(
label("type", literal("User")),
label("name", User.username)
).filter(
User.enabled == True
).union(session.query(
label("type", literal("Group")),
label("name", Group.groupname))
).filter(
Group.enabled == True
).all()
开发者ID:tmildorf,项目名称:grouper,代码行数:12,代码来源:graph.py
示例11: _get_edges_from_db
def _get_edges_from_db(session):
parent = aliased(Group)
group_member = aliased(Group)
user_member = aliased(User)
edges = []
now = datetime.utcnow()
query = session.query(
label("groupname", parent.groupname),
label("type", literal("Group")),
label("name", group_member.groupname),
label("role", GroupEdge._role)
).filter(
parent.id == GroupEdge.group_id,
group_member.id == GroupEdge.member_pk,
GroupEdge.active == True,
parent.enabled == True,
group_member.enabled == True,
or_(
GroupEdge.expiration > now,
GroupEdge.expiration == None
),
GroupEdge.member_type == 1
).union(session.query(
label("groupname", parent.groupname),
label("type", literal("User")),
label("name", user_member.username),
label("role", GroupEdge._role)
).filter(
parent.id == GroupEdge.group_id,
user_member.id == GroupEdge.member_pk,
GroupEdge.active == True,
parent.enabled == True,
user_member.enabled == True,
or_(
GroupEdge.expiration > now,
GroupEdge.expiration == None
),
GroupEdge.member_type == 0
))
for record in query.all():
edges.append((
("Group", record.groupname),
(record.type, record.name),
{"role": record.role},
))
return edges
开发者ID:tmildorf,项目名称:grouper,代码行数:51,代码来源:graph.py
示例12: get_select_precolumns
def get_select_precolumns(self, select):
"""Called when building a ``SELECT`` statement, position is just
before column list Firebird puts the limit and offset right
after the ``SELECT``...
"""
result = ""
if select._limit:
result += "FIRST %s " % self.process(sql.literal(select._limit))
if select._offset:
result += "SKIP %s " % self.process(sql.literal(select._offset))
if select._distinct:
result += "DISTINCT "
return result
开发者ID:wangzhengbo1204,项目名称:Python,代码行数:14,代码来源:base.py
示例13: _test_comparison_op
def _test_comparison_op(self, py_op, fwd_op, rev_op):
dt = datetime.datetime(2012, 5, 10, 15, 27, 18)
for (lhs, rhs, l_sql, r_sql) in (
('a', self.table1.c.myid, ':myid_1', 'mytable.myid'),
('a', literal('b'), ':param_2', ':param_1'), # note swap!
(self.table1.c.myid, 'b', 'mytable.myid', ':myid_1'),
(self.table1.c.myid, literal('b'), 'mytable.myid', ':param_1'),
(self.table1.c.myid, self.table1.c.myid,
'mytable.myid', 'mytable.myid'),
(literal('a'), 'b', ':param_1', ':param_2'),
(literal('a'), self.table1.c.myid, ':param_1', 'mytable.myid'),
(literal('a'), literal('b'), ':param_1', ':param_2'),
(dt, literal('b'), ':param_2', ':param_1'),
(literal('b'), dt, ':param_1', ':param_2'),
):
# the compiled clause should match either (e.g.):
# 'a' < 'b' -or- 'b' > 'a'.
compiled = str(py_op(lhs, rhs))
fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql)
rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql)
self.assert_(compiled == fwd_sql or compiled == rev_sql,
"\n'" + compiled + "'\n does not match\n'" +
fwd_sql + "'\n or\n'" + rev_sql + "'")
开发者ID:e0ne,项目名称:sqlalchemy,代码行数:25,代码来源:test_operators.py
示例14: test_positional_binds_2_asliteral
def test_positional_binds_2_asliteral(self):
orders = table("orders", column("order"))
s = select([orders.c.order, literal("x")]).cte("regional_sales")
s = select([s.c.order, literal("y")])
dialect = default.DefaultDialect()
dialect.positional = True
dialect.paramstyle = "numeric"
s1 = (
select([orders.c.order])
.where(orders.c.order == "x")
.cte("regional_sales_1")
)
s1a = s1.alias()
s2 = (
select(
[
orders.c.order == "y",
s1a.c.order,
orders.c.order,
s1.c.order,
]
)
.where(orders.c.order == "z")
.cte("regional_sales_2")
)
s3 = select([s2])
self.assert_compile(
s3,
"WITH regional_sales_1 AS "
'(SELECT orders."order" AS "order" '
"FROM orders "
"WHERE orders.\"order\" = 'x'), "
"regional_sales_2 AS "
"(SELECT orders.\"order\" = 'y' AS anon_1, "
'anon_2."order" AS "order", orders."order" AS "order", '
'regional_sales_1."order" AS "order" '
"FROM orders, regional_sales_1 AS anon_2, regional_sales_1 "
"WHERE orders.\"order\" = 'z') "
'SELECT regional_sales_2.anon_1, regional_sales_2."order" '
"FROM regional_sales_2",
checkpositional=(),
dialect=dialect,
literal_binds=True,
)
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:48,代码来源:test_cte.py
示例15: test_noorderby_parameters_insubquery
def test_noorderby_parameters_insubquery(self):
"""test that the ms-sql dialect does not include ORDER BY
positional parameters in subqueries"""
table1 = table(
"mytable",
column("myid", Integer),
column("name", String),
column("description", String),
)
q = select(
[table1.c.myid, sql.literal('bar').label('c1')],
order_by=[table1.c.name + '-']
).alias("foo")
crit = q.c.myid == table1.c.myid
dialect = mssql.dialect()
dialect.paramstyle = "qmark"
dialect.positional = True
self.assert_compile(
select(["*"], crit),
"SELECT * FROM (SELECT mytable.myid AS "
"myid, ? AS c1 FROM mytable) AS foo, mytable WHERE "
"foo.myid = mytable.myid",
dialect=dialect,
checkparams={'param_1': 'bar'},
# if name_1 is included, too many parameters are passed to dbapi
checkpositional=('bar', )
)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:29,代码来源:test_compiler.py
示例16: messages_in_narrow_backend
def messages_in_narrow_backend(request, user_profile,
msg_ids = REQ(validator=check_list(check_int)),
narrow = REQ(converter=narrow_parameter)):
# type: (HttpRequest, UserProfile, List[int], List[Dict[str, Any]]) -> HttpResponse
# Note that this function will only work on messages the user
# actually received
# TODO: We assume that the narrow is a search. For now this works because
# the browser only ever calls this function for searches, since it can't
# apply that narrow operator itself.
query = select([column("message_id"), column("subject"), column("rendered_content")],
and_(column("user_profile_id") == literal(user_profile.id),
column("message_id").in_(msg_ids)),
join(table("zerver_usermessage"), table("zerver_message"),
literal_column("zerver_usermessage.message_id") ==
literal_column("zerver_message.id")))
builder = NarrowBuilder(user_profile, column("message_id"))
for term in narrow:
query = builder.add_term(query, term)
sa_conn = get_sqlalchemy_connection()
query_result = list(sa_conn.execute(query).fetchall())
search_fields = dict()
for row in query_result:
(message_id, subject, rendered_content, content_matches, subject_matches) = row
search_fields[message_id] = get_search_fields(rendered_content, subject,
content_matches, subject_matches)
return json_success({"messages": search_fields})
开发者ID:souravbadami,项目名称:zulip,代码行数:33,代码来源:messages.py
示例17: visibility_horizon_query
def visibility_horizon_query(self):
"""Get a query object that returns the highest category this one is visible from."""
cte_query = (
select(
[
Category.id,
Category.parent_id,
db.case([(Category.visibility.is_(None), None)], else_=(Category.visibility - 1)).label("n"),
literal(0).label("level"),
]
)
.where(Category.id == self.id)
.cte("visibility_horizon", recursive=True)
)
parent_query = select(
[
Category.id,
Category.parent_id,
db.case(
[(Category.visibility.is_(None) & cte_query.c.n.is_(None), None)],
else_=db.func.least(Category.visibility, cte_query.c.n) - 1,
),
cte_query.c.level + 1,
]
).where(db.and_(Category.id == cte_query.c.parent_id, (cte_query.c.n > 0) | cte_query.c.n.is_(None)))
cte_query = cte_query.union_all(parent_query)
return db.session.query(cte_query.c.id, cte_query.c.n).order_by(cte_query.c.level.desc()).limit(1)
开发者ID:OmeGak,项目名称:indico,代码行数:27,代码来源:categories.py
示例18: visit_tag_query
def visit_tag_query(self, q):
tags = self.session.query(Tag).filter_by(name=q.name)
if tags.count() == 1:
return ItemVersion.tags.any(id=tags[0].id)
else:
from sqlalchemy.sql import literal
return literal(False)
开发者ID:inducer,项目名称:synoptic,代码行数:7,代码来源:datamodel.py
示例19: access_list_paths
def access_list_paths(self, member, prefix=None, include_owned=False,
include_containers=True):
"""Return the list of paths granted to member.
Keyword arguments:
prefix -- return only paths starting with prefix (default None)
include_owned -- return also paths owned by member (default False)
include_containers -- return also container paths owned by member
(default True)
"""
xfeatures_xfeaturevals = self.xfeatures.join(self.xfeaturevals)
selectable = (self.groups.c.owner + ':' + self.groups.c.name)
member_groups = select([selectable.label('value')],
self.groups.c.member == member)
members = select([literal(member).label('value')])
any = select([literal('*').label('value')])
u = union(member_groups, members, any).alias()
inner_join = join(xfeatures_xfeaturevals, u,
self.xfeaturevals.c.value == u.c.value)
s = select([self.xfeatures.c.path], from_obj=[inner_join]).distinct()
if prefix:
like = lambda p: self.xfeatures.c.path.like(
self.escape_like(p) + '%', escape=ESCAPE_CHAR)
s = s.where(or_(*map(like,
self.access_inherit(prefix) or [prefix])))
r = self.conn.execute(s)
l = [row[0] for row in r.fetchall()]
r.close()
if include_owned:
container_nodes = select(
[self.nodes.c.node],
self.nodes.c.parent == self.node_lookup(member))
condition = self.nodes.c.parent.in_(container_nodes)
if include_containers:
condition = or_(condition,
self.nodes.c.node.in_(container_nodes))
s = select([self.nodes.c.path], condition)
r = self.conn.execute(s)
l += [row[0] for row in r.fetchall() if row[0] not in l]
r.close()
return l
开发者ID:antonis-m,项目名称:synnefo,代码行数:47,代码来源:permissions.py
示例20: by_sender
def by_sender(self, query, operand, maybe_negate):
try:
sender = get_user_profile_by_email(operand)
except UserProfile.DoesNotExist:
raise BadNarrowOperator('unknown user ' + operand)
cond = column("sender_id") == literal(sender.id)
return query.where(maybe_negate(cond))
开发者ID:danshev,项目名称:zulip,代码行数:8,代码来源:messages.py
注:本文中的sqlalchemy.sql.literal函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论