本文整理汇总了Python中sqlalchemy.orm.query.Query类的典型用法代码示例。如果您正苦于以下问题:Python Query类的具体用法?Python Query怎么用?Python Query使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Query类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, entities, session, **kw):
SaQuery.__init__(self, entities, session, **kw)
ent_cls = entities[0]
if isinstance(ent_cls, type) and issubclass(ent_cls, Entity):
self._entity_class = ent_cls
else:
self._entity_class = None
开发者ID:b8va,项目名称:everest,代码行数:7,代码来源:querying.py
示例2: __init__
def __init__ (self, entities, session):
"""Create an SCMQuery.
Parameters
----------
entities:
session:
Attributes
----------
self.start: datetime.datetime
Start of the period to consider for commits. Default: None
(start from the first commit)
self.end: datetime.datetime
End of the period to consider for commits. Default: None
(end in the last commit)
"""
self.start = None
self.end = None
# Keep an accounting of which tables have been joined, to avoid
# undesired repeated joins
self.joined = []
Query.__init__(self, entities, session)
开发者ID:jgbarah,项目名称:grimoire-api,代码行数:27,代码来源:scm_query.py
示例3: test_query_column_name
def test_query_column_name(self):
# test for bug: http://groups.google.com/group/geoalchemy/browse_thread/thread/6b731dd1673784f9
from sqlalchemy.orm.query import Query
query = Query(Road.road_geom).filter(Road.road_geom == '..').__str__()
ok_('AsBinary(roads.road_geom)' in query, 'table name is part of the column expression (select clause)')
ok_('WHERE Equals(roads.road_geom' in query, 'table name is part of the column expression (where clause)')
query_wkb = Select([Road.road_geom]).where(Road.road_geom == 'POINT(0 0)').__str__()
ok_('SELECT AsBinary(roads.road_geom)' in query_wkb, 'AsBinary is added')
ok_('WHERE Equals(roads.road_geom' in query_wkb, 'AsBinary is not added in where clause')
# test for RAW attribute
query_wkb = Select([Road.road_geom.RAW]).__str__()
ok_('SELECT roads.road_geom' in query_wkb, 'AsBinary is not added')
ok_(session.query(Road.road_geom.RAW).first())
query_srid = Query(func.SRID(Road.road_geom.RAW))
ok_('SRID(roads.road_geom)' in query_srid.__str__(), 'AsBinary is not added')
ok_(session.scalar(query_srid))
eq_(session.scalar(Select([func.SRID(Spot.spot_location)]).where(Spot.spot_id == 1)),
None,
'AsBinary is added and the SRID is not returned')
eq_(str(session.scalar(Select([func.SRID(Spot.spot_location.RAW)]).where(Spot.spot_id == 1))),
'4326',
'AsBinary is not added and the SRID is returned')
spot_alias = aliased(Spot)
query_wkt = Select([func.wkt(spot_alias.spot_location.RAW)]).__str__()
ok_('SELECT wkt(spots_1.spot_location' in query_wkt, 'Table alias is used in select clause')
ok_('FROM spots AS spots_1' in query_wkt, 'Table alias is used in from clause')
开发者ID:vuamitom,项目名称:geoalchemy,代码行数:32,代码来源:test_spatialite.py
示例4: __init__
def __init__(self, entities, session, **kw):
SaQuery.__init__(self, entities, session, **kw)
ent_cls = entities[0]
if isinstance(ent_cls, type) and issubclass(ent_cls, Entity):
self._entity_class = ent_cls
else: # just for compatibility pragma: no cover
self._entity_class = None
开发者ID:helixyte,项目名称:everest,代码行数:7,代码来源:querying.py
示例5: __init__
def __init__ (self, entities, session):
"""Create a GrimoreQuery.
Parameters
----------
entities: list of SQLAlchemy entities
Entities (tables) to include in the query
session: SQLAlchemy session
SQLAlchemy session to use to connect to the database
Attributes
----------
self.start: datetime.datetime
Start of the period to consider for commits. Default: None
(start from the first commit)
self.end: datetime.datetime
End of the period to consider for commits. Default: None
(end in the last commit)
"""
self.start = None
self.end = None
# Keep an accounting of which tables have been joined, to avoid
# undesired repeated joins
self.joined = []
Query.__init__(self, entities, session)
开发者ID:VizGrimoire,项目名称:GrimoireLib,代码行数:29,代码来源:common.py
示例6: get
def get(self, ident):
prop = self.property_name
if self._criterion:
mapper = self._only_full_mapper_zero("get")
# Don't use getattr/hasattr to check public existence, since this
# might misinterpret a bug (AttributeError raised by some code in
# property implementation) as missing attribute and cause all
# private data going to public.
if prop in dir(mapper.class_):
crit = getattr(mapper.class_, prop)
if crit is not None:
if not isinstance(crit, ClauseElement):
# This simplest safe way to make bare boolean column
# accepted as expression.
crit = cast(crit, Boolean)
if crit!=self._criterion:
# We can't verify that criterion is from our private()
# call. Check from DB instead of looking in identity
# map.
assert False # XXX temporal to verify it's used
return Query.get(self.populate_existing(), ident)
assert False # XXX temporal to verify it's used
obj = Query.get(self, ident)
if obj is not None and (prop not in dir(obj) or getattr(obj, prop)):
return obj
开发者ID:Lehych,项目名称:iktomi,代码行数:25,代码来源:public_query.py
示例7: element_atomic_weight
def element_atomic_weight(self, zeq, reference=None):
z = self._get_z(zeq)
q = Query(ElementAtomicWeightProperty.value)
q = q.join(Element)
q = q.filter(Element.z == z)
exception = ValueError('Unknown atomic weight for z="{0}" and '
'reference="{1}"'.format(z, reference))
return self._query_with_references(q, exception, reference)
开发者ID:fingenerf,项目名称:pyxray,代码行数:8,代码来源:data.py
示例8: element_mass_density_kg_per_m3
def element_mass_density_kg_per_m3(self, zeq, reference=None):
z = self._get_z(zeq)
q = Query(ElementMassDensityProperty.value_kg_per_m3)
q = q.join(Element)
q = q.filter(Element.z == z)
exception = ValueError('Unknown mass density for z="{0}" and '
'reference="{1}"'.format(z, reference))
return self._query_with_references(q, exception, reference)
开发者ID:fingenerf,项目名称:pyxray,代码行数:8,代码来源:data.py
示例9: __init__
def __init__ (self, entities, session):
"""Initialize the object
self.start and self.end will be used in case there are temporal
limits for the query (useful to produce TimeSeries objects,
which needs those.
"""
self.start = None
self.end = None
Query.__init__(self, entities, session)
开发者ID:virusu,项目名称:PFC_old,代码行数:11,代码来源:scm_query.py
示例10: element_name
def element_name(self, zeq, language='en', reference=None):
z = self._get_z(zeq)
q = Query(ElementNameProperty.name)
q = q.filter(ElementNameProperty.language_code == language)
q = q.join(Element)
q = q.filter(Element.z == z)
exception = ValueError('Unknown name for z="{0}", '
'language="{1}" and '
'reference="{2}"'
.format(z, language, reference))
return self._query_with_references(q, exception, reference)
开发者ID:fingenerf,项目名称:pyxray,代码行数:11,代码来源:data.py
示例11: test_underscore_update_many_query
def test_underscore_update_many_query(self, mock_clean):
from sqlalchemy.orm.query import Query
items = Query('asd')
clean_items = Query("ASD")
clean_items.all = Mock(return_value=[1, 2, 3])
clean_items.update = Mock()
mock_clean.return_value = clean_items
count = docs.BaseMixin._update_many(items, {'foo': 'bar'})
mock_clean.assert_called_once_with(items)
clean_items.update.assert_called_once_with(
{'foo': 'bar'}, synchronize_session='fetch')
assert count == clean_items.update()
开发者ID:numb3r3,项目名称:nefertari-sqla,代码行数:12,代码来源:test_documents.py
示例12: __iter__
def __iter__(self):
"""override __iter__ to pull results from Beaker
if particular attributes have been configured.
"""
if hasattr(self, '_cache_parameters'):
cache, cache_key = _get_cache_parameters(self)
ret = cache.get_value(cache_key, createfunc=lambda: list(Query.__iter__(self)))
# merge the result in.
return self.merge_result(ret, load=False)
else:
return Query.__iter__(self)
开发者ID:gaguilarmi,项目名称:sqlalchemy,代码行数:12,代码来源:meta.py
示例13: build_entity_query
def build_entity_query(self):
"""
Builds a :class:`sqla:sqlalchemy.orm.query.Query` object for this
entity (an instance of :class:`sir.schema.searchentities.SearchEntity`)
that eagerly loads the values of all search fields.
:rtype: :class:`sqla:sqlalchemy.orm.query.Query`
"""
root_model = self.model
query = Query(root_model)
paths = [field.paths for field in self.fields]
if (config.CFG.getboolean("sir", "wscompat")
and self.extrapaths is not None):
paths.extend([self.extrapaths])
merged_paths = merge_paths(paths)
for field_paths in paths:
for path in field_paths:
current_merged_path = merged_paths
model = root_model
load = Load(model)
split_path = path.split(".")
for pathelem in split_path:
current_merged_path = current_merged_path[pathelem]
column = getattr(model, pathelem)
prop = column.property
if isinstance(prop, RelationshipProperty):
pk = column.mapper.primary_key[0].name
if prop.direction == ONETOMANY:
load = load.subqueryload(pathelem)
elif prop.direction == MANYTOONE:
load = load.joinedload(pathelem)
else:
load = load.defaultload(pathelem)
required_columns = current_merged_path.keys()
required_columns.append(pk)
# Get the mapper class of the current element of the path so
# the next iteration can access it.
model = prop.mapper.class_
logger.debug("Loading only %s on %s",
required_columns,
model)
load = defer_everything_but(class_mapper(model),
load,
*required_columns)
query = query.options(load)
return query
开发者ID:ianmcorvidae,项目名称:sir,代码行数:51,代码来源:searchentities.py
示例14: __iter__
def __iter__(self):
if hasattr(self, 'cachekey'):
try:
ret = _cache[self.cachekey]
except KeyError:
ret = list(Query.__iter__(self))
for x in ret:
self.session.expunge(x)
_cache[self.cachekey] = ret
return iter(self.session.merge(x, dont_load=True) for x in ret)
else:
return Query.__iter__(self)
开发者ID:gajop,项目名称:ailadder,代码行数:14,代码来源:query_caching.py
示例15: __init__
def __init__(self, attr, state):
Query.__init__(self, attr.target_mapper, None)
self.instance = instance = state.obj()
self.attr = attr
mapper = object_mapper(instance)
prop = mapper.get_property(self.attr.key, resolve_synonyms=True)
self._criterion = prop.compare(
operators.eq,
instance,
value_is_parent=True,
alias_secondary=False)
if self.attr.order_by:
self._order_by = self.attr.order_by
开发者ID:dreamwave,项目名称:rad,代码行数:15,代码来源:dynamic.py
示例16: test_underscore_delete_many_query
def test_underscore_delete_many_query(
self, mock_clean, mock_on_bulk):
from sqlalchemy.orm.query import Query
items = Query('asd')
clean_items = Query("ASD")
clean_items.all = Mock(return_value=[1, 2, 3])
clean_items.delete = Mock()
mock_clean.return_value = clean_items
count = docs.BaseMixin._delete_many(items)
mock_clean.assert_called_once_with(items)
clean_items.delete.assert_called_once_with(
synchronize_session=False)
mock_on_bulk.assert_called_once_with(
docs.BaseMixin, [1, 2, 3], None)
assert count == clean_items.delete()
开发者ID:numb3r3,项目名称:nefertari-sqla,代码行数:15,代码来源:test_documents.py
示例17: __iter__
def __iter__(self):
if hasattr(self, 'cachekey'):
try:
cache = self.session._cache
except AttributeError:
self.session._cache = cache = {}
try:
ret = cache[self.cachekey]
except KeyError:
ret = list(Query.__iter__(self))
cache[self.cachekey] = ret
return iter(ret)
else:
return Query.__iter__(self)
开发者ID:gajop,项目名称:ailadder,代码行数:17,代码来源:per_session.py
示例18: __iter__
def __iter__(self):
log.info("Query:\n\t%s" % unicode(self).replace("\n", "\n\t"))
explain = self.session.execute(Explain(self)).fetchall()
text = "\n\t".join("|".join(str(x) for x in line) for line in explain)
before = time()
result = Query.__iter__(self)
log.info("Query Time: %0.3f Explain Query Plan:\n\t%s" % (time() - before, text))
return result
开发者ID:BrainDamage,项目名称:Flexget,代码行数:8,代码来源:explain_sql.py
示例19: __iter__
def __iter__(self):
"""override __iter__ to pull results from dogpile
if particular attributes have been configured.
Note that this approach does *not* detach the loaded objects from
the current session. If the cache backend is an in-process cache
(like "memory") and lives beyond the scope of the current session's
transaction, those objects may be expired. The method here can be
modified to first expunge() each loaded item from the current
session before returning the list of items, so that the items
in the cache are not the same ones in the current Session.
"""
if hasattr(self, '_cache_region'):
return self.get_value(createfunc=lambda: list(Query.__iter__(self)))
else:
return Query.__iter__(self)
开发者ID:wangzhengbo1204,项目名称:Python,代码行数:17,代码来源:caching_query.py
示例20: filter_by
def filter_by(me, **kargs):
""" A.query().filter_by( x=4, **{'b.c.d':5, ...}) """
r = me
for k, v in kargs.iteritems():
attrs = k.split(".")
if attrs[:-1]:
r = r.join(attrs[:-1])
r = _saQuery.filter_by(r, **{attrs[-1]: v})
r = r.reset_joinpoint()
return r
开发者ID:hstanev,项目名称:dbcook,代码行数:10,代码来源:query.py
注:本文中的sqlalchemy.orm.query.Query类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论