• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sqlalchemy.type_coerce函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.type_coerce函数的典型用法代码示例。如果您正苦于以下问题:Python type_coerce函数的具体用法?Python type_coerce怎么用?Python type_coerce使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了type_coerce函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: query

    def query(self, req):
        Family = sa.orm.aliased(Language, flat=True)
        Father = sa.orm.aliased(Language, flat=True)
        _Country = sa.orm.aliased(Country, name='_country')

        query = req.db.query(
                Languoid.id, Family.id.label('family_id'), Father.id.label('parent_id'),
                Languoid.name,
                Languoid.bookkeeping,
                sa.type_coerce(Languoid.level, sa.Text).label('level'),
                sa.type_coerce(Languoid.status, sa.Text).label('status'),
                Languoid.latitude, Languoid.longitude,
                sa.select([Identifier.name])
                    .where(LanguageIdentifier.identifier_pk == Identifier.pk)
                    .where(LanguageIdentifier.language_pk == Language.pk)
                    .where(Identifier.type == 'iso639-3')
                    .label('iso639P3code'),
                Languoid.description, Languoid.markup_description,
                Languoid.child_family_count, Languoid.child_language_count, Languoid.child_dialect_count,
                sa.select([sa.literal_column("string_agg(_country.id, ' ' ORDER BY _country.id)")])
                    .where(Languoidcountry.country_pk == _Country.pk)
                    .where(Languoidcountry.languoid_pk == Languoid.pk)
                    .label('country_ids'),
            ).select_from(Languoid).filter(Languoid.active)\
            .outerjoin(Family, Family.pk == Languoid.family_pk)\
            .outerjoin(Father, Father.pk == Languoid.father_pk)\
            .order_by(Languoid.id)
        return query
开发者ID:clld,项目名称:glottolog3,代码行数:28,代码来源:adapters.py


示例2: _query_citing_records

 def _query_citing_records(self, show_duplicates=False):
     """Returns records which cites this one."""
     index_ref = self._get_index_ref()
     if not index_ref:
         raise Exception("There is no index_ref for this object")
     citation_query = RecordMetadata.query.with_entities(RecordMetadata.id,
                                                         RecordMetadata.json['control_number'])
     citation_filter = referenced_records(RecordMetadata.json).contains([index_ref])
     filter_deleted_records = or_(not_(type_coerce(RecordMetadata.json, JSONB).has_key('deleted')),  # noqa: W601
                                  not_(RecordMetadata.json['deleted'] == cast(True, JSONB)))
     only_literature_collection = type_coerce(RecordMetadata.json, JSONB)['_collections'].contains(['Literature'])
     filter_superseded_records = or_(
         not_(type_coerce(RecordMetadata.json, JSONB).has_key('related_records')),  # noqa: W601
         not_(type_coerce(RecordMetadata.json, JSONB)['related_records'].contains([{'relation': 'successor'}]))
     )
     citations = citation_query.filter(citation_filter,
                                       filter_deleted_records,
                                       filter_superseded_records,
                                       only_literature_collection)
     if not show_duplicates:
         # It just hides duplicates, and still can show citations
         # which do not have proper PID in PID store
         # Duplicated data should be removed with the CLI command
         citations = citations.distinct(RecordMetadata.json['control_number'])
     return citations
开发者ID:inspirehep,项目名称:inspire-next,代码行数:25,代码来源:api.py


示例3: get_query_records_to_index

def get_query_records_to_index(pid_types):
    """
    Return a query for retrieving all non deleted records by pid_type

    Args:
        pid_types(List[str]): a list of pid types

    Return:
        SQLAlchemy query for non deleted record with pid type in `pid_types`
    """
    query = (
        db.session.query(PersistentIdentifier.object_uuid).join(RecordMetadata, type_coerce(PersistentIdentifier.object_uuid, String) == type_coerce(RecordMetadata.id, String))
        .filter(
            PersistentIdentifier.pid_type.in_(pid_types),
            PersistentIdentifier.object_type == 'rec',
            PersistentIdentifier.status == PIDStatus.REGISTERED,
            or_(
                not_(
                    type_coerce(RecordMetadata.json, JSONB).has_key('deleted')
                ),
                RecordMetadata.json["deleted"] == cast(False, JSONB)
            )
            # noqa: F401
        )
    )
    return query
开发者ID:harunurhan,项目名称:inspire-next,代码行数:26,代码来源:cli.py


示例4: find_by_holding

    def find_by_holding(cls, **kwargs):
        """Find item versions based on their holdings information.

        Every given kwarg will be queried as a key-value pair in the items
        holding.

        :returns: List[(UUID, version_id)] with `version_id` as used by
                  `RecordMetadata.version_id`.
        """
        def _get_filter_clause(obj, key, value):
            val = obj[key].astext
            CASTS = {
                bool: lambda x: cast(x, BOOLEAN),
                int: lambda x: cast(x, INTEGER),
                datetime.date: lambda x: cast(x, DATE),
            }
            if (not isinstance(value, six.string_types) and
                    isinstance(value, collections.Sequence)):
                if len(value) == 2:
                    return CASTS[type(value[0])](val).between(*value)
                raise ValueError('Too few/many values for a range query. '
                                 'Range query requires two values.')
            return CASTS.get(type(value), lambda x: x)(val) == value

        RecordMetadataVersion = version_class(RecordMetadata)

        data = type_coerce(RecordMetadataVersion.json, JSONB)
        path = ('_circulation', 'holdings')

        subquery = db.session.query(
            RecordMetadataVersion.id.label('id'),
            RecordMetadataVersion.version_id.label('version_id'),
            func.json_array_elements(data[path]).label('obj')
        ).subquery()

        obj = type_coerce(subquery.c.obj, JSONB)

        query = db.session.query(
            RecordMetadataVersion.id,
            RecordMetadataVersion.version_id
        ).filter(
            RecordMetadataVersion.id == subquery.c.id,
            RecordMetadataVersion.version_id == subquery.c.version_id,
            *(_get_filter_clause(obj, k, v) for k, v in kwargs.items())
        )

        for result in query:
            yield result
开发者ID:tiborsimko,项目名称:invenio-circulation,代码行数:48,代码来源:api.py


示例5: test_nested_type_trans

    def test_nested_type_trans(self):
        customer = self.tables.customer
        order = self.tables.order
        item = self.tables.item

        class SpecialType(TypeDecorator):
            impl = Integer

            def process_result_value(self, value, dialect):
                return str(value) + "_processed"

        sub_sub_stmt = nested(select([type_coerce(item.c.price, SpecialType)]).\
                                    where(item.c.order_id ==
                                            order.c.id)).label('i')
        sub_stmt = nested(select([sub_sub_stmt]).where(order.c.customer_id ==
                                            customer.c.id)).label('o')
        stmt = select([sub_stmt]).where(customer.c.id == 1)
        r = config.db.execute(stmt)
        row = r.fetchone()
        sub_result = row['o']
        sub_sub_result = sub_result.fetchone()['i']
        eq_(
            list(sub_sub_result),
            [('9.99_processed',), ('19.99_processed',)]
        )
开发者ID:computerstaat,项目名称:sql-layer-adapter-sqlalchemy,代码行数:25,代码来源:test_nested_cursor.py


示例6: get_master_calibration_image

def get_master_calibration_image(image, calibration_type, master_selection_criteria,
                                 use_only_older_calibrations=False, db_address=_DEFAULT_DB):
    calibration_criteria = CalibrationImage.type == calibration_type.upper()
    calibration_criteria &= CalibrationImage.instrument_id == image.instrument.id
    calibration_criteria &= CalibrationImage.is_master.is_(True)

    for criterion in master_selection_criteria:
        # We have to cast to strings according to the sqlalchemy docs for version 1.3:
        # https://docs.sqlalchemy.org/en/latest/core/type_basics.html?highlight=json#sqlalchemy.types.JSON
        calibration_criteria &= cast(CalibrationImage.attributes[criterion], String) ==\
                                type_coerce(getattr(image, criterion), JSON)

    # During real-time reduction, we want to avoid using different master calibrations for the same block,
    # therefore we make sure the the calibration frame used was created before the block start time
    if use_only_older_calibrations and image.block_start is not None:
        calibration_criteria &= CalibrationImage.datecreated < image.block_start

    with get_session(db_address=db_address) as db_session:
        calibration_images = db_session.query(CalibrationImage).filter(calibration_criteria).all()

    # Exit if no calibration file found
    if len(calibration_images) == 0:
        return None

    # Find the closest date
    date_deltas = np.abs(np.array([i.dateobs - image.dateobs for i in calibration_images]))
    closest_calibration_image = calibration_images[np.argmin(date_deltas)]
    calibration_file = os.path.join(closest_calibration_image.filepath, closest_calibration_image.filename)

    if abs(min(date_deltas)) > datetime.timedelta(days=30):
        msg = "The closest calibration file in the database was created more than 30 days before or after " \
              "the image being reduced."
        logger.warning(msg, image=image, extra_tags={'master_calibration': os.path.basename(calibration_file)})

    return calibration_file
开发者ID:LCOGT,项目名称:banzai,代码行数:35,代码来源:dbs.py


示例7: get_literature_recids_for_orcid

def get_literature_recids_for_orcid(orcid):
    """Return the Literature recids that were claimed by an ORCiD.

    We record the fact that the Author record X has claimed the Literature
    record Y by storing in Y an author object with a ``$ref`` pointing to X
    and the key ``curated_relation`` set to ``True``. Therefore this method
    first searches the DB for the Author records for the one containing the
    given ORCiD, and then uses its recid to search in ES for the Literature
    records that satisfy the above property.

    Args:
        orcid (str): the ORCiD.

    Return:
        list(int): the recids of the Literature records that were claimed
        by that ORCiD.

    """
    orcid_object = '[{"schema": "ORCID", "value": "%s"}]' % orcid
    # this first query is written in a way that can use the index on (json -> ids)
    author_rec_uuid = db.session.query(RecordMetadata.id)\
        .filter(type_coerce(RecordMetadata.json, JSONB)['ids'].contains(orcid_object)).one().id
    author_recid = db.session.query(PersistentIdentifier.pid_value).filter(
        PersistentIdentifier.object_type == 'rec',
        PersistentIdentifier.object_uuid == author_rec_uuid,
        PersistentIdentifier.pid_type == 'aut',
    ).one().pid_value

    query = Q('match', authors__curated_relation=True) & Q('match', authors__recid=author_recid)
    search_by_curated_author = LiteratureSearch().query('nested', path='authors', query=query)\
                                                 .params(_source=['control_number'], size=9999)

    return [el['control_number'] for el in search_by_curated_author]
开发者ID:harunurhan,项目名称:inspire-next,代码行数:33,代码来源:utils.py


示例8: bind_expression

 def bind_expression(self, bindvalue):
     # convert the bind's type from PGPString to
     # String, so that it's passed to psycopg2 as is without
     # a dbapi.Binary wrapper
     # raise Exception("asdf")
     bindvalue = type_coerce(bindvalue, String)
     return func.tsrange(bindvalue)
开发者ID:aurynn,项目名称:openstack-artifice,代码行数:7,代码来源:usage.py


示例9: distance_to

 def distance_to(cls, other_catchment):
     return type_coerce(
         1e-6 * ((Descriptors.centroid_ngr_x - other_catchment.descriptors.centroid_ngr_x) *
                 (Descriptors.centroid_ngr_x - other_catchment.descriptors.centroid_ngr_x) +
                 (Descriptors.centroid_ngr_y - other_catchment.descriptors.centroid_ngr_y) *
                 (Descriptors.centroid_ngr_y - other_catchment.descriptors.centroid_ngr_y)),
         Float())
开发者ID:xuexianwu,项目名称:floodestimation,代码行数:7,代码来源:entities.py


示例10: test_limit_preserves_typing_information

    def test_limit_preserves_typing_information(self):
        class MyType(TypeDecorator):
            impl = Integer

        stmt = select([type_coerce(column("x"), MyType).label("foo")]).limit(1)
        dialect = oracle.dialect()
        compiled = stmt.compile(dialect=dialect)
        assert isinstance(compiled._create_result_map()["foo"][-1], MyType)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:8,代码来源:test_compiler.py


示例11: get_courses

def get_courses(order_by='course_rating', order_direction=DESCENDING, limit=100, offset=0, **kwargs):
    numeric_columns = { 'course_rating', 'instructor_rating', 'workload' }
    all_columns = keys | numeric_columns | { 'grade' }

    exact_keys = { key for key in keys if key in kwargs and kwargs[key] != AVERAGE }
    group_keys = keys - kwargs.keys()

    order_by_name = order_by if (order_by in all_columns and kwargs.get(order_by, '') != AVERAGE) else 'course_rating'

    query = select(
            [courses.columns[key] for key in group_keys] +
            [type_coerce(func.avg(courses.columns[key]), Float).label(key) for key in numeric_columns] +
            [type_coerce(func.avg(case(
                { 'A': 4.0, 'B': 3.0, 'C': 2.0, 'NR': 0.0 },
                value=courses.columns.grade,
                else_=0.0,
            )), Float).label('grade')]
        ).where(
            and_(*[courses.columns[key] == kwargs[key] for key in exact_keys])
        ).group_by(
            *[courses.columns[key] for key in group_keys]
        ).order_by(
            desc(order_by_name) if order_direction == DESCENDING else asc(order_by_name)
        ).limit(min(100, max(1, limit))).offset(max(0, offset))

    results = query.execute().fetchall()

    dict_result = []
    for result in results:
        item = dict(result.items())
        for key in exact_keys:
            item[key] = kwargs[key]
        grade = item['grade']
        if grade >= 3.5:
            grade = 'A'
        elif grade >= 2.5:
            grade = 'B'
        elif grade >= 1.5:
            grade = 'C'
        else:
            grade = 'NR'
        item['grade'] = grade
        dict_result.append(item)

    return dict_result
    return query
开发者ID:benmcmorran,项目名称:coursery,代码行数:46,代码来源:database.py


示例12: select_list

def select_list(userid, form):
    # Find the unique violation types and the number of reporters. This will be
    # joined against the Report model to get the violations/reporters for each
    # selected report.
    subq = (
        ReportComment.dbsession.query(
            ReportComment.reportid,
            sa.func.count(),
            sa.type_coerce(
                sa.func.array_agg(ReportComment.violation.distinct()),
                ARRAY(sa.Integer, as_tuple=True)).label('violations'))
        .filter(ReportComment.violation != 0)
        .group_by(ReportComment.reportid)
        .subquery())

    # Find reports, joining against the aforementioned subquery, and eager-load
    # the reports' owners.
    q = (
        Report.dbsession.query(Report, subq)
        .options(joinedload(Report.owner))
        .join(subq, Report.reportid == subq.c.reportid)
        .reset_joinpoint())

    # For each type of report, eagerly load the content reported and the
    # content's owner. Also, keep track of the Login model aliases used for each
    # report type so they can be filtered against later.
    login_aliases = []
    for column_name in _report_types:
        login_alias = aliased(Login)
        login_aliases.append(login_alias)
        q = (
            q
            .outerjoin(getattr(Report, column_name))
            .outerjoin(login_alias)
            .options(contains_eager(column_name + '.owner', alias=login_alias))
            .reset_joinpoint())

    # Filter by report status. form.status can also be 'all', in which case no
    # filter is applied.
    if form.status == 'closed':
        q = q.filter_by(is_closed=True)
    elif form.status == 'open':
        q = q.filter_by(is_closed=False)

    # If filtering by the report's content's owner, iterate over the previously
    # collected Login model aliases to compare against Login.login_name.
    if form.submitter:
        submitter = legacy.login_name(form.submitter)
        q = q.filter(sa.or_(l.login_name == submitter for l in login_aliases))

    # If filtering by violation type, see if the violation is in the array
    # aggregate of unique violations for this report.
    if form.violation and form.violation != '-1':
        q = q.filter(sa.literal(int(form.violation)) == sa.func.any(subq.c.violations))

    q = q.order_by(Report.opened_at.desc())
    return [(report, report_count, map(_convert_violation, violations))
            for report, _, report_count, violations in q.all()]
开发者ID:Syfaro,项目名称:weasyl,代码行数:58,代码来源:report.py


示例13: test_crit_against_int_coerce_type

    def test_crit_against_int_coerce_type(self):
        name = self.tables.data_table.c.name
        col = self.tables.data_table.c['data']

        self._test_index_criteria(
            and_(name == 'r6', cast(col["a"], String) == type_coerce(5, JSON)),
            "r6",
            test_literal=False
        )
开发者ID:eoghanmurray,项目名称:sqlalchemy,代码行数:9,代码来源:test_types.py


示例14: in_

 def in_(self, other):
     if isinstance(other, collections.Iterable):
         ret = []
         for elem in other:
             if isinstance(elem, EnumSymbol):
                 elem = type_coerce(elem, DeclEnum)
             ret.append(elem)
         other = ret
     return types.Enum.Comparator.in_(self, other)
开发者ID:unikmhz,项目名称:npui,代码行数:9,代码来源:fields.py


示例15: apply

 def apply(self, q, cuts):
     """ Apply a set of filters, which can be given as a set of tuples in
     the form (ref, operator, value), or as a string in query form. If it
     is ``None``, no filter will be applied. """
     info = []
     for (ref, operator, value) in self.parse(cuts):
         info.append({'ref': ref, 'operator': operator, 'value': value})
         table, column = self.cube.model[ref].bind(self.cube)
         q = self.ensure_table(q, table)
         q = q.where(column == type_coerce(value, column.type))
     return info, q
开发者ID:COLABORATI,项目名称:babbage,代码行数:11,代码来源:cuts.py


示例16: delete_records_without_control_number

def delete_records_without_control_number():
    """
    Find all record without a control number and delete them.
    """
    # Find all records without control_number.
    records = RecordMetadata.query.filter(not_(
        type_coerce(RecordMetadata.json, JSONB).has_key(
            'control_number'))).all()

    for record in records:
        _delete_record(record)
开发者ID:harunurhan,项目名称:inspire-next,代码行数:11,代码来源:delete_records.py


示例17: get_bpm_filename

def get_bpm_filename(instrument_id, ccdsum, db_address=_DEFAULT_DB):
    with get_session(db_address=db_address) as db_session:
        criteria = (CalibrationImage.type == 'BPM', CalibrationImage.instrument_id == instrument_id,
                    cast(CalibrationImage.attributes['ccdsum'], String) == type_coerce(ccdsum, JSON))
        bpm_query = db_session.query(CalibrationImage).filter(*criteria)
        bpm = bpm_query.order_by(desc(CalibrationImage.dateobs)).first()

        if bpm is not None:
            bpm_path = os.path.join(bpm.filepath, bpm.filename)
        else:
            bpm_path = None
    return bpm_path
开发者ID:LCOGT,项目名称:banzai,代码行数:12,代码来源:dbs.py


示例18: test_ambiguous_column_by_col_plus_label

    def test_ambiguous_column_by_col_plus_label(self):
        users = self.tables.users

        users.insert().execute(user_id=1, user_name='john')
        result = select(
            [users.c.user_id,
                type_coerce(users.c.user_id, Integer).label('foo')]).execute()
        row = result.first()
        eq_(
            row[users.c.user_id], 1
        )
        eq_(
            row[1], 1
        )
开发者ID:mattastica,项目名称:sqlalchemy,代码行数:14,代码来源:test_resultset.py


示例19: list_codes

def list_codes():
    if request_wants_json():
        with transaction():
            result = db.session.query(
                Code.id, Code.value,
                type_coerce(Code.user_id, db.Boolean)
            ).select_from(Code).outerjoin(User)
            return jsonify(rows=[{
                'id': code_id,
                'code': value,
                'requested': requested,
            } for code_id, value, requested in result])
    else:
        return render_template('list_codes.html')
开发者ID:sebschrader,项目名称:bluemix-promocodes,代码行数:14,代码来源:__init__.py


示例20: setup_mappers

    def setup_mappers(cls):
        mapper(cls.classes.Person, cls.tables.person, properties=dict(
            pets=relationship(
                cls.classes.Pet, primaryjoin=(
                    orm.foreign(cls.tables.pets.c.person_id) ==
                    sa.cast(
                        sa.type_coerce(cls.tables.person.c.id, Integer),
                        Integer
                    )
                )
            )
        ))

        mapper(cls.classes.Pet, cls.tables.pets)
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:14,代码来源:test_lazy_relations.py



注:本文中的sqlalchemy.type_coerce函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sqlalchemy.union函数代码示例发布时间:2022-05-27
下一篇:
Python sqlalchemy.tuple_函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap