• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sql.literal_column函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.sql.literal_column函数的典型用法代码示例。如果您正苦于以下问题:Python literal_column函数的具体用法?Python literal_column怎么用?Python literal_column使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了literal_column函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: load

    def load(self, request, response, subject, data):
        candidates = data['identifiers']
        if not candidates:
            return response([])

        identifiers = []
        for i, identifier in enumerate(candidates):
            identifiers.append("(%d, '%s')" % (i, str(identifier)))

        expr = select([column('rank'), column('id')],
            from_obj="(values %s) as subset(rank, id)" % ', '.join(identifiers))

        query = (self.schema.session.query(self.model)
            .join(expr.cte('__subset__'), literal_column('__subset__.id')==self.model.id)
            .order_by(literal_column('__subset__.rank')))

        resources = []
        instances = list(query.all())

        instance = (instances.pop(0) if instances else None)
        for id in candidates:
            if instance:
                if instance.id == id:
                    resources.append(self._construct_resource(request, instance, data))
                    if instances:
                        instance = instances.pop(0)
                    else:
                        instance = None
                else:
                    resources.append(None)
            else:
                resources.append(None)

        response(resources)
开发者ID:esho,项目名称:spire,代码行数:34,代码来源:controllers.py


示例2: get_activity_query

def get_activity_query(user_id=None, session_id=None, test_id=None):
    # pylint: disable=no-member
    from .models import Activity, Comment, User

    _filter = functools.partial(_apply_filters, user_id=user_id, session_id=session_id, test_id=test_id)

    comments = select([
            literal_column("('comment:' || comment.id)").label('id'),
            literal_column(str(ACTION_COMMENTED)).label('action'),
            Comment.user_id.label('user_id'),
            Comment.session_id.label('session_id'),
            Comment.test_id.label('test_id'),
            Comment.timestamp.label('timestamp'),
            Comment.comment.label('text'),
            User.email.label('user_email'),
        ]).select_from(Comment.__table__.join(User, User.id == Comment.user_id))

    comments = _filter(Comment, comments)

    activity = select([
            literal_column("('activity:' || activity.id)").label('id'),
            Activity.action.label('action'),
            Activity.user_id.label('user_id'),
            Activity.session_id.label('session_id'),
            Activity.test_id.label('test_id'),
            Activity.timestamp.label('timestamp'),
            literal_column("NULL").label('text'),
            User.email.label('user_email'),
        ]).select_from(Activity.__table__.join(User, User.id == Activity.user_id))

    activity = _filter(Activity, activity)

    u = union_all(comments, activity).alias('u')

    return select([u]).order_by(u.c.timestamp)
开发者ID:Sentinel-One,项目名称:backslash,代码行数:35,代码来源:activity.py


示例3: createView

 def createView(self):
     # filter indexes
     catalog = self.env.catalog.index_catalog
     xmlindex_list = catalog.getIndexes(package_id='seismology',
                                        resourcetype_id='event')
     filter = ['datetime', 'latitude', 'longitude', 'depth',
               'magnitude', 'magnitude_type', 'event_type', 'np1_strike',
               'np1_dip', 'np1_rake', 'mt_mrr', 'mt_mtt', 'mt_mpp',
               'mt_mrt', 'mt_mrp', 'mt_mtp', 'localisation_method']
     xmlindex_list = [x for x in xmlindex_list if x.label in filter]
     if not xmlindex_list:
         return
     # build up query
     query, joins = catalog._createIndexView(xmlindex_list, compact=True)
     options = [
         sql.literal_column("datetime.keyval").label("end_datetime"),
         sql.literal_column("datetime.keyval").label("start_datetime"),
         sql.case(
             value=sql.literal_column("localisation_method.keyval"),
             whens={'manual': 'circle'},
             else_='square').label('gis_localisation_method'),
         sql.func.GeomFromText(
             sql.text("'POINT(' || longitude.keyval || ' ' || " + \
                      "latitude.keyval || ')', 4326")).label('geom')
     ]
     for option in options:
         query.append_column(option)
     query = query.select_from(joins)
     return util.compileStatement(query)
开发者ID:barsch,项目名称:seishub.plugins.exupery,代码行数:29,代码来源:seismic.py


示例4: messages_in_narrow_backend

def messages_in_narrow_backend(request, user_profile,
                               msg_ids = REQ(validator=check_list(check_int)),
                               narrow = REQ(converter=narrow_parameter)):
    # type: (HttpRequest, UserProfile, List[int], List[Dict[str, Any]]) -> HttpResponse

    # Note that this function will only work on messages the user
    # actually received

    # TODO: We assume that the narrow is a search.  For now this works because
    # the browser only ever calls this function for searches, since it can't
    # apply that narrow operator itself.

    query = select([column("message_id"), column("subject"), column("rendered_content")],
                   and_(column("user_profile_id") == literal(user_profile.id),
                        column("message_id").in_(msg_ids)),
                   join(table("zerver_usermessage"), table("zerver_message"),
                        literal_column("zerver_usermessage.message_id") ==
                        literal_column("zerver_message.id")))

    builder = NarrowBuilder(user_profile, column("message_id"))
    for term in narrow:
        query = builder.add_term(query, term)

    sa_conn = get_sqlalchemy_connection()
    query_result = list(sa_conn.execute(query).fetchall())

    search_fields = dict()
    for row in query_result:
        (message_id, subject, rendered_content, content_matches, subject_matches) = row
        search_fields[message_id] = get_search_fields(rendered_content, subject,
                                                      content_matches, subject_matches)

    return json_success({"messages": search_fields})
开发者ID:souravbadami,项目名称:zulip,代码行数:33,代码来源:messages.py


示例5: __init__

    def __init__(self, sess, unfiltered, filt_crit, tt, window_size=WINDOW_SIZE):
        self.sess = sess
        self.unfiltered = unfiltered
        self.filt_crit = filt_crit
        self.tt = tt
        self.window_size = window_size

        self.skipped = []

        # select-only, can't be used for updates
        self.filtered_s = filtered = select(unfiltered.c).where(filt_crit).alias("filtered")

        self.selectable = (
            select(
                [
                    filtered.c.size,
                    func.count().label("inode_count"),
                    func.max(filtered.c.has_updates).label("has_updates"),
                ]
            )
            .group_by(filtered.c.size)
            .having(and_(literal_column("inode_count") > 1, literal_column("has_updates") > 0))
        )

        # This is higher than selectable.first().size, in order to also clear
        # updates without commonality.
        self.upper_bound = self.sess.query(self.unfiltered.c.size).order_by(-self.unfiltered.c.size).limit(1).scalar()
开发者ID:cben,项目名称:bedup,代码行数:27,代码来源:tracking.py


示例6: get_query

def get_query(qtype = 'none', qobject = 'none'):

    if qtype != 'none' and qobject != 'none':

        # built queries for specified subset of patients
        query = db.session.query(label('sid', qobject.c.patient_sid),
                                 label('value_d', qobject.c.double_value),
                                 label('value_s', qobject.c.string_value),
                                 label('attribute', qobject.c.attribute_value))

    elif qtype == 'count' and qobject == 'none':

        # count of patients
        query = db.session.query(distinct(Clinical.patient_sid).label('sid'))


    else:

        # entire population
        query = db.session.query(distinct(Clinical.patient_sid).label('sid'),
                                 literal_column("'complement'").label('attribute'),
                                 literal_column("'0'").label('value_d'),
                                 literal_column("'null'").label('value_s'))


    db.session.commit()
    db.session.close()

    return query
开发者ID:GregSilverman,项目名称:cohort_rest_api,代码行数:29,代码来源:sqla_methods.py


示例7: read_many_byuser

    def read_many_byuser(self, request):
        """
        """

        username = request.matchdict['username']

        page = int(request.params.get("page", 1))
        pagesize = int(request.params.get("pagesize", 10))

        if self.Session.query(User).filter(User.username == username).first() == None:
            raise HTTPNotFound("Requested user does not exist.")

        items = []

        activities_sub_query = self.Session.query(Activity.activity_identifier.label("identifier"), Activity.version, Changeset.timestamp, Changeset.fk_user).\
            join(Changeset).\
            filter(or_(Activity.fk_status == 2, Activity.fk_status == 3)).subquery(name="sub_act")

        activities_query = self.Session.query(activities_sub_query, User.username).\
            join(User).filter(User.username == username).subquery(name="act")

        # All active and inactive stakeholders
        stakeholder_active = self.Session.query(Stakeholder).\
            filter(or_(Stakeholder.fk_status == 2, Stakeholder.fk_status == 3)).\
            subquery("st_active")

        # Get the five latest stakeholder by changeset
        stakeholder_sub_query = self.Session.query(stakeholder_active.c.stakeholder_identifier.label("identifier"), \
                                                   stakeholder_active.c.version, Changeset.timestamp, Changeset.fk_user).\
            join(Changeset, Changeset.id == stakeholder_active.c.fk_changeset).\
            subquery(name="sub_st")

        # Join the resulting set to the user table
        stakeholder_query = self.Session.query(stakeholder_sub_query, User.username).\
            join(User).filter(User.username == username).subquery(name="st")

        query = self.Session.query(activities_query, literal_column("\'activity\'").label("type")).\
            union(self.Session.query(stakeholder_query, literal_column("\'stakeholder\'").label("type"))).\
            order_by(desc(activities_query.c.timestamp)).order_by(desc(activities_query.c.version))

        for i in query.offset((page-1)*pagesize).limit(pagesize).all():
            items.append({
            "type": i.type,
            "author": i.username,
            "timestamp": i.timestamp,
            "version": i.version,
            "identifier": str(i.identifier)
            })                            
        return {
            "items": items,
            "username": username,
            "totalitems": query.count(),
            "pagesize": pagesize,
            "currentpage": page
        }

        return {}
开发者ID:sinnwerkstatt,项目名称:lokp,代码行数:57,代码来源:changeset_protocol.py


示例8: get_server_search_sources

def get_server_search_sources():
    return g.db.query(
        ExternalWFSSource.name.op('||')(literal_column("' ('")).op('||')(ExternalWFSSource.search_property).op('||')(literal_column("')'")).label('label'),
        literal_column("'wfs_'").op('||')
        (ExternalWFSSource.name).label('value')
    ).filter_by(active=True).union_all(g.db.query(
        GBIServer.title.label('label'),
        literal_column("'parcel_'").op('||')
        (ParcelSearchSource.id).label('value')
    ).filter(ParcelSearchSource.active==True).join(ParcelSearchSource.gbi_server)).all()
开发者ID:omniscale,项目名称:gbi-client,代码行数:10,代码来源:forms.py


示例9: test_sqlexpr

 def test_sqlexpr(self):
     m = MetaData()
     t = Table('t', m, Column(
         'x', Integer,
         server_default=literal_column('a') + literal_column('b'))
     )
     self.assert_compile(
         CreateTable(t),
         "CREATE TABLE t (x INTEGER DEFAULT a + b)"
     )
开发者ID:NaiRobley,项目名称:sqlalchemy,代码行数:10,代码来源:test_defaults.py


示例10: test_render_check_constraint_sqlexpr

 def test_render_check_constraint_sqlexpr(self):
     c = column("c")
     five = literal_column("5")
     ten = literal_column("10")
     eq_ignore_whitespace(
         autogenerate.render._render_check_constraint(
             CheckConstraint(and_(c > five, c < ten)), self.autogen_context
         ),
         "sa.CheckConstraint('c > 5 AND c < 10')",
     )
开发者ID:limodou,项目名称:uliweb-alembic,代码行数:10,代码来源:test_autogen_render.py


示例11: execute

 def execute(self, request, user, name):
     alliance = Alliance.load(name)
     if alliance is None:
         return HttpResponseRedirect(reverse("alliance_ranks"))
     
     ph = aliased(PlanetHistory)
     members = count().label("members")
     size = sum(ph.size).label("size")
     value = sum(ph.value).label("value")
     score = sum(ph.score).label("score")
     avg_size = size.op("/")(members).label("avg_size")
     avg_value = value.op("/")(members).label("avg_value")
     t10v = count(case(whens=((ph.value_rank <= 10 ,1),), else_=None)).label("t10v")
     t100v = count(case(whens=((ph.value_rank <= 100 ,1),), else_=None)).label("t100v")
     
     pho = aliased(PlanetHistory)
     sizeo = sum(pho.size).label("sizeo")
     valueo = sum(pho.value).label("valueo")
     scoreo = sum(pho.score).label("scoreo")
     
     Q = session.query(PlanetHistory.tick.label("tick"),
                       Alliance.id.label("id"),
                       literal_column("rank() OVER (PARTITION BY planet_history.tick ORDER BY sum(planet_history.size) DESC)").label("size_rank"),
                       literal_column("rank() OVER (PARTITION BY planet_history.tick ORDER BY sum(planet_history.value) DESC)").label("value_rank"),
                       )
     Q = Q.filter(PlanetHistory.active == True)
     Q = Q.join(PlanetHistory.current)
     Q = Q.join(Planet.intel)
     Q = Q.join(Intel.alliance)
     Q = Q.group_by(PlanetHistory.tick, Alliance.id)
     ranks = Q.subquery()
     
     Q = session.query(ph.tick, members,
                       size, value,
                       avg_size, avg_value,
                       size-sizeo, value-valueo, score-scoreo,
                       t10v, t100v,
                       )
     Q = Q.filter(ph.active == True)
     Q = Q.join(ph.current)
     Q = Q.join(Planet.intel)
     Q = Q.join(Intel.alliance)
     Q = Q.outerjoin((pho, and_(ph.id==pho.id, ph.tick-1==pho.tick),))
     Q = Q.filter(Intel.alliance == alliance)
     Q = Q.group_by(ph.tick)
     
     Q = Q.from_self().add_columns(ranks.c.size_rank, ranks.c.value_rank)
     Q = Q.outerjoin((ranks, and_(ph.tick == ranks.c.tick, alliance.id == ranks.c.id),))
     Q = Q.order_by(desc(ph.tick))
     
     history = Q.all()
     
     return render("ialliancehistory.tpl", request, alliance=alliance, members=alliance.intel_members, history=history)
开发者ID:Hardware-Hacks,项目名称:merlin,代码行数:53,代码来源:ialliancehistory.py


示例12: system_utilisation_counts_by_group

def system_utilisation_counts_by_group(grouping, systems):
    retval = defaultdict(lambda: dict((k, 0) for k in
            ['recipe', 'manual', 'idle_automated', 'idle_manual',
             'idle_broken', 'idle_removed']))
    query = systems.outerjoin(System.open_reservation)\
            .with_entities(grouping,
                func.coalesce(Reservation.type,
                func.concat('idle_', func.lower(System.status))),
                func.count(System.id))\
            .group_by(literal_column("1"), literal_column("2"))
    for group, state, count in query:
        retval[group][state] = count
    return retval
开发者ID:beaker-project,项目名称:beaker,代码行数:13,代码来源:utilisation.py


示例13: testlabels2

    def testlabels2(self):
        metadata = MetaData()
        table = Table("ImATable", metadata,
            Column("col1", Integer))
        x = select([table.c.col1.label("ImATable_col1")]).alias("SomeAlias")
        assert str(select([x.c.ImATable_col1])) == '''SELECT "SomeAlias"."ImATable_col1" \nFROM (SELECT "ImATable".col1 AS "ImATable_col1" \nFROM "ImATable") AS "SomeAlias"'''

        # note that 'foo' and 'FooCol' are literals already quoted
        x = select([sql.literal_column("'foo'").label("somelabel")], from_obj=[table]).alias("AnAlias")
        x = x.select()
        assert str(x) == '''SELECT "AnAlias".somelabel \nFROM (SELECT 'foo' AS somelabel \nFROM "ImATable") AS "AnAlias"'''

        x = select([sql.literal_column("'FooCol'").label("SomeLabel")], from_obj=[table])
        x = x.select()
        assert str(x) == '''SELECT "SomeLabel" \nFROM (SELECT 'FooCol' AS "SomeLabel" \nFROM "ImATable")'''
开发者ID:Frihet,项目名称:sqlalchemy-patches,代码行数:15,代码来源:quote.py


示例14: visit_select

    def visit_select(self, select, **kwargs):
        """Look for ``LIMIT`` and OFFSET in a select statement, and if
        so tries to wrap it in a subquery with ``row_number()`` criterion.

        """
        if not getattr(select, '_mssql_visit', None) and select._offset:
            # to use ROW_NUMBER(), an ORDER BY is required.
            orderby = self.process(select._order_by_clause)
            if not orderby:
                raise exc.InvalidRequestError('MSSQL requires an order_by when '
                                              'using an offset.')

            _offset = select._offset
            _limit = select._limit
            select._mssql_visit = True
            select = select.column(
                sql.literal_column("ROW_NUMBER() OVER (ORDER BY %s)" \
                % orderby).label("mssql_rn")
                                   ).order_by(None).alias()

            limitselect = sql.select([c for c in select.c if
                                        c.key!='mssql_rn'])
            limitselect.append_whereclause("mssql_rn>%d" % _offset)
            if _limit is not None:
                limitselect.append_whereclause("mssql_rn<=%d" % 
                                            (_limit + _offset))
            return self.process(limitselect, iswrapper=True, **kwargs)
        else:
            return compiler.SQLCompiler.visit_select(self, select, **kwargs)
开发者ID:Amelandbor,项目名称:CouchPotato,代码行数:29,代码来源:base.py


示例15: adhoc_metric_to_sqla

    def adhoc_metric_to_sqla(self, metric, cols):
        """
        Turn an adhoc metric into a sqlalchemy column.

        :param dict metric: Adhoc metric definition
        :param dict cols: Columns for the current table
        :returns: The metric defined as a sqlalchemy column
        :rtype: sqlalchemy.sql.column
        """
        expression_type = metric.get('expressionType')
        db_engine_spec = self.database.db_engine_spec
        label = db_engine_spec.make_label_compatible(metric.get('label'))

        if expression_type == utils.ADHOC_METRIC_EXPRESSION_TYPES['SIMPLE']:
            column_name = metric.get('column').get('column_name')
            sqla_column = column(column_name)
            table_column = cols.get(column_name)

            if table_column:
                sqla_column = table_column.get_sqla_col()

            sqla_metric = self.sqla_aggregations[metric.get('aggregate')](sqla_column)
            sqla_metric = sqla_metric.label(label)
            return sqla_metric
        elif expression_type == utils.ADHOC_METRIC_EXPRESSION_TYPES['SQL']:
            sqla_metric = literal_column(metric.get('sqlExpression'))
            sqla_metric = sqla_metric.label(label)
            return sqla_metric
        else:
            return None
开发者ID:chenhaiyan,项目名称:incubator-superset,代码行数:30,代码来源:models.py


示例16: visit_select

    def visit_select(self, select, **kwargs):
        """Look for ``LIMIT`` and OFFSET in a select statement, and if
        so tries to wrap it in a subquery with ``row_number()`` criterion.
        """
        if self.dialect.has_window_funcs and (not getattr(select, '_mssql_visit', None)) and (select._limit is not None or select._offset is not None):
            # to use ROW_NUMBER(), an ORDER BY is required.
            orderby = self.process(select._order_by_clause)
            if not orderby:
                orderby = list(select.oid_column.proxies)[0]
                orderby = self.process(orderby)

            _offset = select._offset
            _limit = select._limit
            select._mssql_visit = True
            select = select.column(sql.literal_column("ROW_NUMBER() OVER (ORDER BY %s)" % orderby).label("mssql_rn")).order_by(None).alias()

            limitselect = sql.select([c for c in select.c if c.key!='mssql_rn'])
            if _offset is not None:
                limitselect.append_whereclause("mssql_rn>=%d" % _offset)
                if _limit is not None:
                    limitselect.append_whereclause("mssql_rn<=%d" % (_limit + _offset))
            else:
                limitselect.append_whereclause("mssql_rn<=%d" % _limit)
            return self.process(limitselect, iswrapper=True, **kwargs)
        else:
            return compiler.DefaultCompiler.visit_select(self, select, **kwargs)
开发者ID:Frihet,项目名称:sqlalchemy-patches,代码行数:26,代码来源:mssql.py


示例17: analytics_compability

def analytics_compability():
    user2set = dict([
        (user_id, map(operator.itemgetter(0), {
            "artist"            : db.session.query(Scrobble.artist).\
                                             group_by(Scrobble.artist),
            "track"             : db.session.query(func.concat(Scrobble.artist, literal_column('" – "'), Scrobble.track)).\
                                             group_by(Scrobble.artist, Scrobble.track),
        }
        [request.args.get("criterion")].\
        filter_by(user_id=user_id).\
        having(func.count(Scrobble.id) > int(request.args.get("more_than_x_scrobbles"))).\
        all()))
        for user_id in map(int, request.args.getlist("users"))
    ])

    user2username = dict(db.session.query(User.id, User.username).all())
    
    length2groups = [
        (length, filter(lambda (users, set): len(set) > 0, sorted([
            (
                ", ".join(sorted([user2username[i] for i in user2username if i in group], key=lambda username: username.lower())),
                reduce(set.intersection, map(set, [user2set[user_id] for user_id in group]))
            )
            for group in itertools.combinations(map(int, request.args.getlist("users")), length) if len(group) == length
        ], key=lambda (users, set): -len(set)))[:10])
        for length in range(2, len(user2username) + 1)
    ]
开发者ID:Erkan-Yilmaz,项目名称:last.fm.thelogin.ru,代码行数:27,代码来源:analytics.py


示例18: polymorphic_union

def polymorphic_union(table_map, typecolname, aliasname="p_union", cast_nulls=True):
    """Create a ``UNION`` statement used by a polymorphic mapper.

    See  :ref:`concrete_inheritance` for an example of how
    this is used.

    :param table_map: mapping of polymorphic identities to
     :class:`.Table` objects.
    :param typecolname: string name of a "discriminator" column, which will be
     derived from the query, producing the polymorphic identity for each row.  If
     ``None``, no polymorphic discriminator is generated.
    :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
     construct generated.
    :param cast_nulls: if True, non-existent columns, which are represented as labeled
     NULLs, will be passed into CAST.   This is a legacy behavior that is problematic
     on some backends such as Oracle - in which case it can be set to False.

    """

    colnames = util.OrderedSet()
    colnamemaps = {}
    types = {}
    for key in table_map.keys():
        table = table_map[key]

        # mysql doesnt like selecting from a select;
        # make it an alias of the select
        if isinstance(table, sql.Select):
            table = table.alias()
            table_map[key] = table

        m = {}
        for c in table.c:
            colnames.add(c.key)
            m[c.key] = c
            types[c.key] = c.type
        colnamemaps[table] = m

    def col(name, table):
        try:
            return colnamemaps[table][name]
        except KeyError:
            if cast_nulls:
                return sql.cast(sql.null(), types[name]).label(name)
            else:
                return sql.type_coerce(sql.null(), types[name]).label(name)

    result = []
    for type, table in table_map.iteritems():
        if typecolname is not None:
            result.append(
                sql.select(
                    [col(name, table) for name in colnames]
                    + [sql.literal_column(sql_util._quote_ddl_expr(type)).label(typecolname)],
                    from_obj=[table],
                )
            )
        else:
            result.append(sql.select([col(name, table) for name in colnames], from_obj=[table]))
    return sql.union_all(*result).alias(aliasname)
开发者ID:KonstantinStepanov,项目名称:flaskDb,代码行数:60,代码来源:util.py


示例19: sqla_col

 def sqla_col(self):
     name = self.column_name
     if not self.expression:
         col = column(self.column_name).label(name)
     else:
         col = literal_column(self.expression).label(name)
     return col
开发者ID:cderfdsa,项目名称:caravel,代码行数:7,代码来源:models.py


示例20: polymorphic_union

def polymorphic_union(table_map, typecolname, aliasname='p_union'):
    """create a UNION statement used by a polymorphic mapper.
    
    See the SQLAlchemy advanced mapping docs for an example of how this is used."""
    colnames = util.Set()
    colnamemaps = {}
    types = {}
    for key in table_map.keys():
        table = table_map[key]

        # mysql doesnt like selecting from a select; make it an alias of the select
        if isinstance(table, sql.Select):
            table = table.alias()
            table_map[key] = table

        m = {}
        for c in table.c:
            colnames.add(c.name)
            m[c.name] = c
            types[c.name] = c.type
        colnamemaps[table] = m

    def col(name, table):
        try:
            return colnamemaps[table][name]
        except KeyError:
            return sql.cast(sql.null(), types[name]).label(name)

    result = []
    for type, table in table_map.iteritems():
        if typecolname is not None:
            result.append(sql.select([col(name, table) for name in colnames] + [sql.literal_column("'%s'" % type).label(typecolname)], from_obj=[table]))
        else:
            result.append(sql.select([col(name, table) for name in colnames], from_obj=[table]))
    return sql.union_all(*result).alias(aliasname)
开发者ID:nakedible,项目名称:vpnease-l2tp,代码行数:35,代码来源:util.py



注:本文中的sqlalchemy.sql.literal_column函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sql.not_函数代码示例发布时间:2022-05-27
下一篇:
Python sql.literal函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap