• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python expression.cast函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.sql.expression.cast函数的典型用法代码示例。如果您正苦于以下问题:Python cast函数的具体用法?Python cast怎么用?Python cast使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了cast函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: birth_time

    def birth_time(cls):
        hour = cast(func.extract("hour", cls.birth_datetime), String)
        minute = cast(func.extract("minute", cls.birth_datetime), String)

        hour = case([(func.length(hour) == 1, "0" + hour)], else_=hour)
        minute = case([(func.length(minute) == 1, "0" + minute)], else_=minute)
        return hour + ":" + minute
开发者ID:PEDSnet,项目名称:pedsnetcdm_to_pcornetcdm,代码行数:7,代码来源:demographics.py


示例2: _calc

    def _calc():
        stats = {}
        if day_from is not None and day_to is not None:
            # fill in gaps
            for day in util.daterange(day_from,day_to):
                stats[day] = dict(total=0,done=0,help=0)

        # TODO: do the work in the database.
        q = session.query(cast(Article.pubdate,Date), Article)

        if day_from is not None:
            q = q.filter(cast(Article.pubdate, Date) >= day_from)
        if day_to is not None:
            q = q.filter(cast(Article.pubdate, Date) <= day_to)

        for day,art in q:
            if day not in stats:
                stats[day] = dict(total=0,done=0,help=0)
            stats[day]['total'] += 1
            if not art.needs_sourcing:
                stats[day]['done'] += 1

        stats = sorted([(day,row) for day,row in stats.iteritems()], key=lambda x: x[0], reverse=True )

        return [DailyStats(x[0], x[1]['total'], x[1]['done']) for x in stats]
开发者ID:Anghelmanuel,项目名称:unsourced,代码行数:25,代码来源:front.py


示例3: get

    def get(self, country_id):
        """
        Gather specified country from the database with its data
        country_id a non-zero, positive int
        return a json object representing the country
        """
        session = db.loadSession()

        assert type(country_id) == int

        # Make the sql query
        result = session.query(
            # What to select
            # outerjoin defaults to a LEFT outer join, NOT full outer join
            db.Country.id,
            db.Country.name,
            func.array_agg_cust(array([cast(db.Olympics.id, String), cast(db.Olympics.year, String), db.Olympics.season, db.City.name]))
            )\
            .select_from(db.Country)\
            .outerjoin(db.City)\
            .outerjoin(db.Olympics)\
            .filter(
                # What to filter by (where clause)
                db.Country.id==country_id)\
            .group_by(db.Country.id,
            db.Country.name)\
            .first() # Actually executes the query and returns a tuple
        
        session.close()
        
        keys = ('id', 'name', ('olympics-hosted', ('id', 'year', 'season', 'city')))

        country_dict = add_keys(keys, result)

        return jsonify(country_dict)
开发者ID:alsukurr,项目名称:cs373-idb,代码行数:35,代码来源:api.py


示例4: selectables

    def selectables(cls, bag, agg_spec):
        """ Create a list of statements from spec

        :type bag: mongosql.bag.ModelPropertyBags
        :rtype: list[sqlalchemy.sql.elements.ColumnElement]
        """
        # TODO: calculation expressions for selection: http://docs.mongodb.org/manual/meta/aggregation-quick-reference/
        selectables = []
        for comp_field, comp_expression in agg_spec.items():
            # Column reference
            if isinstance(comp_expression, basestring):
                selectables.append(bag.columns[comp_expression].label(comp_field))
                continue

            # Computed expression
            assert isinstance(comp_expression, dict), 'Aggregate: Expression should be either a column name, or an object'
            assert len(comp_expression) == 1, 'Aggregate: expression can only contain a single operator'
            operator, expression = comp_expression.popitem()

            # Expression statement
            if isinstance(expression, int) and operator == '$sum':
                # Special case for count
                expression_stmt = expression
            elif isinstance(expression, basestring):
                # Column name
                expression_stmt = bag.columns[expression]
                # Json column?
                if bag.columns.is_column_json(expression):
                    # PostgreSQL always returns text values from it, and for aggregation we usually need numbers :)
                    expression_stmt = cast(expression_stmt, Float)
            elif isinstance(expression, dict):
                # Boolean expression
                expression_stmt = MongoCriteria.statement(bag, expression)
                # Need to cast it to int
                expression_stmt = cast(expression_stmt, Integer)
            else:
                raise AssertionError('Aggregate: expression should be either a column name, or an object')

            # Operator
            if operator == '$max':
                comp_stmt = func.max(expression_stmt)
            elif operator == '$min':
                comp_stmt = func.min(expression_stmt)
            elif operator == '$avg':
                comp_stmt = func.avg(expression_stmt)
            elif operator == '$sum':
                if isinstance(expression_stmt, int):
                    # Special case for count
                    comp_stmt = func.count()
                    if expression_stmt != 1:
                        comp_stmt *= expression_stmt
                else:
                    comp_stmt = func.sum(expression_stmt)
            else:
                raise AssertionError('Aggregate: unsupported operator "{}"'.format(operator))

            # Append
            selectables.append(comp_stmt.label(comp_field))

        return selectables
开发者ID:dignio,项目名称:py-mongosql,代码行数:60,代码来源:statements.py


示例5: filtering

    def filtering(self):
        search_value = self.request_values.get('sSearch')
        condition = None


        def search(idx, col):
            tmp_column_name = col.column_name.split('.')
            for tmp_name in tmp_column_name:
                if tmp_column_name.index(tmp_name) == 0:
                    obj = getattr(self.sqla_object, tmp_name)
                    parent = self.sqla_object
                elif isinstance(obj.property, RelationshipProperty):
                    parent = obj.property.mapper.class_
                    obj = getattr(parent, tmp_name)
                if not hasattr(obj, 'property'):
                    sqla_obj = parent
                    column_name = tmp_name
                elif isinstance(obj.property, RelationshipProperty):
                    sqla_obj = obj.mapper.class_
                    column_name = tmp_name
                    if not column_name:
                        column_name = obj.property.table.primary_key.columns \
                            .values()[0].name
                else:
                    sqla_obj = parent
                    column_name = tmp_name
            return sqla_obj, column_name
        if search_value:
            search_value_list = str(search_value).split()
            for search_val in search_value_list:
                conditions = []
                for idx, col in enumerate(self.columns):
                    if self.request_values.get('bSearchable_%s' % idx) in (
                            True, 'true') and col.searchable:
                        sqla_obj, column_name = search(idx, col)
                        conditions.append(
                            cast(get_attr(sqla_obj, column_name), String).ilike('%%%s%%' % search_val))
                condition = or_(*conditions)
                if condition is not None:
                    self.query = self.query.filter(condition)
        conditions = []
        for idx, col in enumerate(self.columns):
            search_value2 = self.request_values.get('sSearch_%s' % idx)
            if search_value2:
                sqla_obj, column_name = search(idx, col)
                if col.search_like:
                    conditions.append(
                        cast(get_attr(sqla_obj, column_name), String).ilike('%%%s%%' % search_value2))
                else:
                    conditions.append(
                        cast(get_attr(sqla_obj, column_name), String).__eq__(search_value2))
                if condition is not None:
                    condition = and_(condition, and_(*conditions))
                else:
                    condition = and_(*conditions)
        if condition is not None:
            self.query = self.query.filter(condition)
            self.cardinality_filtered = self.query.count()
        else:
            self.cardinality_filtered = self.cardinality
开发者ID:alexbelich,项目名称:PromTal,代码行数:60,代码来源:datatables.py


示例6: get_clustered_locations

    def get_clustered_locations(location_column,
                                threshold_radius=1000, filter=None):
        """
        SELECT ST_Centroid(
            (ST_Dump(
                ST_Union(
                    ST_Buffer(
                        takeoff_location_wkt::geography, 1000
                    )::geometry
                )
            )
        ).geom) FROM flights WHERE pilot_id=31;
        """

        # Cast the takeoff_location_wkt column to Geography
        geography = cast(location_column, Geography)

        # Add a metric buffer zone around the locations
        buffer = cast(geography.ST_Buffer(threshold_radius), Geometry)

        # Join the locations into one MultiPolygon
        union = buffer.ST_Union()

        # Split the MultiPolygon into separate polygons
        dump = union.ST_Dump().geom

        # Calculate center points of each polygon
        locations = func.ST_Centroid(dump)

        query = db.session.query(locations.label('location'))

        if filter is not None:
            query = query.filter(filter)

        return [Location.from_wkb(row.location) for row in query]
开发者ID:TobiasLohner,项目名称:SkyLines,代码行数:35,代码来源:geo.py


示例7: lookup

    def lookup(self):
        q = request.params["q"]
        # Assume that the SQL library handles the SQL attack vectors

        person_query = meta.Session.query(
            Person.id, sa.func.concat(Person.fullname, " - ", Person.email_address).label("pretty")
        ).filter(
            sa.or_(Person.lastname.ilike(q + "%"), Person.fullname.ilike(q + "%"), Person.email_address.ilike(q + "%"))
        )

        personid_query = meta.Session.query(Person.id, cast(Person.id, sa.String).label("pretty")).filter(
            cast(Person.id, sa.String).like(q + "%")
        )

        boarding_query = meta.Session.query(FulfilmentGroup.person_id, FulfilmentGroup.code.label("pretty")).filter(
            FulfilmentGroup.code.ilike(q + "%")
        )

        badge_query = meta.Session.query(Fulfilment.person_id, Fulfilment.code.label("pretty")).filter(
            Fulfilment.code.ilike(q + "%")
        )

        union_query = person_query.union(personid_query, boarding_query, badge_query).order_by("pretty").limit(5)

        return dict(r=list(union_query.all()))
开发者ID:pfctdayelise,项目名称:zookeepr,代码行数:25,代码来源:checkin.py


示例8: get_info

 def get_info(cls, location):
     '''Returns a query object of mountain waves around the location'''
     return DBSession.query(cls) \
         .filter(func.ST_DWithin(
             cast(WKTElement(location.to_wkt(), srid=4326), Geography),
             cast(cls.location, Geography),
             5000))
开发者ID:dkm,项目名称:skylines,代码行数:7,代码来源:mountain_wave_project.py


示例9: by_location

 def by_location(cls, location):
     '''Returns a query object of mountain waves around the location'''
     return cls.query() \
         .filter(db.func.ST_DWithin(
             cast(location.make_point(), Geography),
             cast(cls.location, Geography),
             5000))
开发者ID:imclab,项目名称:skylines,代码行数:7,代码来源:mountain_wave_project.py


示例10: demographic_etl

def demographic_etl(config):
    # set up
    connection = get_connection(config)
    pedsnet_session = init_pedsnet(connection)
    init_pcornet(connection)

    # multiple aliases for pedsnet_pcornet_valueset_map
    # to allow the three named joins
    gender_value_map = aliased(ValueSetMap)
    ethnicity_value_map = aliased(ValueSetMap)
    race_value_map = aliased(ValueSetMap)

    # extract the data from the person table
    person = pedsnet_session.query(Person.person_id,
                                   Person.birth_date,
                                   Person.birth_time,
                                   coalesce(gender_value_map.target_concept, 'OT'),
                                   coalesce(ethnicity_value_map.target_concept, 'OT'),
                                   coalesce(race_value_map.target_concept, 'OT'),
                                   bindparam("biobank_flag", "N"),
                                   Person.gender_source_value,
                                   Person.ethnicity_source_value,
                                   Person.race_source_value,
                                   Person.site,
                                   bindparam("gender_identity", None),
                                   bindparam("raw_gender_identity", None),
                                   bindparam("sexual_orientation", None),
                                   bindparam("raw_sexual_orientation", None)
                                   ). \
        outerjoin(gender_value_map,
                  and_(gender_value_map.source_concept_class == 'Gender',
                       case([(and_(Person.gender_concept_id == None,
                                   gender_value_map.source_concept_id == None), True)],
                            else_=cast(Person.gender_concept_id, String(200)) ==
                                  gender_value_map.source_concept_id))). \
        outerjoin(ethnicity_value_map,
                  and_(ethnicity_value_map.source_concept_class == 'Hispanic',
                       case([(and_(Person.ethnicity_concept_id == None,
                                   ethnicity_value_map.source_concept_id == None), True)],
                            else_=cast(Person.ethnicity_concept_id, String(200)) ==
                                  ethnicity_value_map.source_concept_id))). \
        outerjoin(race_value_map,
                  and_(race_value_map.source_concept_class == 'Race',
                       case([(and_(Person.race_concept_id == None,
                                   race_value_map.source_concept_id == None), True)],
                            else_=cast(Person.race_concept_id, String(200)) ==
                                  race_value_map.source_concept_id))).all()

    # transform data to pcornet names and types
    # load to demographic table
    odo(person, Demographic.__table__,
        dshape='var * {patid: string, birth_date: date, birth_time: string, sex: string,'
               'hispanic: string, race: string, biobank_flag: string, raw_sex: string,'
               'raw_hispanic: string, raw_race:string, site: string, gender_identity: string,'
               'raw_gender_identity: string, sexual_orientation: string, raw_sexual_orientation: string}'
        )
    # close session

    pedsnet_session.close()
开发者ID:PEDSnet,项目名称:pedsnetcdm_to_pcornetcdm,代码行数:59,代码来源:demographicsETL.py


示例11: get_proxy_address

    def get_proxy_address(
        self,
        user_id,
        ip_address=None,
        best=4,
        conn_factor=0.2
    ):
        """Get a usable proxy address for audio resource of user by user_id.
        If there is no available server, None will be returned

        We sort the connection by

            user_rate - (have_conn*conn_factor) then
            res_rate - (have_conn*conn_factor)

        Which means, less user proxy will be selected first, also, if there
        is already a proxy connection there, they will have higher priority
        (introduced by the conn_factor).

        """
        from sqlalchemy.sql.expression import or_, and_, cast, case
        from sqlalchemy.types import Float

        Port = tables.Port
        Proxy = tables.Proxy
        ProxyConnection = tables.ProxyConnection

        # calculate the connection factor
        factor_case = case([
            (ProxyConnection.server_id, conn_factor)
        ], else_=0)

        # Cast the type to make sure the result will be float
        res_rate = (Proxy.resource_count / cast(Proxy.resource_limit, Float))
        res_rate -= factor_case

        user_rate = (Proxy.user_count / cast(Proxy.user_limit, Float))
        user_rate -= factor_case

        query = self.session \
            .query(Port) \
            .join((Proxy, Proxy.id == Port.server_id)) \
            .outerjoin((ProxyConnection,
                        and_(ProxyConnection.server_id == Proxy.id,
                             ProxyConnection.user_id == user_id))) \
            .order_by(user_rate) \
            .order_by(res_rate) \
            .filter(or_(Proxy.user_count < Proxy.user_limit,
                        Proxy.user_limit == 0)) \
            .filter(Proxy.alive) \
            .filter(Proxy.active) \
            .filter(Port.name == 'web')

        # find a random proxy
        ports = query.limit(best).all()
        if not ports:
            return None
        port = random.choice(ports)
        return port.address
开发者ID:scottietie,项目名称:nowin_core,代码行数:59,代码来源:radio.py


示例12: filtering

	def filtering(self):
		"""Construct the query, by adding filtering(LIKE) on all columns when the datatable's search box is used
		"""

		def resolve_column(column):
			tmp_name = column.data.split('.')
			obj = getattr(self.sqla_object, tmp_name[0], None)
			if obj is None:
				raise DataTablesException('Invalid column data: ' + tmp_name[0])
			if not hasattr(obj, "property"): # Ex: hybrid_property or property
				sqla_obj = self.sqla_object
				column_name = "".join(tmp_name[1:])
			elif isinstance(obj.property, RelationshipProperty): # Ex: ForeignKey
		 		# Ex: address.description
				sqla_obj = obj.mapper.class_
				column_name = "".join(tmp_name[1:])
				if not column_name:
					# Find first primary key
					column_name = obj.property.table.primary_key.columns.values()[0].name
			else: #-> ColumnProperty
				sqla_obj = self.sqla_object
				column_name = column.data
			return sqla_obj, column_name

		condition = None

		search_value = self.request_values.get('search[value]')
		if search_value != "":
			conditions = []
			for column in self.columns:
				# ignore null columns (javascript placeholder) or unsearchable
				if column.data != "" and column.searchable:
					sqla_obj, column_name = resolve_column(column)
					conditions.append(cast(get_attr(sqla_obj, column_name), String).ilike('%%%s%%' % search_value))
			condition = or_(*conditions)

		conditions = []
		for column in self.columns:
			# ignore null columns (javascript placeholder) or unsearchable
			if column.data != "" and column.search_value != "" and column.searchable:
				sqla_obj, column_name = resolve_column(column)

				#if col.search_like:
				#	conditions.append(cast(get_attr(sqla_obj, column_name), String).like(col.search_like % search_value2))
				#else:
				#	conditions.append(cast(get_attr(sqla_obj, column_name), String).__eq__(search_value2))
				conditions.append(cast(get_attr(sqla_obj, column_name), String).__eq__(column.search_value))

				if condition is not None:
					condition = and_(condition, and_(*conditions))
				else:
					condition= and_(*conditions)

		if condition is not None:
			self.query = self.query.filter(condition)
			# count after filtering
			self.cardinality_filtered = self.query.count()
		else:
			self.cardinality_filtered = self.cardinality
开发者ID:dacb,项目名称:viscount,代码行数:59,代码来源:datatables.py


示例13: by_location

    def by_location(cls, location):
        """Returns a query object of mountain waves around the location"""
        if not isinstance(location, Location):
            raise TypeError('Invalid `location` parameter.')

        return cls.query() \
            .filter(db.func.ST_DWithin(
                cast(location.make_point(), Geography),
                cast(cls.location, Geography),
                5000))
开发者ID:Turbo87,项目名称:skylines,代码行数:10,代码来源:mountain_wave_project.py


示例14: refine_with_user_area

def refine_with_user_area(query):
    """Takes a query and refines it with a spatial constraint
    based on user setting"""
    if 'lon' and 'lat' and 'radius' in session:
        return query.filter(ST_DWithin(
            cast(Task.location, Geography),
            cast(from_shape(Point(session["lon"], session["lat"])), Geography),
            session["radius"]))
    else:
        return query
开发者ID:emacsen,项目名称:maproulette,代码行数:10,代码来源:helpers.py


示例15: search

    def search(self, keywords):
        criteria = []

        for keyword in keywords_split(keywords):
            if keyword:
                keyword = '%{0}%'.format(keyword)
                criteria.append(cast(Article.title, Unicode).ilike(keyword))
                criteria.append(cast(Article.keywords, Unicode).ilike(keyword))
                
        return self.public().filter(db.or_(*criteria))
开发者ID:bingjin,项目名称:codingpy,代码行数:10,代码来源:models.py


示例16: filtering

    def filtering(self):
        """Construct the query, by adding filtering(LIKE) on all columns when the datatable's search box is used
        """
        search_value = self.request_values.get('sSearch')
        condition = None
        def search(idx, col):
            tmp_column_name = col.column_name.split('.')
            obj = getattr(self.sqla_object, tmp_column_name[0])
            if not hasattr(obj, "property"): # Ex: hybrid_property or property
                sqla_obj = self.sqla_object
                column_name = col.column_name
            elif isinstance(obj.property, RelationshipProperty): #Ex: ForeignKey
                # Ex: address.description
                sqla_obj = obj.mapper.class_
                column_name = "".join(tmp_column_name[1:])
                if not column_name:
                    # find first primary key
                    column_name = obj.property.table.primary_key.columns \
                        .values()[0].name
            else:
                sqla_obj = self.sqla_object
                column_name = col.column_name
            return sqla_obj, column_name

        if search_value:
            conditions = []
            for idx, col in enumerate(self.columns):
                if self.request_values.get('bSearchable_%s' % idx) in (
                        True, 'true'):
                    sqla_obj, column_name = search(idx, col)
                    conditions.append(cast(get_attr(sqla_obj, column_name), String).ilike('%%%s%%' % search_value))
            condition = or_(*conditions)
        conditions = []
        for idx, col in enumerate(self.columns):
            if self.request_values.get('sSearch_%s' % idx) in (True, 'true'):
                search_value2 = self.request_values.get('sSearch_%s' % idx)
                sqla_obj, column_name = search(idx, col)
                
                if col.search_like:
                    conditions.append(cast(get_attr(sqla_obj, column_name), String).like(col.search_like % search_value2))
                else:
                    conditions.append(cast(get_attr(sqla_obj, column_name), String).__eq__(search_value2))

                if condition is not None:
                    condition = and_(condition, and_(*conditions))
                else:
                    condition= and_(*conditions)

        if condition is not None:
            self.query = self.query.filter(condition)
            # count after filtering
            self.cardinality_filtered = self.query.count()
        else:
            self.cardinality_filtered = self.cardinality
开发者ID:tanyewei,项目名称:sqlalchemy-datatables,代码行数:54,代码来源:datatables.py


示例17: preprocess_value_and_column

    def preprocess_value_and_column(cls, column, value):
        value_array = is_array(value)

        # Coerce operand
        if column.is_array and value_array:
            value = cast(pg.array(value), pg.ARRAY(column.sql_col.type.item_type))
        if column.is_json:
            coerce_type = column.sql_col.type.coerce_compared_value('=', value)  # HACKY: use sqlalchemy type coercion
            column.sql_col = cast(column.sql_col, coerce_type)

        return column, value
开发者ID:kolypto,项目名称:py-mongosql,代码行数:11,代码来源:statements.py


示例18: birth_date

    def birth_date(cls):
        year = cast(cls.year_of_birth, String)
        month = cast(cls.month_of_birth, String)
        day = cast(cls.day_of_birth, String)

        month = case([(month == "", "01")],
                     else_=case([(func.length(month) == 1, "0" + month)], else_=month))
        day = case([(day == "", "01")],
                   else_=case([(func.length(day) == 1, "0" + day)], else_=day))

        return year + "-" + month + "-" + day
开发者ID:PEDSnet,项目名称:pedsnetcdm_to_pcornetcdm,代码行数:11,代码来源:demographics.py


示例19: all

    def all(self, *args, **kwargs):
        (_u, _p, _e, _l) = [getattr(self.db, options)._table for options in
                ('userfeatures', 'phonefunckey', 'extenumbers', 'linefeatures')]

        conds = [
                _l.c.iduserfeatures == _p.c.iduserfeatures,
                _p.c.typeextenumbers != None,
                _p.c.typevalextenumbers != None,
                _p.c.supervision == 1,
                _p.c.progfunckey == 1,
                cast(_p.c.typeextenumbers, VARCHAR(255)) == cast(_e.c.type, VARCHAR(255)),
                _p.c.typevalextenumbers != 'user',
                _p.c.typevalextenumbers == _e.c.typeval
        ]
        if 'context' in kwargs:
            conds.append(_l.c.context == kwargs['context'])

        q = select(
            [_p.c.iduserfeatures, _p.c.exten, _p.c.typeextenumbers,
             _p.c.typevalextenumbers, _p.c.typeextenumbersright,
             _p.c.typevalextenumbersright, _e.c.exten.label('leftexten')],

            and_(*conds)
        )

        """
        _l2 = alias(_l)

        conds = [
                _l.c.iduserfeatures      == _p.c.iduserfeatures, 
                _p.c.typeextenumbers     != None,
                _p.c.typevalextenumbers  != None,
                _p.c.supervision         == 1, 
                _p.c.progfunckey         == 1,
                cast(_p.c.typeextenumbers,VARCHAR(255)) == cast(_e.c.type,VARCHAR(255)),
                _p.c.typevalextenumbers  == 'user',
                _p.c.typevalextenumbers  == cast(_l2.c.iduserfeatures,VARCHAR(255)),
                _e.c.typeval             == cast(_l2.c.id,VARCHAR(255))
        ]
        if 'context' in kwargs:
            conds.append(_l.c.context == kwargs['context'])

        q2 = select(
            [_p.c.iduserfeatures, _p.c.exten, _p.c.typeextenumbers,
             _p.c.typevalextenumbers, _p.c.typeextenumbersright,
             _p.c.typevalextenumbersright, _e.c.exten.label('leftexten')],

            and_(*conds)
        )

        return self.execute(q1.union(q2)).fetchall()
        """
        return self.execute(q).fetchall()
开发者ID:Eyepea,项目名称:xivo-confgen,代码行数:53,代码来源:xivo_db.py


示例20: upgrade_severity_levels

def upgrade_severity_levels(session, severity_map):
    """
    Updates the potentially changed severities at the reports.
    """
    LOG.debug("Upgrading severity levels started...")

    # Create a sql query from the severity map.
    severity_map_q = union_all(*[
        select([cast(bindparam('checker_id' + str(i), str(checker_id))
                .label('checker_id'), sqlalchemy.String),
                cast(bindparam('severity' + str(i), Severity._NAMES_TO_VALUES[
                    severity_map[checker_id]])
               .label('severity'), sqlalchemy.Integer)])
        for i, checker_id in enumerate(severity_map)]) \
        .alias('new_severities')

    checker_ids = severity_map.keys()

    # Get checkers which has been changed.
    changed_checker_q = select([Report.checker_id, Report.severity]) \
        .group_by(Report.checker_id, Report.severity) \
        .where(Report.checker_id.in_(checker_ids)) \
        .except_(session.query(severity_map_q)).alias('changed_severites')

    changed_checkers = session.query(changed_checker_q.c.checker_id,
                                     changed_checker_q.c.severity)

    # Update severity levels of checkers.
    if changed_checkers:
        updated_checker_ids = set()
        for checker_id, severity_old in changed_checkers:
            severity_new = severity_map.get(checker_id, 'UNSPECIFIED')
            severity_id = Severity._NAMES_TO_VALUES[severity_new]

            LOG.info("Upgrading severity level of '%s' checker from %s to %s",
                     checker_id,
                     Severity._VALUES_TO_NAMES[severity_old],
                     severity_new)

            if checker_id in updated_checker_ids:
                continue

            session.query(Report) \
                .filter(Report.checker_id == checker_id) \
                .update({Report.severity: severity_id})

            updated_checker_ids.add(checker_id)

        session.commit()

    LOG.debug("Upgrading of severity levels finished...")
开发者ID:Ericsson,项目名称:codechecker,代码行数:51,代码来源:db_cleanup.py



注:本文中的sqlalchemy.sql.expression.cast函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python expression.column函数代码示例发布时间:2022-05-27
下一篇:
Python expression.case函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap