• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sqlalchemy.cast函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.cast函数的典型用法代码示例。如果您正苦于以下问题:Python cast函数的具体用法?Python cast怎么用?Python cast使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了cast函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: weekList

def weekList():
    if not redis.llen('rank:week'):
        rows = db_session.query(
            User.id,
            User.username,
            label('number', func.count(Investment.amount)),
            label('total_amount', func.sum(Investment.amount))
        ).filter(
            Investment.user_id == User.id,
            cast(Investment.added_at, Date) <= datetime.datetime.today(),
            cast(Investment.added_at, Date) >= datetime.datetime.today() -
            datetime.timedelta(weeks=1)
        ).group_by(User.id).order_by(
            func.sum(Investment.amount).desc()
        ).limit(15).all()

        rank_list = []

        for i in rows:
            i = dict(zip(i.keys(), i))
            data = {
                'id': i['id'],
                'username': i['username'],
                'total_amount': float(i['total_amount']),
                'number': i['number']
            }
            rank_list.append(data)
            redis.rpush('rank:week', json.dumps(data))
        redis.expire('rank:week', 3600)
    else:
        rank_list = [json.loads(i.decode()) for i in redis.lrange('rank:week', 0, redis.llen('rank:week'))]

    return rank_list
开发者ID:xxguo,项目名称:leopard,代码行数:33,代码来源:rank.py


示例2: apply

    def apply(self, query):
        if self.op == self.default_op and self.value1 is None:
            return query

        if self.op in (ops.between, ops.not_between):
            left = min(self.value1, self.value2)
            right = max(self.value1, self.value2)
            cond = self.sa_col.between(sa.cast(left, sa.Time), sa.cast(right, sa.Time))
            if self.op == ops.not_between:
                cond = ~cond
            return query.filter(cond)

        # Casting this because some SQLAlchemy dialects (MSSQL) convert the value to datetime
        # before binding.
        val = sa.cast(self.value1, sa.Time)

        if self.op == ops.eq:
            query = query.filter(self.sa_col == val)
        elif self.op == ops.not_eq:
            query = query.filter(self.sa_col != val)
        elif self.op == ops.less_than_equal:
            query = query.filter(self.sa_col <= val)
        elif self.op == ops.greater_than_equal:
            query = query.filter(self.sa_col >= val)
        else:
            query = super(TimeFilter, self).apply(query)
        return query
开发者ID:level12,项目名称:webgrid,代码行数:27,代码来源:filters.py


示例3: add_bucket_field_to_query

    def add_bucket_field_to_query(self, q, q_entities, field, field_entity):
        # Get min, max if not provided.
        field_min = 0
        field_max = 0

        if not field.has_key("min") or not field.has_key("max"):
            field_min, field_max = self.get_field_min_max(field)

            # Override calculated min/max if values were provided.
        if field.has_key("min"):
            field_min = field.get("min")
        if field.has_key("max"):
            field_max = field.get("max")

        num_buckets = field.get("num_buckets", 10)

        # Get bucket width.
        bucket_width = (field_max - field_min) / num_buckets

        # Get bucket field entities.
        # Bit of a trick here: we use field_max - bucket_width because normally last bucket gets all values >= field_max.
        # Here we use one less bucket, and then filter.  This essentially makes the last bucket include values <= field_max.
        bucket_entity = func.width_bucket(field_entity, field_min, field_max - bucket_width, num_buckets - 1)
        q = q.filter(field_entity <= field_max)
        bucket_label_entity = (
            cast(field_min + (bucket_entity - 1) * bucket_width, String)
            + " to "
            + cast(field_min + bucket_entity * bucket_width, String)
        )
        bucket_label_entity = bucket_label_entity.label(field["label"])

        q_entities.add(bucket_label_entity)
        return q, bucket_label_entity
开发者ID:adorsk-noaa,项目名称:SASI-Model.pre20120621,代码行数:33,代码来源:sa_dao.py


示例4: retrieve_simulation_latest_job

def retrieve_simulation_latest_job(uid, job_type=JOB_TYPE_COMPUTING):
    """Returns set of latest jobs for active simulations.

    :param str uid: Simulation UID.
    :param str job_type: Type of job.

    :returns: Job details.
    :rtype: list

    """
    j = types.Job
    s = types.Simulation
    qry = session.raw_query(
        s.id,                                           #0
        j.typeof,                                       #1
        j.execution_state,                              #2
        cast(j.is_compute_end, Integer),                #3
        cast(j.is_error, Integer),                      #4
        as_datetime_string(j.execution_start_date),     #5
        as_datetime_string(j.execution_end_date)        #6
        )

    qry = qry.join(j, s.uid == j.simulation_uid)
    qry = qry.order_by(j.execution_start_date.desc())

    qry = qry.filter(j.execution_start_date != None)
    qry = qry.filter(j.execution_state != None)
    qry = qry.filter(j.typeof == job_type)
    qry = qry.filter(s.uid == uid)

    return qry.first()
开发者ID:Prodiguer,项目名称:hermes-server,代码行数:31,代码来源:dao_monitoring_simulation.py


示例5: _repayments

    def _repayments(self):
        today_repayments = Plan.query.filter(
            cast(Plan.plan_time, Date) == date.today(),
            Plan.status == get_enum('PLAN_PENDING')
        ).order_by('plan_time desc').limit(10)

        today_repay_amount = db_session.query(
            func.sum(Plan.amount)
        ).filter(
            cast(Plan.plan_time, Date) == date.today(),
            Plan.status == get_enum('PLAN_PENDING')
        ).scalar()

        if not today_repay_amount:
            today_repay_amount = 0

        total_repay_amount = db_session.query(
            func.sum(Plan.amount)
        ).filter(Plan.status == get_enum('PLAN_PENDING')).scalar()

        if not total_repay_amount:
            total_repay_amount = 0

        app.jinja_env.globals['today_repay_amount'] = today_repay_amount
        app.jinja_env.globals['total_repay_amount'] = total_repay_amount
        app.jinja_env.globals['today_repayments'] = today_repayments
开发者ID:xxguo,项目名称:leopard,代码行数:26,代码来源:__init__.py


示例6: get_period_schedule

    def get_period_schedule(meetings, period, limit_min=None, limit_max=None):
        if period:
            if limit_max:
                meetings = meetings.filter(
                        cast(schedule.Agenda.endtime, Time) < limit_max)
            else:
                meetings = meetings.filter(
                        cast(schedule.Agenda.endtime, Time) < period.end)
            if limit_min:
                meetings = meetings.filter(
                        cast(schedule.Agenda.starttime, Time) > limit_min)
            else:
                meetings = meetings.filter(
                        cast(schedule.Agenda.starttime, Time) > period.begin)
        meetings = meetings.order_by(schedule.Agenda.starttime).all()

        try:
            first_patient_schedule_time = _get_first_meeting_time(meetings[0])
            last_patient_schedule_time = _get_last_meeting_time(meetings[-1])
        except IndexError:
            if not period:
                return ( (None, None), None, (None, None) )
            else:
                return ( (None, period.begin), None, (None, period.end) )
        if not period:  
            return ( (first_patient_schedule_time, None), meetings, 
                        (last_patient_schedule_time, None) 
                    )
        return ( _get_first_scheduled_time( period,
                                            first_patient_schedule_time),
                    meetings, 
                    _get_last_scheduled_time(period, 
                                            last_patient_schedule_time)
                )
开发者ID:fluxspir,项目名称:odontux,代码行数:34,代码来源:schedule.py


示例7: _get_event_with_enterqueue

def _get_event_with_enterqueue(session, start, end, match, event):
    start = start.strftime(_STR_TIME_FMT)
    end = end.strftime(_STR_TIME_FMT)

    enter_queues = (session
                    .query(QueueLog.callid,
                           cast(QueueLog.time, TIMESTAMP).label('time'))
                    .filter(and_(QueueLog.event == 'ENTERQUEUE',
                                 between(QueueLog.time, start, end))))

    enter_map = {}
    for enter_queue in enter_queues.all():
        enter_map[enter_queue.callid] = enter_queue.time

    if enter_map:
        res = (session
               .query(QueueLog.event,
                      QueueLog.queuename,
                      cast(QueueLog.time, TIMESTAMP).label('time'),
                      QueueLog.callid,
                      QueueLog.data3)
               .filter(and_(QueueLog.event == match,
                            QueueLog.callid.in_(enter_map))))

        for r in res.all():
            yield {
                'callid': r.callid,
                'queue_name': r.queuename,
                'time': enter_map[r.callid],
                'event': event,
                'talktime': 0,
                'waittime': int(r.data3) if r.data3 else 0
            }
开发者ID:jaunis,项目名称:xivo-dao,代码行数:33,代码来源:queue_log_dao.py


示例8: update_scores

    def update_scores(self, model, chunksize=10000):

        # update node scores
        waypoint_nodes = (
            self.session.query(
                Node,
                ST_X(cast(Node.loc, Geometry)),
                ST_Y(cast(Node.loc, Geometry)))
                .filter(Node.num_ways != 0)
                .order_by(func.random()))   # random order

        # process nodes in chunks for memory efficiency.
        # note: normalization of scores is done per chunk, which should be a
        # reasonable approximation to global normalization when the chunks are
        # large since the query specifies random ordering
        for chunk in _grouper(chunksize, waypoint_nodes):
            nodes, x, y = zip(*chunk)
            X = np.vstack((x, y)).T
            scores = model.score_samples(X)
            for node, score in zip(nodes, scores):
                node.score = score

        # update cumulative scores
        sq = (
            self.session.query(
                Waypoint.id.label('id'),
                func.sum(Node.score).over(
                    partition_by=Waypoint.way_id,
                    order_by=Waypoint.idx).label('cscore'))
                .join(Node)
                .subquery())

        (self.session.query(Waypoint)
             .filter(Waypoint.id == sq.c.id)
             .update({Waypoint.cscore: sq.c.cscore}))
开发者ID:mcwitt,项目名称:scenicstroll,代码行数:35,代码来源:route_db.py


示例9: core_individuals_stations

def core_individuals_stations(request):
    """ Get the stations of an identified individual. Parameter is : id (int)"""
    try:
        id = int(request.params["id"])
        # Query
        query = (
            select(
                [
                    cast(V_Individuals_LatLonDate.c.lat, Float),
                    cast(V_Individuals_LatLonDate.c.lon, Float),
                    V_Individuals_LatLonDate.c.date,
                ]
            )
            .where(V_Individuals_LatLonDate.c.ind_id == id)
            .order_by(desc(V_Individuals_LatLonDate.c.date))
        )
        # Create list of features from query result
        epoch = datetime.utcfromtimestamp(0)
        features = [
            {
                "type": "Feature",
                "properties": {"date": (date - epoch).total_seconds()},
                "geometry": {"type": "Point", "coordinates": [lon, lat]},
            }
            for lat, lon, date in reversed(DBSession.execute(query).fetchall())
        ]

        result = {"type": "FeatureCollection", "features": features}
        return result
    except:
        return []
开发者ID:NaturalSolutions,项目名称:ecoReleve-Server,代码行数:31,代码来源:individuals.py


示例10: query_recursive_tree

def query_recursive_tree():
    structure_tree = (
        DBSession.query(
            Structure.id,
            Structure.name,
            Structure.parent_id,
            cast(1, Integer()).label('depth'),
            array([cast(Structure.name, Text)]).label('name_path'),
            array([Structure.id]).label('path'),
        )
        .filter(Structure.condition_root_level())
        .cte(name='structure_tree', recursive=True)
    )
    st = aliased(structure_tree, name='st')
    s = aliased(Structure, name='s')
    structure_tree = structure_tree.union_all(
        DBSession.query(
            s.id, s.name, s.parent_id,
            (st.c.depth + 1).label('depth'),
            func.array_append(
                st.c.name_path, cast(s.name, Text)
            ).label('name_path'),
            func.array_append(st.c.path, s.id).label('path'),
        )
        .filter(s.parent_id == st.c.id)
    )
    return DBSession.query(structure_tree)
开发者ID:alishir,项目名称:tcr,代码行数:27,代码来源:structures.py


示例11: getactivitystatistic

def getactivitystatistic():
	"""get activity statistic information"""
	try:
		token = request.json['token']
		activityid = request.json['activityid']
		activity = getactivitybyid(activityid)
		u = getuserinformation(token)
		if u != None and activity != None:
			state = 'successful'
			reason = ''

			registeredTotal = activity.users.count()
			registeredToday = activity.users.filter(cast(models.attentactivity.timestamp, Date) == date.today()).count()
			likedTotal = activity.likeusers.count()
			likedToday = activity.likeusers.filter(cast(models.likeactivity.timestamp, Date) == date.today()).count()
			
			result = {
					  'activity':activity.title,
					  'registeredTotal':registeredTotal,
					  'registeredToday':registeredToday,
					  'likedTotal':likedTotal, 
					  'likedToday':likedToday, 
					 }

		else:
			state = 'fail'
			reason = 'invalid access'
			result = ''

	except Exception,e:
		print e
		state = 'fail'
		reason = 'exception'
		result = ''
开发者ID:iThinkSeu,项目名称:LoveinSEU,代码行数:34,代码来源:activityroute.py


示例12: applySearchParam

    def applySearchParam(self, query, searchParam):
        if searchParam.CustomerId:
            query = query.filter(Order.CustomerId == searchParam.CustomerId)
        if searchParam.CustomerName:
            query = query.filter(CustomerContactDetails.FirstName.like("%%%s" % searchParam.CustomerName))
        if searchParam.IpAddress:
            query = query.filter(Order.IpAddress == searchParam.IpAddress)

        if searchParam.FromDate and not searchParam.ToDate:
            query = query.filter(cast(Order.OrderDate, Date) >= searchParam.FromDate)
        if not searchParam.FromDate and searchParam.ToDate:
            query = query.filter(cast(Order.OrderDate, Date) <= searchParam.ToDate)
        if searchParam.FromDate and searchParam.ToDate:
            query = query.filter(
                cast(Order.OrderDate, Date) >= searchParam.FromDate, cast(Order.OrderDate, Date) <= searchParam.ToDate
            )

        if searchParam.MinAmount and not searchParam.MaxAmount:
            query = query.filter(Order.OrderAmount >= searchParam.MinAmount)
        if not searchParam.MinAmount and searchParam.MaxAmount:
            query = query.filter(Order.OrderAmount <= searchParam.MaxAmount)
        if searchParam.MinAmount and searchParam.MaxAmount:
            query = query.filter(Order.OrderAmount >= searchParam.MinAmount, Order.OrderAmount <= searchParam.MaxAmount)

        if searchParam.InvoiceStatus == "opened":
            query = query.filter(or_((Order.OrderAmount - Order.PaidAmount) > 0.5, Order.OrderAmount == 0))
        elif searchParam.InvoiceStatus == "closed":
            query = query.filter(Order.OrderAmount != 0, (Order.PaidAmount - Order.OrderAmount) > 0.001)
        elif searchParam.InvoiceStatus == "overdue":
            query = query.filter((Order.OrderAmount - Order.PaidAmount) > 0.5, Order.DueDate < func.now())
        return query
开发者ID:cackharot,项目名称:viper-pos,代码行数:31,代码来源:ReportService.py


示例13: get_stats

    def get_stats(cls, date_from=None, date_to=None, limit=100):
        """
        Return overall stats/summary or stats for a given date range if given

        - date_to is inclusive
            - if just date_from is given, only records for that date are returned
        """

        query = db.query(cast(Visit.timestamp, Date),
                         func.count(Visit.id))

        if date_from:
            date_from, date_to = process_date_range(date_from, date_to)
            query = query.filter(Visit.timestamp >= date_from,
                                 Visit.timestamp < date_to)

        query = query.group_by(cast(Visit.timestamp, Date))
        query = query.order_by(cast(Visit.timestamp, Date))

        if limit:
            query = query.limit(limit)

        stats = query.all()

        return stats
开发者ID:kashifpk,项目名称:blogs_compulife,代码行数:25,代码来源:models.py


示例14: get_repeating

    def get_repeating(self, resource, except_applications, start_date, end_date, statuses):
        query = current_app.db_session.query(RepeatingSlot)
        objects = query.filter(Application.resource == resource,
                               RepeatingSlot.application_id == Application.id,
                               Application.status.in_(statuses))

        if except_applications:
            objects = objects.filter(
                ~RepeatingSlot.application_id.in_(except_applications)
            )

        objects = objects.filter(
            or_(
                tuple_(
                    RepeatingSlot.start_date, RepeatingSlot.end_date
                ).op('overlaps')(
                    tuple_(
                        cast(start_date, Date), cast(end_date, Date)
                    )
                ),
                or_(
                    # First range ends on the start date of the second
                    RepeatingSlot.end_date == cast(start_date, Date),
                    # Second range ends on the start date of the first
                    cast(end_date, Date) == RepeatingSlot.start_date
                )
            )
        )

        return objects.all()
开发者ID:Trondheim-kommune,项目名称:Bookingbasen,代码行数:30,代码来源:WeeklyRepeatingSlotsForResource.py


示例15: _add_ordering

def _add_ordering(sql_query, table, column_type, column_name, order):
    # Special case for this column, which sorts contigs correctly:
    if column_name == 'contig':
        get_contig_num = cast(
            text("SUBSTRING({} FROM '\d+')".format(table.c.contig)),
            type_=Integer)
        starts_with_chr = (text("SUBSTRING({} FROM '^chr(\d+)')"
                                .format(table.c.contig)) != literal(''))
        starts_with_number = (text("SUBSTRING({} FROM '^\d+')"
                                   .format(table.c.contig)) != literal(''))
        # 10000 used here to mean "should be at the end of all the numbers",
        # assuming we never hit a chromosome number >= 10000.
        contig_num_col = case(
            [(starts_with_chr, get_contig_num),
             (starts_with_number, get_contig_num)],
            else_=literal(10000)
        )
        contig_len_col = func.length(table.c.contig)
        contig_col = table.c.contig
        if order == 'desc':
            contig_len_col = desc(contig_len_col)
            contig_col = desc(contig_col)
        return sql_query.order_by(contig_num_col, contig_len_col, contig_col)
    sqla_type = vcf_type_to_sqla_type(column_type)
    column = cast(table.c[column_name], type_=sqla_type)
    column = {'asc': asc(column), 'desc': desc(column)}.get(order)
    return sql_query.order_by(column)
开发者ID:hammerlab,项目名称:cycledash,代码行数:27,代码来源:genotypes.py


示例16: get_requested_slots

    def get_requested_slots(self, resource, except_applications, start_date, end_date):
        query = current_app.db_session.query(RepeatingSlotRequest)

        objects = query.filter(
            RepeatingSlotRequest.application.has(resource_id=resource.id)
        )

        objects = objects.filter(
            RepeatingSlotRequest.application.has(status="Pending")
        )

        if except_applications:
            objects = objects.filter(
                ~RepeatingSlotRequest.application_id.in_(except_applications)
            )

        objects = objects.filter(
            or_(
                tuple_(
                    RepeatingSlotRequest.start_date, RepeatingSlotRequest.end_date
                ).op('overlaps')(
                    tuple_(
                        cast(start_date, Date), cast(end_date, Date)
                    )
                ),
                or_(
                    # First range ends on the start date of the second
                    RepeatingSlotRequest.end_date == cast(start_date, Date),
                    # Second range ends on the start date of the first
                    cast(end_date, Date) == RepeatingSlotRequest.start_date
                )
            )
        )

        return objects.all()
开发者ID:Trondheim-kommune,项目名称:Bookingbasen,代码行数:35,代码来源:WeeklyRepeatingSlotsForResource.py


示例17: incr

    def incr(self, category, ids, time=0, delta=1):
        self.delete_if_expired(category, ids)

        expiration = expiration_from_time(time)

        rp = self.table.update(sa.and_(self.table.c.category==category,
                                       self.table.c.ids==ids,
                                       self.table.c.kind=='num'),
                               values = {
                                         self.table.c.value:
                                         sa.cast(
                                                 sa.cast(self.table.c.value,
                                                          sa.Integer) + delta,
                                                 sa.String),
                                         self.table.c.expiration: expiration
                                         }
                               ).execute()
        if rp.rowcount == 1:
            return self.get(category, ids)
        elif rp.rowcount == 0:
            existing_value = self.get(category, ids)
            if existing_value is None:
                raise ValueError("[%s][%s] can't be incr()ed -- it's not set" %
                                 (category, ids))
            else:
                raise ValueError("[%s][%s] has non-integer value %r" %
                                 (category, ids, existing_value))
        else:
            raise ValueError("Somehow %d rows got updated" % rp.rowcount)
开发者ID:JediWatchman,项目名称:reddit,代码行数:29,代码来源:hardcachebackend.py


示例18: _log_deposit_action

 def _log_deposit_action(deposit, user, cashbox: str, final_amount: float, action: str):
     # Without these casts, the strings will end up as type 'unknown' in postgres, where the function lookup will fail due to incorrect type signature
     username = cast(user.username, String)
     action_string = cast(action, String)
     deposit_name = cast(deposit.name, String)
     proc = procs.exam_deposit_action(cash_box_ids[cashbox], action_string, final_amount, username, deposit_name)
     sqla.session.execute(proc)
开发者ID:Kha,项目名称:odie-server,代码行数:7,代码来源:accounting.py


示例19: search_route

def search_route():
    query = request.args.get('q')
    per_page = request.args.get('per_page', 10, type=int)
    page = request.args.get('page', 0, type=int)
    if query is None:
        return json.dumps({'results':[]})
    result = {}
    with session_scope() as session:
        results = session.query(
            City.name,
            City.country,
            func.ST_Y(cast(City.location, Geometry())),
            func.ST_X(cast(City.location, Geometry())),
            City.id
            ) \
            .filter(unaccent(City.name).ilike(unaccent('%' + query + '%'))) \
            .limit(per_page + 1) \
            .offset(page * per_page) \
            .all()

        more = len(results) == per_page + 1
        results = results[:per_page]
        result = json.dumps({
            'results':
            [{'id': c[4], 'text': '{}, {}'.format(c[0], c[1]), 'coords': (c[2], c[3])} for i,c in enumerate(results)],
            'more': more})
    return result
开发者ID:simlmx,项目名称:meteomap,代码行数:27,代码来源:site.py


示例20: filter_single_by_time

    def filter_single_by_time(cls, type, objects, year=None, week_number=None, day=None):

        assert (week_number and year) or day
        start_date, end_date = None, None

        if year and week_number:
            start_date, end_date = get_start_and_end_date_from_week_and_year(
                year,
                week_number
            )
        if day:
            start_date = day
            end_date = day

        objects = objects.filter(
            or_(
                tuple_(
                    cast(type.start_time, Date), cast(type.end_time, Date)
                ).op('overlaps')(
                    tuple_(
                        start_date, end_date
                    )
                ),
                or_(
                    # First range ends on the start date of the second
                    cast(type.end_time, Date) == start_date,
                    # Second range ends on the start date of the first
                    end_date == cast(type.start_time, Date)
                )
            )
        )

        return objects
开发者ID:Trondheim-kommune,项目名称:Bookingbasen,代码行数:33,代码来源:SlotsForWeekResource.py



注:本文中的sqlalchemy.cast函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sqlalchemy.column函数代码示例发布时间:2022-05-27
下一篇:
Python sqlalchemy.case函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap