• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sql.literal函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.sql.literal函数的典型用法代码示例。如果您正苦于以下问题:Python literal函数的具体用法?Python literal怎么用?Python literal使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了literal函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_positional_binds_2

    def test_positional_binds_2(self):
        orders = table('orders',
                       column('order'),
                       )
        s = select([orders.c.order, literal("x")]).cte("regional_sales")
        s = select([s.c.order, literal("y")])
        dialect = default.DefaultDialect()
        dialect.positional = True
        dialect.paramstyle = 'numeric'
        s1 = select([orders.c.order]).where(orders.c.order == 'x').\
            cte("regional_sales_1")

        s1a = s1.alias()

        s2 = select([orders.c.order == 'y', s1a.c.order,
                     orders.c.order, s1.c.order]).\
            where(orders.c.order == 'z').\
            cte("regional_sales_2")

        s3 = select([s2])

        self.assert_compile(
            s3,
            'WITH regional_sales_1 AS (SELECT orders."order" AS "order" '
            'FROM orders WHERE orders."order" = :1), regional_sales_2 AS '
            '(SELECT orders."order" = :2 AS anon_1, '
            'anon_2."order" AS "order", '
            'orders."order" AS "order", '
            'regional_sales_1."order" AS "order" FROM orders, '
            'regional_sales_1 '
            'AS anon_2, regional_sales_1 '
            'WHERE orders."order" = :3) SELECT regional_sales_2.anon_1, '
            'regional_sales_2."order" FROM regional_sales_2',
            checkpositional=('x', 'y', 'z'), dialect=dialect)
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:34,代码来源:test_cte.py


示例2: create_mapper

def create_mapper(rack_specs_tbl):
    "Mapper factory."
    rs = rack_specs_tbl
    polymorphic_select = select([
        rs,
        (case([(rs.c.has_movable_subitems,
                literal(RACK_SPECS_TYPES.TUBE_RACK_SPECS))],
              else_=literal(RACK_SPECS_TYPES.PLATE_SPECS))).label(
                                                            'rackspecs_type')
        ],
        ).alias('rackspecs')
    m = mapper(RackSpecs, polymorphic_select,
            id_attribute='rack_specs_id',
            slug_expression=lambda cls: as_slug_expression(cls.name),
            properties=dict(
                manufacturer=relationship(Organization),
                shape=relationship(RackShape, uselist=False,
                                   back_populates='specs'),
                rack_specs_type=
                        column_property(polymorphic_select.c.rackspecs_type),
                ),
            polymorphic_on=polymorphic_select.c.rackspecs_type,
            polymorphic_identity=RACK_SPECS_TYPES.RACK_SPECS,
            )
    RackSpecs.has_tubes = synonym('has_movable_subitems')
    return m
开发者ID:helixyte,项目名称:TheLMA,代码行数:26,代码来源:rackspecs.py


示例3: _by_search_tsearch

    def _by_search_tsearch(self, query, operand, maybe_negate):
        # type: (Query, str, ConditionTransform) -> Query
        tsquery = func.plainto_tsquery(literal("zulip.english_us_search"), literal(operand))
        ts_locs_array = func.ts_match_locs_array
        query = query.column(ts_locs_array(literal("zulip.english_us_search"),
                                           column("rendered_content"),
                                           tsquery).label("content_matches"))
        # We HTML-escape the subject in Postgres to avoid doing a server round-trip
        query = query.column(ts_locs_array(literal("zulip.english_us_search"),
                                           func.escape_html(column("subject")),
                                           tsquery).label("subject_matches"))

        # Do quoted string matching.  We really want phrase
        # search here so we can ignore punctuation and do
        # stemming, but there isn't a standard phrase search
        # mechanism in Postgres
        for term in re.findall('"[^"]+"|\S+', operand):
            if term[0] == '"' and term[-1] == '"':
                term = term[1:-1]
                term = '%' + connection.ops.prep_for_like_query(term) + '%'
                cond = or_(column("content").ilike(term),
                           column("subject").ilike(term))
                query = query.where(maybe_negate(cond))

        cond = column("search_tsvector").op("@@")(tsquery)
        return query.where(maybe_negate(cond))
开发者ID:souravbadami,项目名称:zulip,代码行数:26,代码来源:messages.py


示例4: limit_clause

 def limit_clause(self, select):
     text = ""
     if select._limit is not None:
         text += " \n LIMIT " + self.process(sql.literal(select._limit))
     if select._offset is not None:
         text += " OFFSET " + self.process(sql.literal(select._offset))
         if select._limit is None:
             text += " ROWS"  # OFFSET n ROW[S]
     return text
开发者ID:zzzeek,项目名称:sqlalchemy_akiban,代码行数:9,代码来源:base.py


示例5: user_requests_aggregate

def user_requests_aggregate(session, user):
    """Returns all pending requests for this user to approve across groups."""

    members = session.query(
        label("type", literal(1)),
        label("id", Group.id),
        label("name", Group.groupname),
    ).union(session.query(
        label("type", literal(0)),
        label("id", User.id),
        label("name", User.username),
    )).subquery()

    now = datetime.utcnow()
    groups = session.query(
        label("id", Group.id),
        label("name", Group.groupname),
    ).filter(
        GroupEdge.group_id == Group.id,
        GroupEdge.member_pk == user.id,
        GroupEdge.active == True,
        GroupEdge._role.in_(APPROVER_ROLE_INDICES),
        user.enabled == True,
        Group.enabled == True,
        or_(
            GroupEdge.expiration > now,
            GroupEdge.expiration == None,
        )
    ).subquery()

    requests = session.query(
        Request.id,
        Request.requested_at,
        GroupEdge.expiration,
        label("role", GroupEdge._role),
        Request.status,
        label("requester", User.username),
        label("type", members.c.type),
        label("requesting", members.c.name),
        label("reason", Comment.comment),
        label("group_id", groups.c.id),
        label("groupname", groups.c.name),
    ).filter(
        Request.on_behalf_obj_pk == members.c.id,
        Request.on_behalf_obj_type == members.c.type,
        Request.requesting_id == groups.c.id,
        Request.requester_id == User.id,
        Request.status == "pending",
        Request.id == RequestStatusChange.request_id,
        RequestStatusChange.from_status == None,
        GroupEdge.id == Request.edge_id,
        Comment.obj_type == 3,
        Comment.obj_pk == RequestStatusChange.id,
    )
    return requests
开发者ID:santoshankr,项目名称:grouper,代码行数:55,代码来源:user.py


示例6: limit_clause

 def limit_clause(self, select):
     # CUBRID supports: LIMIT <limit> and LIMIT <offset>, <limit>
     limit, offset = select._limit, select._offset
     if (limit, offset) == (None, None):
         return ""
     elif limit is None and offset is not None:
         return " \n LIMIT %s, 1073741823" % (self.process(sql.literal(offset)))
     elif offset is not None:
         return " \n LIMIT %s, %s" % (self.process(sql.literal(offset)), self.process(sql.literal(limit)))
     else:
         return " \n LIMIT %s" % (self.process(sql.literal(limit)),)
开发者ID:zzzeek,项目名称:sqlalchemy_cubrid,代码行数:11,代码来源:base.py


示例7: limit_clause

 def limit_clause(self, select):
     text = ""
     if select._limit is not None:
         text +=  "\n LIMIT " + self.process(sql.literal(select._limit))
     if select._offset is not None:
         if select._limit is None:
             text += "\n LIMIT " + self.process(sql.literal(-1))
         text += " OFFSET " + self.process(sql.literal(select._offset))
     else:
         text += " OFFSET " + self.process(sql.literal(0))
     return text
开发者ID:BeegorMif,项目名称:maraschino,代码行数:11,代码来源:base.py


示例8: get

    def get(self):
        query = self.get_argument("query", "")
        offset = int(self.get_argument("offset", 0))
        limit = int(self.get_argument("limit", 100))
        if limit > 9000:
            limit = 9000

        groups = (
            self.session.query(
                label("type", literal("Group")),
                label("id", Group.id),
                label("name", Group.groupname),
            )
            .filter(Group.enabled == True, Group.groupname.like("%{}%".format(query)))
            .subquery()
        )

        permissions = (
            self.session.query(
                label("type", literal("Permission")),
                label("id", Permission.id),
                label("name", Permission.name),
            )
            .filter(Permission.enabled == True, Permission.name.like("%{}%".format(query)))
            .subquery()
        )

        users = (
            self.session.query(
                label("type", literal("User")), label("id", User.id), label("name", User.username)
            )
            .filter(User.enabled == True, User.username.like("%{}%".format(query)))
            .subquery()
        )

        results_query = self.session.query("type", "id", "name").select_entity_from(
            union_all(users.select(), permissions.select(), groups.select())
        )
        total = results_query.count()
        results = results_query.offset(offset).limit(limit).all()

        if len(results) == 1:
            result = results[0]
            return self.redirect("/{}s/{}".format(result.type.lower(), result.name))

        self.render(
            "search.html",
            results=results,
            search_query=query,
            offset=offset,
            limit=limit,
            total=total,
        )
开发者ID:dropbox,项目名称:grouper,代码行数:53,代码来源:search.py


示例9: test_notin

 def test_notin(self):
     left = column('left')
     assert left.comparator.operate(operators.notin_op, [1, 2, 3]).compare(
             BinaryExpression(
                 left,
                 Grouping(ClauseList(
                     literal(1), literal(2), literal(3)
                 )),
                 operators.notin_op
             )
         )
     self._loop_test(operators.notin_op, [1, 2, 3])
开发者ID:e0ne,项目名称:sqlalchemy,代码行数:12,代码来源:test_operators.py


示例10: _get_nodes_from_db

 def _get_nodes_from_db(session):
     return session.query(
         label("type", literal("User")),
         label("name", User.username)
     ).filter(
         User.enabled == True
     ).union(session.query(
         label("type", literal("Group")),
         label("name", Group.groupname))
     ).filter(
         Group.enabled == True
     ).all()
开发者ID:tmildorf,项目名称:grouper,代码行数:12,代码来源:graph.py


示例11: _get_edges_from_db

    def _get_edges_from_db(session):

        parent = aliased(Group)
        group_member = aliased(Group)
        user_member = aliased(User)
        edges = []

        now = datetime.utcnow()

        query = session.query(
            label("groupname", parent.groupname),
            label("type", literal("Group")),
            label("name", group_member.groupname),
            label("role", GroupEdge._role)
        ).filter(
            parent.id == GroupEdge.group_id,
            group_member.id == GroupEdge.member_pk,
            GroupEdge.active == True,
            parent.enabled == True,
            group_member.enabled == True,
            or_(
                GroupEdge.expiration > now,
                GroupEdge.expiration == None
            ),
            GroupEdge.member_type == 1
        ).union(session.query(
            label("groupname", parent.groupname),
            label("type", literal("User")),
            label("name", user_member.username),
            label("role", GroupEdge._role)
        ).filter(
            parent.id == GroupEdge.group_id,
            user_member.id == GroupEdge.member_pk,
            GroupEdge.active == True,
            parent.enabled == True,
            user_member.enabled == True,
            or_(
                GroupEdge.expiration > now,
                GroupEdge.expiration == None
            ),
            GroupEdge.member_type == 0
        ))

        for record in query.all():
            edges.append((
                ("Group", record.groupname),
                (record.type, record.name),
                {"role": record.role},
            ))

        return edges
开发者ID:tmildorf,项目名称:grouper,代码行数:51,代码来源:graph.py


示例12: get_select_precolumns

    def get_select_precolumns(self, select):
        """Called when building a ``SELECT`` statement, position is just
        before column list Firebird puts the limit and offset right
        after the ``SELECT``...
        """

        result = ""
        if select._limit:
            result += "FIRST %s " % self.process(sql.literal(select._limit))
        if select._offset:
            result += "SKIP %s " % self.process(sql.literal(select._offset))
        if select._distinct:
            result += "DISTINCT "
        return result
开发者ID:wangzhengbo1204,项目名称:Python,代码行数:14,代码来源:base.py


示例13: _test_comparison_op

    def _test_comparison_op(self, py_op, fwd_op, rev_op):
        dt = datetime.datetime(2012, 5, 10, 15, 27, 18)
        for (lhs, rhs, l_sql, r_sql) in (
            ('a', self.table1.c.myid, ':myid_1', 'mytable.myid'),
            ('a', literal('b'), ':param_2', ':param_1'),  # note swap!
            (self.table1.c.myid, 'b', 'mytable.myid', ':myid_1'),
            (self.table1.c.myid, literal('b'), 'mytable.myid', ':param_1'),
            (self.table1.c.myid, self.table1.c.myid,
                            'mytable.myid', 'mytable.myid'),
            (literal('a'), 'b', ':param_1', ':param_2'),
            (literal('a'), self.table1.c.myid, ':param_1', 'mytable.myid'),
            (literal('a'), literal('b'), ':param_1', ':param_2'),
            (dt, literal('b'), ':param_2', ':param_1'),
            (literal('b'), dt, ':param_1', ':param_2'),
            ):

            # the compiled clause should match either (e.g.):
            # 'a' < 'b' -or- 'b' > 'a'.
            compiled = str(py_op(lhs, rhs))
            fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql)
            rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql)

            self.assert_(compiled == fwd_sql or compiled == rev_sql,
                         "\n'" + compiled + "'\n does not match\n'" +
                         fwd_sql + "'\n or\n'" + rev_sql + "'")
开发者ID:e0ne,项目名称:sqlalchemy,代码行数:25,代码来源:test_operators.py


示例14: test_positional_binds_2_asliteral

    def test_positional_binds_2_asliteral(self):
        orders = table("orders", column("order"))
        s = select([orders.c.order, literal("x")]).cte("regional_sales")
        s = select([s.c.order, literal("y")])
        dialect = default.DefaultDialect()
        dialect.positional = True
        dialect.paramstyle = "numeric"
        s1 = (
            select([orders.c.order])
            .where(orders.c.order == "x")
            .cte("regional_sales_1")
        )

        s1a = s1.alias()

        s2 = (
            select(
                [
                    orders.c.order == "y",
                    s1a.c.order,
                    orders.c.order,
                    s1.c.order,
                ]
            )
            .where(orders.c.order == "z")
            .cte("regional_sales_2")
        )

        s3 = select([s2])

        self.assert_compile(
            s3,
            "WITH regional_sales_1 AS "
            '(SELECT orders."order" AS "order" '
            "FROM orders "
            "WHERE orders.\"order\" = 'x'), "
            "regional_sales_2 AS "
            "(SELECT orders.\"order\" = 'y' AS anon_1, "
            'anon_2."order" AS "order", orders."order" AS "order", '
            'regional_sales_1."order" AS "order" '
            "FROM orders, regional_sales_1 AS anon_2, regional_sales_1 "
            "WHERE orders.\"order\" = 'z') "
            'SELECT regional_sales_2.anon_1, regional_sales_2."order" '
            "FROM regional_sales_2",
            checkpositional=(),
            dialect=dialect,
            literal_binds=True,
        )
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:48,代码来源:test_cte.py


示例15: test_noorderby_parameters_insubquery

    def test_noorderby_parameters_insubquery(self):
        """test that the ms-sql dialect does not include ORDER BY
        positional parameters in subqueries"""

        table1 = table(
            "mytable",
            column("myid", Integer),
            column("name", String),
            column("description", String),
        )

        q = select(
            [table1.c.myid, sql.literal('bar').label('c1')],
            order_by=[table1.c.name + '-']
        ).alias("foo")
        crit = q.c.myid == table1.c.myid
        dialect = mssql.dialect()
        dialect.paramstyle = "qmark"
        dialect.positional = True
        self.assert_compile(
            select(["*"], crit),
            "SELECT * FROM (SELECT mytable.myid AS "
            "myid, ? AS c1 FROM mytable) AS foo, mytable WHERE "
            "foo.myid = mytable.myid",
            dialect=dialect,
            checkparams={'param_1': 'bar'},
            # if name_1 is included, too many parameters are passed to dbapi
            checkpositional=('bar', )
        )
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:29,代码来源:test_compiler.py


示例16: messages_in_narrow_backend

def messages_in_narrow_backend(request, user_profile,
                               msg_ids = REQ(validator=check_list(check_int)),
                               narrow = REQ(converter=narrow_parameter)):
    # type: (HttpRequest, UserProfile, List[int], List[Dict[str, Any]]) -> HttpResponse

    # Note that this function will only work on messages the user
    # actually received

    # TODO: We assume that the narrow is a search.  For now this works because
    # the browser only ever calls this function for searches, since it can't
    # apply that narrow operator itself.

    query = select([column("message_id"), column("subject"), column("rendered_content")],
                   and_(column("user_profile_id") == literal(user_profile.id),
                        column("message_id").in_(msg_ids)),
                   join(table("zerver_usermessage"), table("zerver_message"),
                        literal_column("zerver_usermessage.message_id") ==
                        literal_column("zerver_message.id")))

    builder = NarrowBuilder(user_profile, column("message_id"))
    for term in narrow:
        query = builder.add_term(query, term)

    sa_conn = get_sqlalchemy_connection()
    query_result = list(sa_conn.execute(query).fetchall())

    search_fields = dict()
    for row in query_result:
        (message_id, subject, rendered_content, content_matches, subject_matches) = row
        search_fields[message_id] = get_search_fields(rendered_content, subject,
                                                      content_matches, subject_matches)

    return json_success({"messages": search_fields})
开发者ID:souravbadami,项目名称:zulip,代码行数:33,代码来源:messages.py


示例17: visibility_horizon_query

 def visibility_horizon_query(self):
     """Get a query object that returns the highest category this one is visible from."""
     cte_query = (
         select(
             [
                 Category.id,
                 Category.parent_id,
                 db.case([(Category.visibility.is_(None), None)], else_=(Category.visibility - 1)).label("n"),
                 literal(0).label("level"),
             ]
         )
         .where(Category.id == self.id)
         .cte("visibility_horizon", recursive=True)
     )
     parent_query = select(
         [
             Category.id,
             Category.parent_id,
             db.case(
                 [(Category.visibility.is_(None) & cte_query.c.n.is_(None), None)],
                 else_=db.func.least(Category.visibility, cte_query.c.n) - 1,
             ),
             cte_query.c.level + 1,
         ]
     ).where(db.and_(Category.id == cte_query.c.parent_id, (cte_query.c.n > 0) | cte_query.c.n.is_(None)))
     cte_query = cte_query.union_all(parent_query)
     return db.session.query(cte_query.c.id, cte_query.c.n).order_by(cte_query.c.level.desc()).limit(1)
开发者ID:OmeGak,项目名称:indico,代码行数:27,代码来源:categories.py


示例18: visit_tag_query

 def visit_tag_query(self, q):
     tags = self.session.query(Tag).filter_by(name=q.name)
     if tags.count() == 1:
         return ItemVersion.tags.any(id=tags[0].id)
     else:
         from sqlalchemy.sql import literal
         return literal(False)
开发者ID:inducer,项目名称:synoptic,代码行数:7,代码来源:datamodel.py


示例19: access_list_paths

    def access_list_paths(self, member, prefix=None, include_owned=False,
                          include_containers=True):
        """Return the list of paths granted to member.

        Keyword arguments:
        prefix -- return only paths starting with prefix (default None)
        include_owned -- return also paths owned by member (default False)
        include_containers -- return also container paths owned by member
                              (default True)

        """

        xfeatures_xfeaturevals = self.xfeatures.join(self.xfeaturevals)

        selectable = (self.groups.c.owner + ':' + self.groups.c.name)
        member_groups = select([selectable.label('value')],
                               self.groups.c.member == member)

        members = select([literal(member).label('value')])
        any = select([literal('*').label('value')])

        u = union(member_groups, members, any).alias()
        inner_join = join(xfeatures_xfeaturevals, u,
                          self.xfeaturevals.c.value == u.c.value)
        s = select([self.xfeatures.c.path], from_obj=[inner_join]).distinct()
        if prefix:
            like = lambda p: self.xfeatures.c.path.like(
                self.escape_like(p) + '%', escape=ESCAPE_CHAR)
            s = s.where(or_(*map(like,
                                 self.access_inherit(prefix) or [prefix])))
        r = self.conn.execute(s)
        l = [row[0] for row in r.fetchall()]
        r.close()

        if include_owned:
            container_nodes = select(
                [self.nodes.c.node],
                self.nodes.c.parent == self.node_lookup(member))
            condition = self.nodes.c.parent.in_(container_nodes)
            if include_containers:
                condition = or_(condition,
                                self.nodes.c.node.in_(container_nodes))
            s = select([self.nodes.c.path], condition)
            r = self.conn.execute(s)
            l += [row[0] for row in r.fetchall() if row[0] not in l]
            r.close()
        return l
开发者ID:antonis-m,项目名称:synnefo,代码行数:47,代码来源:permissions.py


示例20: by_sender

    def by_sender(self, query, operand, maybe_negate):
        try:
            sender = get_user_profile_by_email(operand)
        except UserProfile.DoesNotExist:
            raise BadNarrowOperator('unknown user ' + operand)

        cond = column("sender_id") == literal(sender.id)
        return query.where(maybe_negate(cond))
开发者ID:danshev,项目名称:zulip,代码行数:8,代码来源:messages.py



注:本文中的sqlalchemy.sql.literal函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sql.literal_column函数代码示例发布时间:2022-05-27
下一篇:
Python sql.label函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap