本文整理汇总了Python中sqlalchemy.type_coerce函数的典型用法代码示例。如果您正苦于以下问题:Python type_coerce函数的具体用法?Python type_coerce怎么用?Python type_coerce使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了type_coerce函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: query
def query(self, req):
    """Build the export query over all active languoids, ordered by id.

    Each row carries the languoid's id, the ids of its family and father
    (both outer-joined, so they may be NULL), name, bookkeeping flag, level
    and status coerced to ``Text``, coordinates, the ISO 639-3 identifier
    name, descriptions, child counts and a space-separated list of country
    ids.

    :param req: request object; ``req.db`` is used to create the query.
    :returns: the un-executed query object.
    """
    family = sa.orm.aliased(Language, flat=True)
    father = sa.orm.aliased(Language, flat=True)
    # The alias must be named ``_country``: the raw ``string_agg`` literal
    # below refers to the table by exactly that name.
    country = sa.orm.aliased(Country, name='_country')

    # Correlated scalar subquery: name of the languoid's iso639-3 identifier.
    iso_code = (
        sa.select([Identifier.name])
        .where(LanguageIdentifier.identifier_pk == Identifier.pk)
        .where(LanguageIdentifier.language_pk == Language.pk)
        .where(Identifier.type == 'iso639-3')
        .label('iso639P3code')
    )
    # Correlated scalar subquery: country ids joined by a single space.
    country_ids = (
        sa.select([sa.literal_column("string_agg(_country.id, ' ' ORDER BY _country.id)")])
        .where(Languoidcountry.country_pk == country.pk)
        .where(Languoidcountry.languoid_pk == Languoid.pk)
        .label('country_ids')
    )

    columns = [
        Languoid.id,
        family.id.label('family_id'),
        father.id.label('parent_id'),
        Languoid.name,
        Languoid.bookkeeping,
        sa.type_coerce(Languoid.level, sa.Text).label('level'),
        sa.type_coerce(Languoid.status, sa.Text).label('status'),
        Languoid.latitude,
        Languoid.longitude,
        iso_code,
        Languoid.description,
        Languoid.markup_description,
        Languoid.child_family_count,
        Languoid.child_language_count,
        Languoid.child_dialect_count,
        country_ids,
    ]

    result = req.db.query(*columns).select_from(Languoid).filter(Languoid.active)
    result = result.outerjoin(family, family.pk == Languoid.family_pk)
    result = result.outerjoin(father, father.pk == Languoid.father_pk)
    return result.order_by(Languoid.id)
开发者ID:clld,项目名称:glottolog3,代码行数:28,代码来源:adapters.py
示例2: _query_citing_records
def _query_citing_records(self, show_duplicates=False):
    """Build the query for the records that cite this one.

    Only records in the ``Literature`` collection that are neither deleted
    nor superseded (no ``successor`` relation) are considered.

    Args:
        show_duplicates (bool): when false, rows are made distinct on the
            record's ``control_number``.

    Returns:
        A query yielding ``(RecordMetadata.id, control_number)`` rows.

    Raises:
        Exception: if this object has no index reference.
    """
    index_ref = self._get_index_ref()
    if not index_ref:
        raise Exception("There is no index_ref for this object")

    json_doc = type_coerce(RecordMetadata.json, JSONB)

    cites_this_record = referenced_records(RecordMetadata.json).contains([index_ref])
    not_deleted = or_(
        not_(json_doc.has_key('deleted')),  # noqa: W601
        not_(RecordMetadata.json['deleted'] == cast(True, JSONB)),
    )
    not_superseded = or_(
        not_(json_doc.has_key('related_records')),  # noqa: W601
        not_(json_doc['related_records'].contains([{'relation': 'successor'}])),
    )
    in_literature = json_doc['_collections'].contains(['Literature'])

    citations = RecordMetadata.query.with_entities(
        RecordMetadata.id,
        RecordMetadata.json['control_number'],
    ).filter(
        cites_this_record,
        not_deleted,
        not_superseded,
        in_literature,
    )

    if show_duplicates:
        return citations

    # DISTINCT only hides duplicates; citations lacking a proper PID in the
    # PID store can still show up. Duplicated data itself has to be removed
    # with the CLI command.
    return citations.distinct(RecordMetadata.json['control_number'])
开发者ID:inspirehep,项目名称:inspire-next,代码行数:25,代码来源:api.py
示例3: get_query_records_to_index
def get_query_records_to_index(pid_types):
    """Return a query selecting the UUIDs of all non-deleted records.

    A record qualifies when its persistent identifier is registered, has
    object type ``rec`` and a pid type listed in ``pid_types``, and when its
    JSON either has no ``deleted`` key or has it set to ``false``.

    Args:
        pid_types (List[str]): the pid types to include.

    Return:
        SQLAlchemy query yielding ``PersistentIdentifier.object_uuid``.
    """
    # Both sides are coerced to String so the two id columns compare directly.
    same_record = (
        type_coerce(PersistentIdentifier.object_uuid, String)
        == type_coerce(RecordMetadata.id, String)
    )
    not_deleted = or_(
        not_(type_coerce(RecordMetadata.json, JSONB).has_key('deleted')),  # noqa: W601
        RecordMetadata.json["deleted"] == cast(False, JSONB),
    )

    return (
        db.session.query(PersistentIdentifier.object_uuid)
        .join(RecordMetadata, same_record)
        .filter(
            PersistentIdentifier.pid_type.in_(pid_types),
            PersistentIdentifier.object_type == 'rec',
            PersistentIdentifier.status == PIDStatus.REGISTERED,
            not_deleted,
        )
    )
开发者ID:harunurhan,项目名称:inspire-next,代码行数:26,代码来源:cli.py
示例4: find_by_holding
def find_by_holding(cls, **kwargs):
    """Find item versions based on their holdings information.

    Every keyword argument is matched as a key/value pair against each
    element of the ``_circulation.holdings`` JSON array of every record
    version. A scalar value is compared for equality; a non-string sequence
    of exactly two values is treated as the bounds of a SQL ``BETWEEN``.
    ``bool``, ``int`` and ``datetime.date`` values make the JSON text be cast
    to the matching SQL type before comparison.

    :returns: generator yielding ``(id, version_id)`` rows, with
        ``version_id`` as used by ``RecordMetadata.version_id``.
    :raises ValueError: when a sequence value does not hold exactly two
        items (raised once the generator is first iterated).
    :raises KeyError: when the bounds of a range are of a type that has no
        registered cast.
    """
    # ``collections.Sequence`` was removed in Python 3.10; the ABCs live in
    # ``collections.abc``. The fallback keeps Python 2 (``six``) working.
    try:
        from collections.abc import Sequence
    except ImportError:  # pragma: no cover - Python 2
        from collections import Sequence

    casts = {
        bool: lambda x: cast(x, BOOLEAN),
        int: lambda x: cast(x, INTEGER),
        datetime.date: lambda x: cast(x, DATE),
    }

    def _get_filter_clause(obj, key, value):
        """Translate one ``key=value`` pair into a filter expression."""
        val = obj[key].astext

        is_range = (not isinstance(value, six.string_types) and
                    isinstance(value, Sequence))
        if is_range:
            if len(value) == 2:
                # The type of the lower bound decides the cast.
                return casts[type(value[0])](val).between(*value)
            raise ValueError('Too few/many values for a range query. '
                             'Range query requires two values.')

        # Unknown value types are compared against the raw text.
        return casts.get(type(value), lambda x: x)(val) == value

    RecordMetadataVersion = version_class(RecordMetadata)

    data = type_coerce(RecordMetadataVersion.json, JSONB)
    path = ('_circulation', 'holdings')

    # One row per (record version, holding element).
    subquery = db.session.query(
        RecordMetadataVersion.id.label('id'),
        RecordMetadataVersion.version_id.label('version_id'),
        func.json_array_elements(data[path]).label('obj')
    ).subquery()

    obj = type_coerce(subquery.c.obj, JSONB)

    query = db.session.query(
        RecordMetadataVersion.id,
        RecordMetadataVersion.version_id
    ).filter(
        RecordMetadataVersion.id == subquery.c.id,
        RecordMetadataVersion.version_id == subquery.c.version_id,
        *(_get_filter_clause(obj, k, v) for k, v in kwargs.items())
    )

    for result in query:
        yield result
开发者ID:tiborsimko,项目名称:invenio-circulation,代码行数:48,代码来源:api.py
示例5: test_nested_type_trans
def test_nested_type_trans(self):
    """Result processing of a coerced type is applied inside nested cursors.

    Item prices are selected through two levels of ``nested`` statements and
    must come back with ``SpecialType.process_result_value`` applied.
    """
    tables = self.tables
    customer = tables.customer
    order = tables.order
    item = tables.item

    class SpecialType(TypeDecorator):
        impl = Integer

        def process_result_value(self, value, dialect):
            return str(value) + "_processed"

    item_prices = select([type_coerce(item.c.price, SpecialType)]).where(
        item.c.order_id == order.c.id)
    sub_sub_stmt = nested(item_prices).label('i')

    customer_orders = select([sub_sub_stmt]).where(
        order.c.customer_id == customer.c.id)
    sub_stmt = nested(customer_orders).label('o')

    stmt = select([sub_stmt]).where(customer.c.id == 1)

    row = config.db.execute(stmt).fetchone()
    sub_sub_result = row['o'].fetchone()['i']

    expected = [('9.99_processed',), ('19.99_processed',)]
    eq_(list(sub_sub_result), expected)
开发者ID:computerstaat,项目名称:sql-layer-adapter-sqlalchemy,代码行数:25,代码来源:test_nested_cursor.py
示例6: get_master_calibration_image
def get_master_calibration_image(image, calibration_type, master_selection_criteria,
                                 use_only_older_calibrations=False, db_address=_DEFAULT_DB):
    """Return the path of the master calibration closest in time to ``image``.

    Candidates are master ``CalibrationImage`` rows of the requested type
    (upper-cased) for the image's instrument whose stored attributes equal
    the image's attributes for every name in ``master_selection_criteria``.
    Among them, the one with the smallest absolute ``dateobs`` difference
    to the image is chosen.

    :param image: image being reduced; read for ``instrument.id``,
        ``block_start``, ``dateobs`` and each selection attribute.
    :param calibration_type: calibration type name.
    :param master_selection_criteria: iterable of attribute names to match.
    :param use_only_older_calibrations: if true and ``image.block_start`` is
        set, only calibrations created before the block start are used.
    :param db_address: database address passed to ``get_session``.
    :returns: joined ``filepath``/``filename`` of the chosen calibration, or
        ``None`` when nothing matches.
    """
    criteria = CalibrationImage.type == calibration_type.upper()
    criteria &= CalibrationImage.instrument_id == image.instrument.id
    criteria &= CalibrationImage.is_master.is_(True)

    for attribute_name in master_selection_criteria:
        # We have to cast to strings according to the sqlalchemy docs for version 1.3:
        # https://docs.sqlalchemy.org/en/latest/core/type_basics.html?highlight=json#sqlalchemy.types.JSON
        stored_value = cast(CalibrationImage.attributes[attribute_name], String)
        wanted_value = type_coerce(getattr(image, attribute_name), JSON)
        criteria &= stored_value == wanted_value

    # During real-time reduction we must not use different master
    # calibrations within the same block, so only frames created before the
    # block start time are allowed.
    if use_only_older_calibrations and image.block_start is not None:
        criteria &= CalibrationImage.datecreated < image.block_start

    with get_session(db_address=db_address) as db_session:
        candidates = db_session.query(CalibrationImage).filter(criteria).all()

    # Nothing matched: there is no calibration file to return.
    if not candidates:
        return None

    # Pick the candidate whose observation date is nearest to the image's.
    date_deltas = np.abs(np.array([candidate.dateobs - image.dateobs for candidate in candidates]))
    nearest = candidates[np.argmin(date_deltas)]
    calibration_file = os.path.join(nearest.filepath, nearest.filename)

    month = datetime.timedelta(days=30)
    if abs(min(date_deltas)) > month:
        msg = ("The closest calibration file in the database was created more than 30 days before or after "
               "the image being reduced.")
        logger.warning(msg, image=image, extra_tags={'master_calibration': os.path.basename(calibration_file)})

    return calibration_file
开发者ID:LCOGT,项目名称:banzai,代码行数:35,代码来源:dbs.py
示例7: get_literature_recids_for_orcid
def get_literature_recids_for_orcid(orcid):
    """Return the Literature recids that were claimed by an ORCiD.

    A claim of Literature record Y by Author record X is stored in Y as an
    author object whose ``$ref`` points to X and whose ``curated_relation``
    is ``True``. So the Author record containing the given ORCiD is looked
    up in the DB first, and its recid is then used to search ES for the
    Literature records with that property.

    Args:
        orcid (str): the ORCiD.

    Return:
        list(int): the recids of the Literature records that were claimed
        by that ORCiD.
    """
    orcid_object = '[{"schema": "ORCID", "value": "%s"}]' % orcid

    # Written as a containment test on (json -> ids) so the index on that
    # expression can be used.
    author_ids = type_coerce(RecordMetadata.json, JSONB)['ids']
    author_row = db.session.query(RecordMetadata.id).filter(
        author_ids.contains(orcid_object)
    ).one()
    author_rec_uuid = author_row.id

    pid_row = db.session.query(PersistentIdentifier.pid_value).filter(
        PersistentIdentifier.object_type == 'rec',
        PersistentIdentifier.object_uuid == author_rec_uuid,
        PersistentIdentifier.pid_type == 'aut',
    ).one()
    author_recid = pid_row.pid_value

    is_curated = Q('match', authors__curated_relation=True)
    is_this_author = Q('match', authors__recid=author_recid)
    search_by_curated_author = LiteratureSearch().query(
        'nested', path='authors', query=is_curated & is_this_author
    ).params(_source=['control_number'], size=9999)

    return [hit['control_number'] for hit in search_by_curated_author]
开发者ID:harunurhan,项目名称:inspire-next,代码行数:33,代码来源:utils.py
示例8: bind_expression
def bind_expression(self, bindvalue):
    """Wrap the bound parameter in a SQL ``tsrange(...)`` call.

    :param bindvalue: the bind parameter being rendered.
    :returns: ``tsrange(<bindvalue>)`` with the bind re-typed as ``String``.
    """
    # Re-type the bind as a plain String so it is handed to psycopg2 as is,
    # without a dbapi.Binary wrapper.
    # NOTE(review): the original comment named the source type "PGPString";
    # that looks copied from another type - confirm.
    as_string = type_coerce(bindvalue, String)
    return func.tsrange(as_string)
开发者ID:aurynn,项目名称:openstack-artifice,代码行数:7,代码来源:usage.py
示例9: distance_to
def distance_to(cls, other_catchment):
    """Return a SQL expression for the scaled squared centroid distance.

    The expression is ``1e-6 * (dx * dx + dy * dy)`` typed as ``Float``,
    where ``dx``/``dy`` are the differences between the ``Descriptors``
    centroid columns and ``other_catchment``'s centroid coordinates. No
    square root is taken.

    :param other_catchment: object exposing
        ``descriptors.centroid_ngr_x`` / ``descriptors.centroid_ngr_y``.
    """
    # NOTE(review): the 1e-6 factor presumably converts m^2 to km^2 - confirm.
    target = other_catchment.descriptors
    squared_x = ((Descriptors.centroid_ngr_x - target.centroid_ngr_x) *
                 (Descriptors.centroid_ngr_x - target.centroid_ngr_x))
    squared_y = ((Descriptors.centroid_ngr_y - target.centroid_ngr_y) *
                 (Descriptors.centroid_ngr_y - target.centroid_ngr_y))
    return type_coerce(1e-6 * (squared_x + squared_y), Float())
开发者ID:xuexianwu,项目名称:floodestimation,代码行数:7,代码来源:entities.py
示例10: test_limit_preserves_typing_information
def test_limit_preserves_typing_information(self):
    """A coerced, labelled column keeps its type in the result map under LIMIT.

    The statement is compiled with the Oracle dialect and the result-map
    entry for the label must still carry ``MyType``.
    """
    class MyType(TypeDecorator):
        impl = Integer

    labelled = type_coerce(column("x"), MyType).label("foo")
    limited = select([labelled]).limit(1)

    compiled = limited.compile(dialect=oracle.dialect())

    result_type = compiled._create_result_map()["foo"][-1]
    assert isinstance(result_type, MyType)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:8,代码来源:test_compiler.py
示例11: get_courses
def _letter_grade(average):
    """Map an averaged grade-point value to its letter ('A', 'B', 'C' or 'NR')."""
    if average >= 3.5:
        return 'A'
    if average >= 2.5:
        return 'B'
    if average >= 1.5:
        return 'C'
    return 'NR'


def get_courses(order_by='course_rating', order_direction=DESCENDING, limit=100, offset=0, **kwargs):
    """Return averaged course statistics as a list of dicts.

    Keyword arguments name columns from the module-level ``keys`` set:

    * a key given with a concrete value filters rows on that value;
    * a key given as ``AVERAGE`` is neither filtered nor grouped, i.e. the
      statistics are averaged across it;
    * a key not given at all becomes a GROUP BY column.

    ``course_rating``, ``instructor_rating`` and ``workload`` are averaged
    per group; ``grade`` is averaged as grade points (A=4, B=3, C=2, NR and
    anything else=0) and mapped back to a letter in the returned dicts.

    :param order_by: column to sort on; falls back to ``'course_rating'``
        when the name is unknown or that column was requested as ``AVERAGE``.
    :param order_direction: ``DESCENDING`` sorts descending, anything else
        ascending.
    :param limit: page size, clamped to the range 1..100.
    :param offset: rows to skip, clamped to be non-negative.
    :returns: list of dicts, one per result row, with the exact-match keys
        filled in from ``kwargs``.
    """
    numeric_columns = {'course_rating', 'instructor_rating', 'workload'}
    all_columns = keys | numeric_columns | {'grade'}
    exact_keys = {key for key in keys if key in kwargs and kwargs[key] != AVERAGE}
    group_keys = keys - kwargs.keys()

    if order_by in all_columns and kwargs.get(order_by, '') != AVERAGE:
        order_by_name = order_by
    else:
        order_by_name = 'course_rating'

    # Letter grades are averaged through their grade-point equivalents.
    grade_points = case(
        {'A': 4.0, 'B': 3.0, 'C': 2.0, 'NR': 0.0},
        value=courses.columns.grade,
        else_=0.0,
    )
    group_columns = [courses.columns[key] for key in group_keys]
    averaged_columns = [
        type_coerce(func.avg(courses.columns[key]), Float).label(key)
        for key in numeric_columns
    ]
    averaged_columns.append(type_coerce(func.avg(grade_points), Float).label('grade'))

    direction = desc if order_direction == DESCENDING else asc
    query = (
        select(group_columns + averaged_columns)
        .where(and_(*[courses.columns[key] == kwargs[key] for key in exact_keys]))
        .group_by(*group_columns)
        .order_by(direction(order_by_name))
        .limit(min(100, max(1, limit)))
        .offset(max(0, offset))
    )

    dict_result = []
    for row in query.execute().fetchall():
        item = dict(row.items())
        # Exact-match keys are not selected, so copy them from the request.
        for key in exact_keys:
            item[key] = kwargs[key]
        item['grade'] = _letter_grade(item['grade'])
        dict_result.append(item)
    return dict_result
开发者ID:benmcmorran,项目名称:coursery,代码行数:46,代码来源:database.py
示例12: select_list
def select_list(userid, form):
    """List reports with their reporter count and violations, newest first.

    :param userid: not used by this function.
    :param form: filter form; ``status`` ('closed', 'open' or anything else
        for no status filter), ``submitter`` and ``violation`` are read.
    :returns: list of ``(report, report_count, violations)`` tuples where
        ``violations`` is ``map(_convert_violation, ...)`` over the report's
        distinct violation values.
    """
    # Per report: the number of report comments and the distinct non-zero
    # violation types. Joined against Report below.
    distinct_violations = sa.type_coerce(
        sa.func.array_agg(ReportComment.violation.distinct()),
        ARRAY(sa.Integer, as_tuple=True),
    ).label('violations')
    subq = (
        ReportComment.dbsession.query(
            ReportComment.reportid,
            sa.func.count(),
            distinct_violations)
        .filter(ReportComment.violation != 0)
        .group_by(ReportComment.reportid)
        .subquery())

    # Reports joined to the aggregate above, with their owners eager-loaded.
    q = Report.dbsession.query(Report, subq)
    q = q.options(joinedload(Report.owner))
    q = q.join(subq, Report.reportid == subq.c.reportid)
    q = q.reset_joinpoint()

    # For every report type, eager-load the reported content and its owner.
    # The Login alias used for each type is remembered so the submitter
    # filter below can compare against all of them.
    login_aliases = []
    for column_name in _report_types:
        login_alias = aliased(Login)
        login_aliases.append(login_alias)
        q = q.outerjoin(getattr(Report, column_name))
        q = q.outerjoin(login_alias)
        q = q.options(contains_eager(column_name + '.owner', alias=login_alias))
        q = q.reset_joinpoint()

    # Status filter; any other status (e.g. 'all') applies no filter.
    closed_by_status = {'closed': True, 'open': False}
    if form.status in closed_by_status:
        q = q.filter_by(is_closed=closed_by_status[form.status])

    # Content-owner filter: match the normalised name against every alias.
    if form.submitter:
        submitter = legacy.login_name(form.submitter)
        q = q.filter(sa.or_(alias.login_name == submitter for alias in login_aliases))

    # Violation filter: the value must appear in the report's aggregated
    # array of distinct violations.
    if form.violation and form.violation != '-1':
        wanted = sa.literal(int(form.violation))
        q = q.filter(wanted == sa.func.any(subq.c.violations))

    q = q.order_by(Report.opened_at.desc())

    listing = []
    for report, _, report_count, violations in q.all():
        listing.append((report, report_count, map(_convert_violation, violations)))
    return listing
开发者ID:Syfaro,项目名称:weasyl,代码行数:58,代码来源:report.py
示例13: test_crit_against_int_coerce_type
def test_crit_against_int_coerce_type(self):
    """Index criteria comparing a string-cast JSON element to a coerced int.

    Row ``r6`` must be found when ``data["a"]`` cast to ``String`` equals
    the integer 5 coerced to ``JSON``.
    """
    data_table = self.tables.data_table
    name = data_table.c.name
    col = data_table.c['data']

    json_matches = cast(col["a"], String) == type_coerce(5, JSON)
    criteria = and_(name == 'r6', json_matches)

    self._test_index_criteria(criteria, "r6", test_literal=False)
开发者ID:eoghanmurray,项目名称:sqlalchemy,代码行数:9,代码来源:test_types.py
示例14: in_
def in_(self, other):
    """Build an ``IN`` comparison, coercing enum symbols to ``DeclEnum``.

    When ``other`` is iterable, every ``EnumSymbol`` element is wrapped with
    ``type_coerce(..., DeclEnum)``; other elements are passed through
    unchanged. The result is delegated to ``types.Enum.Comparator.in_``.

    :param other: the candidate values (or any other operand accepted by
        the base comparator).
    """
    # ``collections.Iterable`` was removed in Python 3.10; the ABCs live in
    # ``collections.abc``. The fallback covers interpreters without it.
    try:
        from collections.abc import Iterable
    except ImportError:  # pragma: no cover - Python 2
        from collections import Iterable

    if isinstance(other, Iterable):
        other = [
            type_coerce(elem, DeclEnum) if isinstance(elem, EnumSymbol) else elem
            for elem in other
        ]
    return types.Enum.Comparator.in_(self, other)
开发者ID:unikmhz,项目名称:npui,代码行数:9,代码来源:fields.py
示例15: apply
def apply(self, q, cuts):
    """Apply a set of cuts (filters) to the query ``q``.

    ``cuts`` may be a set of ``(ref, operator, value)`` tuples or a string
    in query form; ``None`` applies no filter. Each parsed cut adds an
    equality condition between the referenced column and ``value`` coerced
    to the column's type. The operator is reported back but the condition
    built here is always ``==``.

    :returns: ``(info, q)`` - a list of ``{'ref', 'operator', 'value'}``
        dicts describing the cuts, and the filtered query.
    """
    info = []
    for ref, operator, value in self.parse(cuts):
        info.append(dict(ref=ref, operator=operator, value=value))
        table, column = self.cube.model[ref].bind(self.cube)
        matches_value = column == type_coerce(value, column.type)
        q = self.ensure_table(q, table).where(matches_value)
    return info, q
开发者ID:COLABORATI,项目名称:babbage,代码行数:11,代码来源:cuts.py
示例16: delete_records_without_control_number
def delete_records_without_control_number():
    """Delete every record whose JSON has no ``control_number`` key.

    All matching ``RecordMetadata`` rows are loaded first and then passed,
    one by one, to ``_delete_record``.
    """
    has_control_number = type_coerce(RecordMetadata.json, JSONB).has_key('control_number')  # noqa: W601
    orphans = RecordMetadata.query.filter(not_(has_control_number)).all()

    for orphan in orphans:
        _delete_record(orphan)
开发者ID:harunurhan,项目名称:inspire-next,代码行数:11,代码来源:delete_records.py
示例17: get_bpm_filename
def get_bpm_filename(instrument_id, ccdsum, db_address=_DEFAULT_DB):
    """Return the path of the most recent BPM calibration for an instrument.

    :param instrument_id: id of the instrument the BPM must belong to.
    :param ccdsum: value the stored ``ccdsum`` attribute must equal.
    :param db_address: database address passed to ``get_session``.
    :returns: joined ``filepath``/``filename`` of the matching
        ``CalibrationImage`` with the latest ``dateobs``, or ``None`` when
        there is no match.
    """
    with get_session(db_address=db_address) as db_session:
        newest_bpm = (
            db_session.query(CalibrationImage)
            .filter(
                CalibrationImage.type == 'BPM',
                CalibrationImage.instrument_id == instrument_id,
                cast(CalibrationImage.attributes['ccdsum'], String) == type_coerce(ccdsum, JSON),
            )
            .order_by(desc(CalibrationImage.dateobs))
            .first()
        )

        if newest_bpm is None:
            bpm_path = None
        else:
            bpm_path = os.path.join(newest_bpm.filepath, newest_bpm.filename)
    return bpm_path
开发者ID:LCOGT,项目名称:banzai,代码行数:12,代码来源:dbs.py
示例18: test_ambiguous_column_by_col_plus_label
def test_ambiguous_column_by_col_plus_label(self):
    """A column selected plainly and again as a coerced label stays addressable.

    The row must be readable both by the original column object and by
    position for the labelled copy.
    """
    users = self.tables.users
    users.insert().execute(user_id=1, user_name='john')

    labelled_copy = type_coerce(users.c.user_id, Integer).label('foo')
    row = select([users.c.user_id, labelled_copy]).execute().first()

    eq_(row[users.c.user_id], 1)
    eq_(row[1], 1)
开发者ID:mattastica,项目名称:sqlalchemy,代码行数:14,代码来源:test_resultset.py
示例19: list_codes
def list_codes():
    """Serve the list of codes as JSON or as the HTML page.

    When the request wants JSON, every ``Code`` (outer-joined to ``User``)
    is returned as ``{'id', 'code', 'requested'}`` under the ``rows`` key,
    where ``requested`` is ``Code.user_id`` coerced to a Boolean. Otherwise
    the ``list_codes.html`` template is rendered.
    """
    if not request_wants_json():
        return render_template('list_codes.html')

    with transaction():
        requested = type_coerce(Code.user_id, db.Boolean)
        result = (
            db.session.query(Code.id, Code.value, requested)
            .select_from(Code)
            .outerjoin(User)
        )
        rows = [
            dict(id=code_id, code=value, requested=was_requested)
            for code_id, value, was_requested in result
        ]
        return jsonify(rows=rows)
开发者ID:sebschrader,项目名称:bluemix-promocodes,代码行数:14,代码来源:__init__.py
示例20: setup_mappers
def setup_mappers(cls):
    """Map ``Person`` and ``Pet``, linking them through ``Person.pets``.

    The relationship uses an explicit ``primaryjoin`` in which the person id
    is wrapped in ``type_coerce`` and then ``cast`` to ``Integer``, while
    ``pets.person_id`` is marked as the foreign side.
    """
    person_table = cls.tables.person
    pets_table = cls.tables.pets

    person_id = sa.cast(sa.type_coerce(person_table.c.id, Integer), Integer)
    pets = relationship(
        cls.classes.Pet,
        primaryjoin=(orm.foreign(pets_table.c.person_id) == person_id),
    )

    mapper(cls.classes.Person, person_table, properties={'pets': pets})
    mapper(cls.classes.Pet, pets_table)
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:14,代码来源:test_lazy_relations.py
注:本文中的sqlalchemy.type_coerce函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论