• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python expression.literal_column函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.sql.expression.literal_column函数的典型用法代码示例。如果您正苦于以下问题:Python literal_column函数的具体用法?Python literal_column怎么用?Python literal_column使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了literal_column函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _run_query

def _run_query(start_date, end_date):
    """Return instance create/delete events as one timestamp-ordered list.

    Three queries over ``DBInstance`` joined to ``DBDatastoreVersion`` are
    unioned:

    * instances with ``deleted == 0`` created before ``end_date``, selected
      with their creation time and a literal ``deleted`` column of 0;
    * instances with ``deleted == 1`` created before ``end_date`` and deleted
      at or after ``start_date``, selected with their creation time and a
      literal ``deleted`` column of 0;
    * the same deleted instances, selected with their deletion time and a
      literal ``deleted`` column of 1.

    :param start_date: lower bound applied to ``deleted_at``
    :param end_date: upper bound applied to ``created``
    :returns: all rows of the union, ordered by ``anon_1.timestamp``
    """
    def _events(columns, filters):
        # Every branch of the union shares the same join; only the extra
        # columns and the filters differ.
        joined = DBInstance.query().join(DBDatastoreVersion)
        return joined.add_columns(*columns).filter(*filters)

    live_filters = [
        DBInstance.created < end_date,
        DBInstance.deleted == 0,
    ]
    removed_filters = [
        DBInstance.created < end_date,
        DBInstance.deleted_at >= start_date,
        DBInstance.deleted == 1,
    ]
    creation_columns = [
        DBInstance.created.label('timestamp'),
        literal_column("0").label('deleted'),
        DBDatastoreVersion.id.label('dsvid'),
    ]
    deletion_columns = [
        DBInstance.deleted_at.label('timestamp'),
        literal_column("1").label('deleted'),
        DBDatastoreVersion.id.label('dsvid'),
    ]

    live_created = _events(creation_columns, live_filters)
    removed_created = _events(creation_columns, removed_filters)
    removed_deleted = _events(deletion_columns, removed_filters)

    combined = live_created.union(removed_created, removed_deleted)
    return combined.order_by(text('anon_1.timestamp')).all()
开发者ID:Tesora,项目名称:tesora-trove,代码行数:32,代码来源:usage_report.py


示例2: visit_hierarchy

def visit_hierarchy(element, compiler, **kw):
    """Compile a hierarchy element into an Oracle ``CONNECT BY`` query string.

    The element's select gets three extra columns (``level``,
    ``CONNECT_BY_ISLEAF`` labelled ``is_leaf`` and a ``SYS_CONNECT_BY_PATH``
    expression labelled ``connect_path``), is compiled, and then has the
    ``start with`` / ``connect by prior`` clauses appended as text.

    :param element: hierarchy construct; ``select``, ``child``, ``parent``
        and ``fk_type`` are read, plus the optional ``starting_node``
    :param compiler: SQL compiler for the current dialect
    :param kw: compiler keywords; a truthy ``asfrom`` wraps the result in
        parentheses
    :returns: the SQL text of the hierarchical query
    :raises HierarchyLesserError: if the server version is lower than
        ``supported_db['oracle']``
    """
    if compiler.dialect.server_version_info < supported_db['oracle']:
        raise HierarchyLesserError(compiler.dialect.name,
                                   supported_db['oracle'])

    sel = element.select
    sel.append_column(literal_column('level', type_=Integer))
    sel.append_column(literal_column('CONNECT_BY_ISLEAF',
                                     type_=Boolean).label('is_leaf'))
    sel.append_column(literal_column(
        "LTRIM(SYS_CONNECT_BY_PATH (%s,','),',')" % (element.child),
        type_=String).label('connect_path'))
    qry = "%s" % (compiler.process(sel))

    # A missing attribute and an explicit ``False`` both mean "no start with
    # clause"; the previous inner ``elif ... is False`` branch could never
    # run under that guard and has been dropped.
    starting_node = getattr(element, 'starting_node', False)
    if starting_node is not False:
        # "a" (string keys) and "0" (integer keys) select the rows whose
        # parent column is NULL.
        starts_at_null_parent = (
            (starting_node == "a" and element.fk_type == String) or
            (starting_node == "0" and element.fk_type == Integer))
        if starts_at_null_parent:
            qry += " start with %s is null" % (element.parent)
        else:
            # NOTE(review): starting_node is interpolated into the SQL text
            # as-is; confirm callers never pass untrusted values here.
            qry += " start with %s=%s" % (element.parent, starting_node)

    qry += " connect by prior %s=%s" % (element.child, element.parent)
    if kw.get('asfrom', False):
        qry = '(%s)' % qry
    return qry
开发者ID:FashtimeDotCom,项目名称:sqla_hierarchy,代码行数:28,代码来源:hierarchy.py


示例3: construct_search

 def construct_search(self, field_name, op=None):
     """Return the comparison callable used to search ``field_name``.

     ``'^'`` gives the column's ``startswith``, ``'='`` gives an ``=``
     operator built with ``op('=')``, and anything else gives ``contains``.
     """
     column = literal_column(field_name)
     if op == '^':
         return column.startswith
     if op == '=':
         return column.op('=')
     return column.contains
开发者ID:ramin32,项目名称:Flask-SuperAdmin,代码行数:7,代码来源:view.py


示例4: quota_destroy_all_by_project

def quota_destroy_all_by_project(context, project_id):
    """Soft-delete all quota-related rows that belong to ``project_id``.

    Within one session transaction, the not-yet-deleted rows of ``Quota``,
    ``ProjectUserQuota``, ``QuotaUsage`` and ``Reservation`` are flagged as
    deleted and stamped with the current UTC time.
    """
    session = get_session()
    with session.begin():
        quota_models = (
            models.Quota,
            models.ProjectUserQuota,
            models.QuotaUsage,
            models.Reservation,
        )
        for model in quota_models:
            rows = model_query(context, model, session=session,
                               read_deleted="no")
            rows = rows.filter_by(project_id=project_id)
            rows.update(
                {
                    'deleted': True,
                    'deleted_at': timeutils.utcnow(),
                    # Assign the column to itself so this update leaves
                    # updated_at at its current value.
                    'updated_at': literal_column('updated_at'),
                },
                synchronize_session=False)
开发者ID:pombredanne,项目名称:manila,代码行数:34,代码来源:api.py


示例5: list_fleets

    def list_fleets(self, message, user, params):
        """Reply with the alliances seen defending, or defended by, a planet.

        The planet is looked up from groups 1, 3 and 5 of ``params``; group 6
        optionally limits how many rows are shown (default 5). Rows are
        sorted in descending order before being sent through ``message``.
        """
        coords = params.group(1, 3, 5)

        # Bail out early when the coordinates match no planet.
        planet = Planet.load(*coords)
        if planet is None:
            message.alert("No planet with coords %s:%s:%s" % coords)
            return

        def defend_scans(direction, this_side, other_side):
            # Defend-mission scans outside the galaxy where ``this_side`` is
            # the requested planet and ``other_side`` has a known alliance.
            # ``== False`` / ``!= None`` build SQL expressions here and must
            # not be rewritten as ``not`` / ``is not``.
            query = session.query(
                coalesce(FleetScan.launch_tick, FleetScan.landing_tick),
                literal_column(direction).label("dir"),
                Planet.x,
                Planet.y,
                Planet.z,
                Alliance.name,
            ).select_from(FleetScan)
            query = query.filter(this_side == planet.id,
                                 FleetScan.in_galaxy == False,
                                 FleetScan.mission == "Defend")
            query = query.join(Intel, other_side == Intel.planet_id)
            query = query.filter(Intel.alliance_id != None)
            query = query.join(Alliance, Intel.alliance_id == Alliance.id)
            return query.join(Planet, other_side == Planet.id)

        # Fleets that defended this planet, then fleets this planet sent out.
        defenders = defend_scans("'From'", FleetScan.target_id, FleetScan.owner_id)
        defended = defend_scans("'To  '", FleetScan.owner_id, FleetScan.target_id)

        results = sorted(defenders.all() + defended.all(), reverse=True)
        if not results:
            message.reply("No suggestions found")
            return

        message.reply("Tick  Dir   Planet     Alliance")
        limit = int(params.group(6) or 5)
        for tick, direction, x, y, z, alliance in results[:limit]:
            location = "%s:%s:%s" % (x, y, z)
            message.reply("%4s  %s  %-9s  %s" % (tick, direction, location, alliance))

        total = len(results)
        if total > limit:
            message.reply("%s results not shown (%s total)" % (total - limit, total))
开发者ID:JDD,项目名称:merlin,代码行数:34,代码来源:guess.py


示例6: construct_search

 def construct_search(self, field_name, op=None):
     """Return the comparison callable used to search ``field_name``.

     ``"^"`` gives the column's ``startswith``, ``"="`` gives an ``=``
     operator built with ``op("=")``, and anything else gives ``ilike``.
     """
     column = literal_column(field_name)
     if op == "^":
         return column.startswith
     if op == "=":
         return column.op("=")
     return column.ilike
开发者ID:le-zeste,项目名称:Flask-SuperAdmin,代码行数:7,代码来源:view.py


示例7: soft_delete

 def soft_delete(self, synchronize_session="evaluate"):
     """Mark the matched rows as deleted instead of removing them.

     ``deleted`` is set from the row's ``id`` column, ``updated_at`` is
     assigned to itself, and ``deleted_at`` gets the current UTC time.
     Returns whatever ``self.update`` returns.
     """
     marker_values = dict(
         deleted=literal_column("id"),
         updated_at=literal_column("updated_at"),
         deleted_at=timeutils.utcnow(),
     )
     return self.update(marker_values, synchronize_session=synchronize_session)
开发者ID:Open-SFC,项目名称:nscs,代码行数:9,代码来源:session.py


示例8: initialize

    def initialize(cls, url, initialize_data=True, initialize_schema=True):
        """Initialize the database.

        This includes the schema, the materialized views, the custom
        functions, and the initial content.

        :param url: database URL; also the key into ``cls.engine_for_url``
        :param initialize_data: when true, create the recursive equivalents
            function if it is missing and call ``cls.initialize_data``
        :param initialize_schema: when true, call ``cls.initialize_schema``
        :returns: ``(engine, connection)`` where the connection is freshly
            opened for the caller
        :raises IOError: if the SQL resource file for the recursive
            equivalents function does not exist
        """
        if url in cls.engine_for_url:
            engine = cls.engine_for_url[url]
            return engine, engine.connect()

        engine = cls.engine(url)
        if initialize_schema:
            cls.initialize_schema(engine)

        connection = engine.connect()
        try:
            # Check if the recursive equivalents function exists already.
            query = select(
                [literal_column('proname')]
            ).select_from(
                table('pg_proc')
            ).where(
                literal_column('proname')=='fn_recursive_equivalents'
            )
            result = list(connection.execute(query))

            # If it doesn't, create it.
            if not result and initialize_data:
                resource_file = os.path.join(
                    cls.resource_directory(), cls.RECURSIVE_EQUIVALENTS_FUNCTION
                )
                if not os.path.exists(resource_file):
                    raise IOError("Could not load recursive equivalents function from %s: file does not exist." % resource_file)
                # Read through a context manager so the file handle is
                # released deterministically.
                with open(resource_file) as function_file:
                    sql = function_file.read()
                connection.execute(sql)

            if initialize_data:
                session = Session(connection)
                cls.initialize_data(session)
        finally:
            # Release the setup connection even when initialization fails.
            connection.close()

        if initialize_schema and initialize_data:
            # Only cache the engine if all initialization has been performed.
            #
            # Some pieces of code (e.g. the script that runs
            # migrations) have a legitimate need to bypass some of the
            # initialization, but normal operation of the site
            # requires that everything be initialized.
            #
            # Until someone tells this method to initialize
            # everything, we can't short-circuit this method with a
            # cache.
            cls.engine_for_url[url] = engine
        return engine, engine.connect()
开发者ID:NYPL-Simplified,项目名称:server_core,代码行数:56,代码来源:__init__.py


示例9: actions_query

def actions_query(post_id):
    """Return a union query of the comments and relations of a post.

    Each row carries the record id, its timestamp labelled ``time`` and a
    literal type tag labelled ``t`` (``'Comment'`` or ``'Relation'``).
    The query is returned unexecuted.
    """
    session = db.session

    comment_rows = session.query(
        Comment.id,
        Comment.time_posted.label("time"),
        literal_column("'Comment'", Unicode).label("t"),
    ).filter(Comment.post_id == post_id)

    relation_rows = session.query(
        Relation.id,
        Relation.time_linked.label("time"),
        literal_column("'Relation'", Unicode).label("t"),
    ).filter(Relation.parent_id == post_id)

    return comment_rows.union(relation_rows)
开发者ID:yedi,项目名称:openthink,代码行数:10,代码来源:db_queries.py


示例10: drop_old_duplicate_entries_from_table

def drop_old_duplicate_entries_from_table(migrate_engine, table_name, use_soft_delete, *uc_column_names):
    """Drop all old rows that share the same values in the given columns.

    For every group of duplicates the row with the biggest id is kept; the
    other rows (those whose ``deleted_at`` is NULL) are either removed or,
    if ``use_soft_delete`` is true, marked as deleted.

    :param migrate_engine:  Sqlalchemy engine
    :param table_name:      Table with duplicates
    :param use_soft_delete: If True - values will be marked as `deleted`,
                            if False - values will be removed from table
    :param uc_column_names: Unique constraint columns
    """
    meta = MetaData()
    meta.bind = migrate_engine
    table = Table(table_name, meta, autoload=True)

    group_columns = [table.c[name] for name in uc_column_names]
    selected_columns = [func.max(table.c.id)] + group_columns
    duplicates_query = select(
        selected_columns,
        group_by=group_columns,
        having=func.count(table.c.id) > 1,
    )

    for newest in migrate_engine.execute(duplicates_query):
        # NOTE(boris-42): Do not remove row that has the biggest ID.
        stale = table.c.id != newest[0]
        is_none = None  # workaround for pyflakes
        stale = stale & (table.c.deleted_at == is_none)
        for name in uc_column_names:
            stale = stale & (table.c[name] == newest[name])

        # Log every row that is about to go before touching anything.
        stale_ids_query = select([table.c.id]).where(stale)
        for doomed in migrate_engine.execute(stale_ids_query).fetchall():
            LOG.info(
                _("Deleting duplicated row with id: %(id)s from table: %(table)s")
                % {'id': doomed[0], 'table': table_name}
            )

        if not use_soft_delete:
            statement = table.delete().where(stale)
        else:
            soft_delete_values = {
                "deleted": literal_column("id"),
                "updated_at": literal_column("updated_at"),
                "deleted_at": timeutils.utcnow(),
            }
            statement = table.update().where(stale).values(soft_delete_values)
        migrate_engine.execute(statement)
开发者ID:koder-ua,项目名称:fuel-web,代码行数:55,代码来源:utils.py


示例11: create_stm

    def create_stm(self, columns=None, filters=None, ordering=None, limit=None, start=None, url_filters=None,
                   associations=None):
        """Build the SELECT statement for ``self.table``.

        :param columns: columns handed to ``set_query_columns``; defaults to
            a fresh empty list
        :param filters: conditions handed to ``set_filters``
        :param ordering: property name to order by; a leading ``-`` means
            descending. ``None`` or an empty string disables ordering
        :param limit: maximum number of rows; must be convertible to ``int``
        :param start: row offset, only applied when ``limit`` is set; must be
            convertible to ``int``
        :param url_filters: value handed to ``set_url_filters``
        :param associations: currently unused by this method
        :returns: the statement (it is also printed to stdout)
        :raises ValueError: if ``limit`` or ``start`` is not an integer value
        """
        # A fresh list per call instead of one default shared across calls.
        if columns is None:
            columns = list()

        self.set_filters(filters)
        self.set_query_columns(columns)
        self.set_url_filters(url_filters)
        self.limit = limit
        self.start = start
        self.ordering = ordering

        stm = select(self.query_columns).select_from(self.table)

        # Conditions with op "coordinates" become one square condition; the
        # rest go through do_filter.
        plain_filters = list()
        coordinates_filter = ""

        for condition in self.filters:
            if condition.get("op") == "coordinates":

                coordinates_filter = self.get_condition_square(
                    condition.get("lowerleft"),
                    condition.get("upperright"),
                    condition.get("property_ra"),
                    condition.get("property_dec"))

            else:
                plain_filters.append(condition)

        base_filters = and_(*self.do_filter(self.table, plain_filters))

        stm = stm.where(and_(base_filters, coordinates_filter))

        # Ordering. The truthiness test also skips an empty string, which
        # previously failed on ``self.ordering[0]``.
        if self.ordering:
            descending = self.ordering[0] == '-'
            order_key = (self.ordering[1:] if descending else self.ordering).lower()

            if descending:
                stm = stm.order_by(desc(order_key))
            else:
                stm = stm.order_by(order_key)

        # Pagination. literal_column() emits its text verbatim into the SQL,
        # so force the values through int() first.
        if self.limit:
            stm = stm.limit(literal_column(str(int(self.limit))))

            if self.start:
                stm = stm.offset(literal_column(str(int(self.start))))

        # NOTE(review): debug output kept so behaviour is unchanged; consider
        # replacing it with logging.
        print(str(stm))

        return stm
开发者ID:linea-it,项目名称:dri,代码行数:55,代码来源:CatalogDB.py


示例12: plan_destroy

def plan_destroy(context, plan_id):
    """Soft-delete a plan and the resources that reference it.

    Both updates run in one session transaction and share the same
    ``deleted_at`` timestamp.
    """
    session = get_session()
    now = timeutils.utcnow()
    with session.begin():
        targets = (
            (models.Plan, {'id': plan_id}),
            (models.Resource, {'plan_id': plan_id}),
        )
        for model, criteria in targets:
            rows = model_query(context, model, session=session)
            rows.filter_by(**criteria).update({
                'deleted': True,
                'deleted_at': now,
                # Assign the column to itself so updated_at stays as it is.
                'updated_at': literal_column('updated_at'),
            })
开发者ID:WeAreFormalGroup,项目名称:smaug,代码行数:14,代码来源:api.py


示例13: migration_destroy

def migration_destroy(context, migration_id):
    """Soft-delete a migration and the properties that reference it.

    Both updates run in one session transaction and share the same
    ``deleted_at`` timestamp.
    """
    session = get_session()
    now = timeutils.utcnow()
    with session.begin():
        targets = (
            (models.Migration, {'id': migration_id}),
            (models.MigrationProperties, {'migration_id': migration_id}),
        )
        for model, criteria in targets:
            rows = model_query(context, model, session=session)
            rows.filter_by(**criteria).update({
                'deleted': True,
                'deleted_at': now,
                # Assign the column to itself so updated_at stays as it is.
                'updated_at': literal_column('updated_at'),
            })
开发者ID:aptira,项目名称:guts,代码行数:14,代码来源:api.py


示例14: report_json

def report_json(request):
    """Return GeoJSON-style features for a division and its children.

    The division code comes from the ``divisioncode`` route segment and the
    simplification tolerance from the ``resolution`` request parameter. If a
    ``hazardtype`` route segment is present, each division is joined to its
    hazard level for that type; otherwise the placeholder literals
    ``'None'`` and ``'blah'`` are selected in place of the level mnemonic
    and title.

    :param request: the current request; ``matchdict``, ``params`` and
        ``route_url`` are used
    :returns: a list of feature dicts (``type``, ``geometry``,
        ``properties``)
    :raises HTTPBadRequest: if ``divisioncode`` cannot be read or
        ``resolution`` is missing or not a number
    """
    try:
        division_code = request.matchdict.get('divisioncode')
    except AttributeError:
        # Only a matchdict without ``get`` (e.g. None) can fail here; a bare
        # except would also have swallowed KeyboardInterrupt/SystemExit.
        raise HTTPBadRequest(detail='incorrect value for parameter '
                                    '"divisioncode"')

    try:
        resolution = float(request.params.get('resolution'))
    except (TypeError, ValueError):
        # TypeError: parameter missing (None); ValueError: not a number.
        raise HTTPBadRequest(detail='invalid value for parameter "resolution"')

    hazard_type = request.matchdict.get('hazardtype', None)

    # The requested division itself plus its direct children.
    _filter = or_(AdministrativeDivision.code == division_code,
                  AdministrativeDivision.parent_code == division_code)

    simplify = func.ST_Simplify(
        func.ST_Transform(AdministrativeDivision.geom, 3857), resolution / 2)

    if hazard_type is not None:
        divisions = DBSession.query(AdministrativeDivision) \
            .add_columns(simplify, HazardLevel.mnemonic, HazardLevel.title) \
            .outerjoin(AdministrativeDivision.hazardcategories) \
            .join(HazardCategory) \
            .join(HazardType)\
            .join(HazardLevel) \
            .filter(and_(_filter, HazardType.mnemonic == hazard_type))
    else:
        # Literal placeholders keep the row shape identical to the branch
        # above, so the unpacking below works for both.
        divisions = DBSession.query(AdministrativeDivision) \
            .add_columns(simplify, literal_column("'None'"),
                         literal_column("'blah'")) \
            .filter(_filter)

    return [{
        'type': 'Feature',
        'geometry': to_shape(geom_simplified),
        'properties': {
            'name': division.name,
            'code': division.code,
            'url': request.route_url(
                'report' if hazard_type else 'report_overview',
                division=division,
                hazardtype=hazard_type),
            'hazardLevelMnemonic': hazardlevel_mnemonic,
            'hazardLevelTitle': hazardlevel_title
        }
    } for division, geom_simplified, hazardlevel_mnemonic,
          hazardlevel_title in divisions]
开发者ID:pgiraud,项目名称:thinkhazard,代码行数:50,代码来源:report.py


示例15: _wall_events_query

    def _wall_events_query(self):
        """Build the wall-events query for the current user (WallMixin hook).

        Starts from ``generic_events_query()`` and keeps events authored by
        ``c.teacher`` when the context has one, otherwise by ``c.user_info``.
        Of those, only the public event types are kept, plus
        ``file_uploaded`` events whose context content type is ``subject``.
        """
        from ututi.lib.wall import generic_events_query

        public_event_types = ['group_created', 'subject_created',
                              'subject_modified', 'page_created',
                              'page_modified']
        events = meta.metadata.tables['events']
        base_query = generic_events_query()

        author = c.teacher if hasattr(c, 'teacher') else c.user_info
        authored = base_query.where(events.c.author_id == author.id)

        is_public = events.c.event_type.in_(public_event_types)
        # XXX literal_column is used because the joined context column is
        # not reachable as an attribute of this query.
        is_subject_upload = and_(
            events.c.event_type == 'file_uploaded',
            literal_column('context_ci.content_type') == 'subject')

        return authored.where(or_(is_public, is_subject_upload))
开发者ID:nous-consulting,项目名称:ututi,代码行数:28,代码来源:user.py


示例16: repl

 def repl(element):
     """Return a replacement for ``element``, or None to leave it alone.

     Bind parameters become literal columns holding their quoted value;
     column clauses attached to a table become bare columns of that name.
     """
     if isinstance(element, expression._BindParamClause):
         quoted = _quote_ddl_expr(element.value)
         return expression.literal_column(quoted)

     is_table_column = (isinstance(element, expression.ColumnClause)
                        and element.table is not None)
     if is_table_column:
         return expression.column(element.name)

     return None
开发者ID:pszafer,项目名称:dlna_upnp_invention,代码行数:7,代码来源:util.py


示例17: load_balancer_update_state

def load_balancer_update_state(context, load_balancer_uuid, state):
    """Set ``state`` on the load balancer with the given uuid.

    Runs inside a transaction on ``context.session``; ``updated_at`` is
    assigned to itself so the update leaves it at its current value.
    """
    with context.session.begin():
        matching = context.session.query(models.LoadBalancer)
        matching = matching.filter_by(uuid=load_balancer_uuid)
        matching.update({
            'state': state,
            'updated_at': literal_column('updated_at'),
        })
开发者ID:ljjjustin,项目名称:nozzle,代码行数:7,代码来源:api.py


示例18: user_destroy

def user_destroy(user_id):
    """Soft-delete the user with ``user_id`` and commit the session.

    The row is flagged as deleted and stamped with the current UTC time;
    ``updated_at`` is assigned to itself.
    """
    matching = db_session.query(models.User).filter_by(id=user_id)
    matching.update({
        'deleted': True,
        'deleted_at': timeutils.utcnow(),
        'updated_at': literal_column('updated_at'),
    })
    db_session.commit()
开发者ID:tpiperatgod,项目名称:turtle-web,代码行数:7,代码来源:dbapi.py


示例19: get_elevation

def get_elevation(fixes):
    """Return a copy of ``fixes`` with index 11 filled from elevation data.

    At most roughly 1000 of the fixes are sampled, their elevation is read
    from the ``Elevation`` raster in a single query, and the values between
    consecutive samples are written to index 11 of the copied fixes.

    :param fixes: sequence of fix records; ``fix[2]`` must provide
        ``'longitude'`` and ``'latitude'`` keys and each record must be
        long enough to have index 11
    :returns: a list of lists; the input records are not modified
    """
    # Sample every ``shortener``-th fix. Explicit floor division gives the
    # same step on Python 2 and Python 3.
    shortener = max(1, len(fixes) // 1000)

    coordinates = [(fix[2]['longitude'], fix[2]['latitude']) for fix in fixes]
    points = MultiPoint(coordinates[::shortener])
    locations = from_shape(points, srid=4326).ST_DumpPoints()
    locations_id = extract_array_item(locations.path, 1)

    subq = db.session.query(locations_id.label('location_id'),
                            locations.geom.label('location')).subquery()

    elevation = Elevation.rast.ST_Value(subq.c.location)

    # Prepare main query. ``!= None`` builds a SQL "IS NOT NULL" test and
    # must not be rewritten as ``is not None``.
    q = db.session.query(literal_column('location_id'), elevation.label('elevation')) \
                  .filter(and_(subq.c.location.ST_Intersects(Elevation.rast),
                               elevation != None)).all()

    fixes_copy = [list(fix) for fix in fixes]

    # Walk consecutive sample pairs; ``range`` replaces the Python-2-only
    # ``xrange`` that was used here.
    for before, after in zip(q, q[1:]):
        prev = before.location_id - 1
        current = after.location_id - 1
        first = prev * shortener

        for j in range(first, current * shortener):
            # NOTE(review): the slope is multiplied by the raw index offset
            # without dividing by the span length - confirm whether a
            # normalised linear interpolation was intended.
            elev = before.elevation + (after.elevation - before.elevation) * (j - first)
            fixes_copy[j][11] = elev

    return fixes_copy
开发者ID:bbonamin,项目名称:Skylines,代码行数:29,代码来源:flightpath.py


示例20: volume_type_destroy

def volume_type_destroy(context, name):
    """Soft-delete the volume type called ``name`` and its extra specs.

    The volume type is resolved by name inside the session transaction,
    then its own row and the extra-spec rows that reference its id are
    flagged as deleted.
    """
    session = get_session()
    with session.begin():
        volume_type_ref = volume_type_get_by_name(context, name,
                                                  session=session)
        volume_type_id = volume_type_ref['id']

        targets = (
            (models.VolumeTypes, {'id': volume_type_id}),
            (models.VolumeTypeExtraSpecs, {'volume_type_id': volume_type_id}),
        )
        for model, criteria in targets:
            session.query(model).filter_by(**criteria).update({
                'deleted': True,
                'deleted_at': timeutils.utcnow(),
                # Assign the column to itself so updated_at stays as it is.
                'updated_at': literal_column('updated_at'),
            })
开发者ID:CiscoSystems,项目名称:cinder-old,代码行数:16,代码来源:api.py



注:本文中的sqlalchemy.sql.expression.literal_column函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python expression.not_函数代码示例发布时间:2022-05-27
下一篇:
Python expression.literal函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap