• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python query.Query类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 sqlalchemy.orm.query.Query 类的典型用法代码示例。如果您正苦于以下问题:Python Query 类的具体用法?Python Query 怎么用?Python Query 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了 Query 类的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: __init__

 def __init__(self, entities, session, **kw):
     """Initialize the query and remember the entity class it targets.

     The ``SaQuery`` initializer is called with the arguments unchanged.
     ``_entity_class`` is then set to the first element of *entities* when
     that element is a class derived from ``Entity``, and to ``None``
     otherwise.
     """
     SaQuery.__init__(self, entities, session, **kw)
     first = entities[0]
     targets_entity = isinstance(first, type) and issubclass(first, Entity)
     self._entity_class = first if targets_entity else None
开发者ID:b8va,项目名称:everest,代码行数:7,代码来源:querying.py


示例2: __init__

    def __init__(self, entities, session):
        """Create an SCMQuery.

        Parameters
        ----------

        entities:
           Passed through unchanged to Query.__init__
        session:
           Passed through unchanged to Query.__init__

        Attributes
        ----------

        self.start: datetime.datetime
           Start of the period to consider for commits. Default: None
           (start from the first commit)
        self.end: datetime.datetime
           End of the period to consider for commits. Default: None
           (end in the last commit)
        self.joined: list
           Tables joined so far. Starts out empty.

        """

        # No period limits until a caller sets them
        self.start = self.end = None
        # Remember which tables have already been joined, so that the
        # same join is not added twice by accident
        self.joined = []
        Query.__init__(self, entities, session)
开发者ID:jgbarah,项目名称:grimoire-api,代码行数:27,代码来源:scm_query.py


示例3: test_query_column_name

    def test_query_column_name(self):
        """Check how geometry columns are rendered in generated SQL."""
        # test for bug: http://groups.google.com/group/geoalchemy/browse_thread/thread/6b731dd1673784f9
        from sqlalchemy.orm.query import Query

        # ORM query: table name must show up in both SELECT and WHERE
        orm_sql = str(Query(Road.road_geom).filter(Road.road_geom == '..'))
        ok_('AsBinary(roads.road_geom)' in orm_sql, 'table name is part of the column expression (select clause)')
        ok_('WHERE Equals(roads.road_geom' in orm_sql, 'table name is part of the column expression (where clause)')

        # Core select on the same column
        select_sql = str(Select([Road.road_geom]).where(Road.road_geom == 'POINT(0 0)'))
        ok_('SELECT AsBinary(roads.road_geom)' in select_sql, 'AsBinary is added')
        ok_('WHERE Equals(roads.road_geom' in select_sql, 'AsBinary is not added in where clause')

        # test for RAW attribute
        raw_sql = str(Select([Road.road_geom.RAW]))
        ok_('SELECT roads.road_geom' in raw_sql, 'AsBinary is not added')

        ok_(session.query(Road.road_geom.RAW).first())

        srid_query = Query(func.SRID(Road.road_geom.RAW))
        ok_('SRID(roads.road_geom)' in str(srid_query), 'AsBinary is not added')
        ok_(session.scalar(srid_query))

        # SRID on the wrapped column versus on the RAW column
        wrapped_srid = session.scalar(
            Select([func.SRID(Spot.spot_location)]).where(Spot.spot_id == 1))
        eq_(wrapped_srid, None, 'AsBinary is added and the SRID is not returned')
        raw_srid = session.scalar(
            Select([func.SRID(Spot.spot_location.RAW)]).where(Spot.spot_id == 1))
        eq_(str(raw_srid), '4326', 'AsBinary is not added and the SRID is returned')

        # Aliased entity: the alias must be used in SELECT and FROM
        aliased_spot = aliased(Spot)
        alias_sql = str(Select([func.wkt(aliased_spot.spot_location.RAW)]))
        ok_('SELECT wkt(spots_1.spot_location' in alias_sql, 'Table alias is used in select clause')
        ok_('FROM spots AS spots_1' in alias_sql, 'Table alias is used in from clause')
开发者ID:vuamitom,项目名称:geoalchemy,代码行数:32,代码来源:test_spatialite.py


示例4: __init__

 def __init__(self, entities, session, **kw):
     """Initialize the query and record which entity class it is for.

     After delegating to ``SaQuery.__init__``, the first element of
     *entities* is inspected: when it is a class derived from ``Entity``
     it is stored in ``_entity_class``; in every other case
     ``_entity_class`` is ``None``.
     """
     SaQuery.__init__(self, entities, session, **kw)
     head = entities[0]
     if not (isinstance(head, type) and issubclass(head, Entity)):
         # just for compatibility pragma: no cover
         self._entity_class = None
     else:
         self._entity_class = head
开发者ID:helixyte,项目名称:everest,代码行数:7,代码来源:querying.py


示例5: __init__

    def __init__(self, entities, session):
        """Create the query object.

        Parameters
        ----------

        entities: list of SQLAlchemy entities
           Entities (tables) to include in the query
        session: SQLAlchemy session
           SQLAlchemy session to use to connect to the database

        Attributes
        ----------

        self.start: datetime.datetime
           Start of the period to consider for commits. Default: None
           (start from the first commit)
        self.end: datetime.datetime
           End of the period to consider for commits. Default: None
           (end in the last commit)
        self.joined: list
           Tables joined so far. Starts out empty.

        """

        # No period limits until a caller sets them
        self.start = self.end = None
        # Remember which tables have already been joined, so that the
        # same join is not added twice by accident
        self.joined = []
        Query.__init__(self, entities, session)
开发者ID:VizGrimoire,项目名称:GrimoireLib,代码行数:29,代码来源:common.py


示例6: get

 def get(self, ident):
     """Return the object identified by `ident` if it passes the check on
     the attribute named by `self.property_name`.

     The object is fetched with `Query.get`. It is returned only when it is
     not None and either has no attribute with that name (according to
     `dir`) or the attribute value is truthy. In every other case the
     method falls off the end and returns None.

     NOTE(review): every path through the `if self._criterion:` branch
     reaches an `assert False`, so with assertions enabled a truthy
     `self._criterion` always raises AssertionError. The asserts are marked
     as temporary.
     """
     prop = self.property_name
     if self._criterion:
         mapper = self._only_full_mapper_zero("get")
         # Don't use getattr/hasattr to check public existence, since this
         # might misinterpret a bug (AttributeError raised by some code in
         # property implementation) as missing attribute and cause all
         # private data going to public.
         if prop in dir(mapper.class_):
             crit = getattr(mapper.class_, prop)
             if crit is not None:
                 if not isinstance(crit, ClauseElement):
                     # This simplest safe way to make bare boolean column
                     # accepted as expression.
                     crit = cast(crit, Boolean)
                 if crit!=self._criterion:
                     # We can't verify that criterion is from our private()
                     # call.  Check from DB instead of looking in identity
                     # map.
                     assert False # XXX temporal to verify it's used
                     # Only reachable when asserts are stripped (python -O):
                     # the result is returned without the check done below.
                     return Query.get(self.populate_existing(), ident)
         # Reached whenever the branch above did not return.
         assert False # XXX temporal to verify it's used
     obj = Query.get(self, ident)
     # Return the object only when the attribute is absent from dir(obj)
     # or its value is truthy; otherwise return None implicitly.
     if obj is not None and (prop not in dir(obj) or getattr(obj, prop)):
         return obj
开发者ID:Lehych,项目名称:iktomi,代码行数:25,代码来源:public_query.py


示例7: element_atomic_weight

 def element_atomic_weight(self, zeq, reference=None):
     """Look up the atomic weight value for the element given by *zeq*.

     *zeq* is converted with ``self._get_z``. The query selects
     ``ElementAtomicWeightProperty.value`` joined to ``Element`` and
     filtered on the resulting ``z``. The query, a prepared ``ValueError``
     and *reference* are passed to ``self._query_with_references``, whose
     result is returned.
     """
     z = self._get_z(zeq)
     query = (Query(ElementAtomicWeightProperty.value)
              .join(Element)
              .filter(Element.z == z))
     message = ('Unknown atomic weight for z="{0}" and '
                'reference="{1}"').format(z, reference)
     return self._query_with_references(query, ValueError(message), reference)
开发者ID:fingenerf,项目名称:pyxray,代码行数:8,代码来源:data.py


示例8: element_mass_density_kg_per_m3

 def element_mass_density_kg_per_m3(self, zeq, reference=None):
     """Look up the mass density value for the element given by *zeq*.

     *zeq* is converted with ``self._get_z``. The query selects
     ``ElementMassDensityProperty.value_kg_per_m3`` joined to ``Element``
     and filtered on the resulting ``z``. The query, a prepared
     ``ValueError`` and *reference* are passed to
     ``self._query_with_references``, whose result is returned.
     """
     z = self._get_z(zeq)
     query = (Query(ElementMassDensityProperty.value_kg_per_m3)
              .join(Element)
              .filter(Element.z == z))
     message = ('Unknown mass density for z="{0}" and '
                'reference="{1}"').format(z, reference)
     return self._query_with_references(query, ValueError(message), reference)
开发者ID:fingenerf,项目名称:pyxray,代码行数:8,代码来源:data.py


示例9: __init__

    def __init__(self, entities, session):
        """Initialize the object

        self.start and self.end hold the temporal limits of the query,
        if any (useful to produce TimeSeries objects, which need them).
        Both start out as None.
        """

        self.start = self.end = None
        Query.__init__(self, entities, session)
开发者ID:virusu,项目名称:PFC_old,代码行数:11,代码来源:scm_query.py


示例10: element_name

 def element_name(self, zeq, language='en', reference=None):
     """Look up the name of the element given by *zeq* in *language*.

     *zeq* is converted with ``self._get_z``. The query selects
     ``ElementNameProperty.name``, filters on the language code, joins
     ``Element`` and filters on the resulting ``z``. The query, a prepared
     ``ValueError`` and *reference* are passed to
     ``self._query_with_references``, whose result is returned.
     """
     z = self._get_z(zeq)
     query = (Query(ElementNameProperty.name)
              .filter(ElementNameProperty.language_code == language)
              .join(Element)
              .filter(Element.z == z))
     message = ('Unknown name for z="{0}", '
                'language="{1}" and '
                'reference="{2}"').format(z, language, reference)
     return self._query_with_references(query, ValueError(message), reference)
开发者ID:fingenerf,项目名称:pyxray,代码行数:11,代码来源:data.py


示例11: test_underscore_update_many_query

 def test_underscore_update_many_query(self, mock_clean):
     """_update_many cleans the query it is given and updates the result."""
     from sqlalchemy.orm.query import Query

     # Arrange: the patched cleaner hands back a query with stubbed methods
     raw_query = Query('asd')
     cleaned_query = Query("ASD")
     cleaned_query.all = Mock(return_value=[1, 2, 3])
     cleaned_query.update = Mock()
     mock_clean.return_value = cleaned_query

     # Act
     updated = docs.BaseMixin._update_many(raw_query, {'foo': 'bar'})

     # Assert
     mock_clean.assert_called_once_with(raw_query)
     cleaned_query.update.assert_called_once_with(
         {'foo': 'bar'}, synchronize_session='fetch')
     assert updated == cleaned_query.update()
开发者ID:numb3r3,项目名称:nefertari-sqla,代码行数:12,代码来源:test_documents.py


示例12: __iter__

 def __iter__(self):
     """Iterate over the results, going through the Beaker cache when
        the query has a ``_cache_parameters`` attribute.
     """
     if not hasattr(self, '_cache_parameters'):
         return Query.__iter__(self)

     def load_rows():
         # Called by the cache only when it needs a fresh value
         return list(Query.__iter__(self))

     cache, cache_key = _get_cache_parameters(self)
     cached_rows = cache.get_value(cache_key, createfunc=load_rows)

     # merge the result in.
     return self.merge_result(cached_rows, load=False)
开发者ID:gaguilarmi,项目名称:sqlalchemy,代码行数:12,代码来源:meta.py


示例13: build_entity_query

    def build_entity_query(self):
        """
        Builds a :class:`sqla:sqlalchemy.orm.query.Query` object for this
        entity (an instance of :class:`sir.schema.searchentities.SearchEntity`)
        that eagerly loads the values of all search fields.

        Every dotted path of every field is walked element by element.
        ``self.extrapaths`` is included too when the ``sir``/``wscompat``
        option is enabled and extra paths are set. For each relationship on
        a path, a loader strategy is chosen from the relationship direction.
        The columns required by the merged paths, plus the name of the first
        primary key column of ``column.mapper``, are handed to
        ``defer_everything_but``.

        :rtype: :class:`sqla:sqlalchemy.orm.query.Query`
        """
        root_model = self.model
        query = Query(root_model)
        paths = [field.paths for field in self.fields]

        if (config.CFG.getboolean("sir", "wscompat")
            and self.extrapaths is not None):
            paths.append(self.extrapaths)

        merged_paths = merge_paths(paths)

        for field_paths in paths:
            for path in field_paths:
                # Each path starts again at the root of the merged paths
                # and at the root model.
                current_merged_path = merged_paths
                model = root_model
                load = Load(model)
                for pathelem in path.split("."):
                    current_merged_path = current_merged_path[pathelem]
                    column = getattr(model, pathelem)
                    prop = column.property

                    if isinstance(prop, RelationshipProperty):
                        pk = column.mapper.primary_key[0].name
                        if prop.direction == ONETOMANY:
                            load = load.subqueryload(pathelem)
                        elif prop.direction == MANYTOONE:
                            load = load.joinedload(pathelem)
                        else:
                            load = load.defaultload(pathelem)
                        # Copy the keys into a real list: on Python 3
                        # ``keys()`` returns a view without ``append``.
                        required_columns = list(current_merged_path.keys())
                        required_columns.append(pk)
                        # Get the mapper class of the current element of the path so
                        # the next iteration can access it.
                        model = prop.mapper.class_

                        logger.debug("Loading only %s on %s",
                                     required_columns,
                                     model)
                        load = defer_everything_but(class_mapper(model),
                                                    load,
                                                    *required_columns)
                query = query.options(load)
        return query
开发者ID:ianmcorvidae,项目名称:sir,代码行数:51,代码来源:searchentities.py


示例14: __iter__

    def __iter__(self):
        """Iterate over the results, using the module-level ``_cache``
        when the query has a ``cachekey`` attribute.

        On a cache miss the query is run, every loaded object is expunged
        from the session, and the list is stored under ``cachekey``. Cached
        objects are merged back into the session with ``dont_load=True``
        as they are iterated.
        """
        if not hasattr(self, 'cachekey'):
            return Query.__iter__(self)

        try:
            cached = _cache[self.cachekey]
        except KeyError:
            cached = list(Query.__iter__(self))
            for obj in cached:
                self.session.expunge(obj)
            _cache[self.cachekey] = cached

        return iter(self.session.merge(obj, dont_load=True) for obj in cached)
开发者ID:gajop,项目名称:ailadder,代码行数:14,代码来源:query_caching.py


示例15: __init__

    def __init__(self, attr, state):
        """Build a query on ``attr.target_mapper`` restricted to the
        instance obtained from ``state.obj()``.

        The criterion comes from comparing the mapped property named by
        ``attr.key`` with that instance. When ``attr.order_by`` is truthy
        it becomes the query ordering.
        """
        Query.__init__(self, attr.target_mapper, None)
        parent = state.obj()
        self.instance = parent
        self.attr = attr

        relation = object_mapper(parent).get_property(
            self.attr.key, resolve_synonyms=True)
        self._criterion = relation.compare(
            operators.eq,
            parent,
            value_is_parent=True,
            alias_secondary=False,
        )

        if self.attr.order_by:
            self._order_by = self.attr.order_by
开发者ID:dreamwave,项目名称:rad,代码行数:15,代码来源:dynamic.py


示例16: test_underscore_delete_many_query

 def test_underscore_delete_many_query(
         self, mock_clean, mock_on_bulk):
     """_delete_many cleans the query, deletes and reports the items."""
     from sqlalchemy.orm.query import Query

     # Arrange: the patched cleaner hands back a query with stubbed methods
     raw_query = Query('asd')
     cleaned_query = Query("ASD")
     cleaned_query.all = Mock(return_value=[1, 2, 3])
     cleaned_query.delete = Mock()
     mock_clean.return_value = cleaned_query

     # Act
     deleted = docs.BaseMixin._delete_many(raw_query)

     # Assert
     mock_clean.assert_called_once_with(raw_query)
     cleaned_query.delete.assert_called_once_with(
         synchronize_session=False)
     mock_on_bulk.assert_called_once_with(
         docs.BaseMixin, [1, 2, 3], None)
     assert deleted == cleaned_query.delete()
开发者ID:numb3r3,项目名称:nefertari-sqla,代码行数:15,代码来源:test_documents.py


示例17: __iter__

    def __iter__(self):
        """Iterate over the results, using a cache stored on the session
        when the query has a ``cachekey`` attribute.

        The cache is a dict kept in ``self.session._cache`` and is created
        the first time it is needed. On a miss the query is run and its
        result list is stored under ``cachekey``.
        """
        if not hasattr(self, 'cachekey'):
            return Query.__iter__(self)

        try:
            session_cache = self.session._cache
        except AttributeError:
            # First cached query on this session: start an empty cache
            self.session._cache = session_cache = {}

        try:
            rows = session_cache[self.cachekey]
        except KeyError:
            rows = list(Query.__iter__(self))
            session_cache[self.cachekey] = rows

        return iter(rows)
开发者ID:gajop,项目名称:ailadder,代码行数:17,代码来源:per_session.py


示例18: __iter__

 def __iter__(self):
     """Log the query, its EXPLAIN output and the time taken by
     ``Query.__iter__``, then return that call's result.
     """
     log.info("Query:\n\t%s" % unicode(self).replace("\n", "\n\t"))
     plan_rows = self.session.execute(Explain(self)).fetchall()
     # One text line per plan row, cells separated by "|"
     plan_lines = ["|".join(str(cell) for cell in row) for row in plan_rows]
     plan_text = "\n\t".join(plan_lines)
     started = time()
     rows = Query.__iter__(self)
     elapsed = time() - started
     log.info("Query Time: %0.3f Explain Query Plan:\n\t%s" % (elapsed, plan_text))
     return rows
开发者ID:BrainDamage,项目名称:Flexget,代码行数:8,代码来源:explain_sql.py


示例19: __iter__

    def __iter__(self):
        """Iterate over the results, pulling them from dogpile when the
           query has a ``_cache_region`` attribute.

           Loaded objects are *not* detached from the current session by
           this method. With an in-process cache backend (like "memory")
           that outlives the current session's transaction, the cached
           objects may be expired. To avoid sharing objects between the
           cache and the current Session, this method can be changed to
           expunge() each loaded item before the list is returned.

        """
        if not hasattr(self, '_cache_region'):
            return Query.__iter__(self)
        return self.get_value(createfunc=lambda: list(Query.__iter__(self)))
开发者ID:wangzhengbo1204,项目名称:Python,代码行数:17,代码来源:caching_query.py


示例20: filter_by

 def filter_by(me, **kargs):
     """ A.query().filter_by( x=4, **{'b.c.d':5, ...})

     Apply each keyword as an equality filter. A key may be a dotted path:
     everything before the last dot is passed to ``join`` and the last
     component is the attribute passed to ``_saQuery.filter_by``. The join
     point is reset after every key. With no keywords, *me* is returned
     unchanged.
     """
     query = me
     # items() works on both Python 2 and 3; iteritems() is Python 2 only.
     for dotted_name, value in kargs.items():
         attrs = dotted_name.split(".")
         path, leaf = attrs[:-1], attrs[-1]
         if path:
             query = query.join(path)
         query = _saQuery.filter_by(query, **{leaf: value})
         query = query.reset_joinpoint()
     return query
开发者ID:hstanev,项目名称:dbcook,代码行数:10,代码来源:query.py



注:本文中的sqlalchemy.orm.query.Query类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python scoping.scoped_session函数代码示例发布时间:2022-05-27
下一篇:
Python path_registry.PathRegistry类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap