
Python orm.scoped_session Function Code Examples


This article collects typical usage examples of the sqlalchemy.orm.scoped_session function in Python. If you are wondering how exactly scoped_session is used, or are looking for working examples of it, the curated code examples here may help.



A total of 20 code examples of the scoped_session function are shown below, sorted by popularity by default.
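
Before the project examples, a minimal sketch of the basic pattern may be useful. It assumes an in-memory SQLite database (`sqlite://`), chosen only so that the snippet is self-contained; the variable names are illustrative and not taken from any of the projects below.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# Assumption: in-memory SQLite, used only to keep the sketch self-contained.
engine = create_engine("sqlite://")

# scoped_session wraps a session factory in a registry object.
Session = scoped_session(sessionmaker(bind=engine))

# Calling the registry returns the session for the current scope.
first = Session()

# The registry also proxies Session methods such as execute().
one = Session.execute(text("SELECT 1")).scalar()

# remove() closes the current session and discards it from the registry,
# so the next call builds a fresh Session object.
Session.remove()
second = Session()
fresh_after_remove = second is not first
Session.remove()
```

Of the examples below, only Example 17 calls remove(); the others either close the session directly or leave cleanup to the surrounding framework.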

Example 1: get_session

def get_session():
	"""Creates a new database session for instant use"""

	engine = create_engine(settings.DATASTORE_URI)
	# Keep the registry returned by scoped_session; the original discarded it.
	Session = scoped_session(sessionmaker(bind=engine))
	return (Session(), engine)
Developer: rzetterberg, Project: picshu, Lines of code: 7, Source: database.py
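
By default, scoped_session scopes sessions to the current thread. The sketch below tries to make that visible: it uses an unbound sessionmaker, and the `worker` function and `sessions` dictionary are hypothetical names introduced only for the illustration.

```python
import threading

from sqlalchemy.orm import scoped_session, sessionmaker

# An unbound factory is enough here, because no SQL is executed.
Session = scoped_session(sessionmaker())

sessions = {}


def worker():
    # Runs in a second thread, so the registry hands out a different session.
    sessions["worker"] = Session()
    Session.remove()


sessions["main"] = Session()
sessions["main_again"] = Session()

thread = threading.Thread(target=worker)
thread.start()
thread.join()

same_in_thread = sessions["main"] is sessions["main_again"]
different_across_threads = sessions["main"] is not sessions["worker"]
Session.remove()
```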


Example 2: autoupdate

def autoupdate():
    """
    Autoupdate data layer about new models in engines.
    """
    scoped_session(Backend.instance().get_sessionmaker)
    Backend.instance().get_base().metadata.create_all(
        Backend.instance().get_engine())
Developer: speedingdeer, Project: real_time_massages_service, Lines of code: 7, Source: autoupdate.py


Example 3: __init__

    def __init__(self, uri=None, engine=None, **kwargs):
        """Create a new SqlAlchemyStore.  When given a connection string (uri)
        or SQLAlchemy Engine (engine), this Store will create its own internal
        SQLAlchemy Session to manage objects.  If you do not provide a URI or
        Engine, your mapped object metadata must be bound to their own engines.
        """

        super(SqlAlchemyStore, self).__init__()
        
        # get the session
        if "session" in kwargs:
            # we no longer allow initialization with a pre-existing session object, there
            # are too many issues with this approach at the moment
            raise DeprecationWarning("cannot instantiate a SqlAlchemyStore with a pre-existing session object")
        else:
            # no session, we have to make one
            # first we need to get the engine
            
            if uri and engine:
                # can't give two ways to get an engine
                raise ValueError("you can only provide either a connection string URI or an engine, not both")
                
            elif uri:
                # we have a uri to create an engine
                engine = create_engine(uri)
            
            if engine:
                # we have an engine, we can create the bound session now
                self.__session = scoped_session(sessionmaker(autoflush=True, bind=engine))
            
            else:
                # no engine or URI was specified, create an unbound session
                # (mapped object metadata will need to be bound to an engine in this case)
                self.__session = scoped_session(sessionmaker(autoflush=True))
Developer: mjpizz, Project: stockpyle, Lines of code: 34, Source: _sqlalchemy.py


Example 4: create_security_group_rule_bulk_native

    def create_security_group_rule_bulk_native(self, context,
                                               security_group_rule):
        r = security_group_rule['security_group_rules']

        scoped_session(context.session)
        security_group_id = self._validate_security_group_rules(
            context, security_group_rule)
        with context.session.begin(subtransactions=True):
            if not self.get_security_group(context, security_group_id):
                raise ext_sg.SecurityGroupNotFound(id=security_group_id)

            self._check_for_duplicate_rules(context, r)
            ret = []
            for rule_dict in r:
                rule = rule_dict['security_group_rule']
                tenant_id = self._get_tenant_id_for_create(context, rule)
                db = SecurityGroupRule(
                    id=uuidutils.generate_uuid(), tenant_id=tenant_id,
                    security_group_id=rule['security_group_id'],
                    direction=rule['direction'],
                    remote_group_id=rule.get('remote_group_id'),
                    ethertype=rule['ethertype'],
                    protocol=rule['protocol'],
                    port_range_min=rule['port_range_min'],
                    port_range_max=rule['port_range_max'],
                    remote_ip_prefix=rule.get('remote_ip_prefix'))
                context.session.add(db)
                ret.append(self._make_security_group_rule_dict(db))
        return ret
Developer: kongseokhwan, Project: kulcloud-neutron, Lines of code: 29, Source: securitygroups_db.py


Example 5: __init__

    def __init__(self, bus, connection_string):
        """
        The plugin is registered to the CherryPy engine and therefore
        is part of the bus (the engine *is* a bus) registry.

        We use this plugin to create the SA engine. At the same time,
        when the plugin starts we create the tables into the database
        using the mapped class of the global metadata.
        """
        plugins.SimplePlugin.__init__(self, bus)

#        self.sa_engine = None
#        self.connection_string = connection_string
#        self.session = scoped_session(sessionmaker(autoflush=True, autocommit=False))

        self.sa_engine_online = None
        self.connection_string_online = connection_string.connectUrlonline
        self.session_online = scoped_session(sessionmaker(autoflush=True,
                                                   autocommit=False))

        self.sa_engine_offline = None
        self.connection_string_offline = connection_string.connectUrloffline
        self.session_offline = scoped_session(sessionmaker(autoflush=True,
                                                   autocommit=False))

        self.sa_engine_cache = None
        self.connection_string_cache = connection_string.connectUrlcache
        self.session_cache = scoped_session(sessionmaker(autoflush=True,
                                                   autocommit=False))
Developer: cms-sw, Project: web-confdb, Lines of code: 29, Source: saplugin.py
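
Example 5 creates its registries before any engine exists. One way to attach an engine later is `configure()`, which Example 17 also uses. The sketch below assumes an in-memory SQLite engine and is not the plugin's actual start-up code.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# Create the registry first, with no engine attached.
Session = scoped_session(sessionmaker(autoflush=True))

# Later (for instance when the application starts) the engine becomes available.
# Assumption: in-memory SQLite stands in for the real connection string.
engine = create_engine("sqlite://")

# configure() updates the underlying sessionmaker; call it before the first
# session is requested from the registry.
Session.configure(bind=engine)

session = Session()
uses_engine = session.get_bind() is engine
answer = session.execute(text("SELECT 1")).scalar()
Session.remove()
```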


Example 6: test_default_constructor_state_not_shared

    def test_default_constructor_state_not_shared(self):
        scope = scoped_session(sa.orm.sessionmaker())

        class A(object):
            pass
        class B(object):
            def __init__(self):
                pass

        scope.mapper(A, table1)
        scope.mapper(B, table2)

        A(foo='bar')
        assert_raises(TypeError, B, foo='bar')

        scope = scoped_session(sa.orm.sessionmaker())

        class C(object):
            def __init__(self):
                pass
        class D(object):
            pass

        scope.mapper(C, table1)
        scope.mapper(D, table2)

        assert_raises(TypeError, C, foo='bar')
        D(foo='bar')
Developer: gajop, Project: springgrid, Lines of code: 28, Source: test_scoping.py


Example 7: test_all_htmlpopups

 def test_all_htmlpopups(self):
     from chsdi.models import models_from_name
     from chsdi.models.bod import LayersConfig
     from sqlalchemy import distinct
     from sqlalchemy.orm import scoped_session, sessionmaker
     val = True
     DBSession = scoped_session(sessionmaker())
     query = DBSession.query(distinct(LayersConfig.idBod)).filter(LayersConfig.queryable == val).filter(LayersConfig.staging == 'prod')
     # Get a list of all the queryable layers
     layers = [q[0] for q in query]
     DBSession.close()
     # Get a list of feature ids
     features = []
     for layer in layers:
         try:
             model = models_from_name(layer)[0]
             DBSession = scoped_session(sessionmaker())
             query = DBSession.query(model.primary_key_column()).limit(1)
             ID = [q[0] for q in query]
             # If tables are empty ID is an empty list
             if ID:
                 features.append((layer, str(ID[0])))
         except Exception as e:
             print(e)
         finally:
             DBSession.close()
     for f in features:
         for lang in ('de', 'fr', 'it', 'rm', 'en'):
             self.testapp.get('/rest/services/all/MapServer/%s/%s/htmlPopup' % (f[0], f[1]), params={'callback': 'cb', 'lang': '%s' % lang}, status=200)
Developer: jesseeichar, Project: mf-chsdi3, Lines of code: 30, Source: test_mapservice.py


Example 8: __init__

    def __init__(self, name=None):
        if not name:
            name = 'default'
        data = {}
        if hasattr(settings, 'DATABASES') and \
           name in settings.DATABASES:
            data = settings.DATABASES[name]
        if data.get('ENGINE', '') == '':
            raise ImproperlyConfigured('''\
The database ORM connection requires, at minimum, an engine type.''')
        if '.' in data['ENGINE']:
            data['ENGINE'] = data['ENGINE'].rsplit('.', 1)[-1]

        # django needs sqlite3 but sqlalchemy references sqlite
        if data['ENGINE'] == 'sqlite3':
            data['ENGINE'] = 'sqlite'

        self.engine = self._create_engine(data)
        ro_values = dict([(k[9:], v) for k, v in data.items()
                             if k.startswith('READONLY_')])
        if len(ro_values):
            ro_data = dict(data)
            ro_data.update(ro_values)
            self.readonly_engine = self._create_engine(ro_data)
        self.metadata = MetaData(self.engine)
        self.Base = declarative_base(metadata=self.metadata)
        if hasattr(self, 'readonly_engine'):
            self._readonly_sessionmaker = \
                scoped_session(sessionmaker(bind=self.readonly_engine))
        self._sessionmaker = scoped_session(sessionmaker(bind=self.engine))
Developer: saebyn, Project: baph, Lines of code: 30, Source: orm.py


Example 9: init_sqlalchemy

def init_sqlalchemy(settings):
    # master
    master_url = settings['sqlalchemy.url']
    connect_kwargs = settings.get('sqlalchemy.connect_kwargs')
    kwargs = {}
    if connect_kwargs is not None:
        if isinstance(connect_kwargs, str):
            connect_kwargs = json.loads(connect_kwargs)
        for k, v in connect_kwargs.items():
            kwargs[k] = v
    engine = create_engine(master_url, **kwargs)

    sm = orm.sessionmaker(bind=engine, extension=ZopeTransactionExtension())
    meta.Session = orm.scoped_session(sm)
    meta.metadata.bind = engine

    # slaves
    slaves_url = settings.get('sqlalchemy.slaves', [])
    slaves = []
    for url in slaves_url:
        slave = create_engine(url, **kwargs)
        sm = orm.sessionmaker(bind=slave, extension=ZopeTransactionExtension())
        slaves.append(orm.scoped_session(sm))

    if slaves:
        slave = random.choice(slaves)
        meta.BaseObject.query = slave.query_property(orm.Query)
    else:
        meta.BaseObject.query = meta.Session.query_property(orm.Query)
Developer: lxneng, Project: easy_sqlalchemy, Lines of code: 29, Source: __init__.py
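
Example 9 attaches `query_property()` to a base class so that models can be queried without passing a session around. A minimal sketch of that idea follows; the `User` model, the `users` table and the in-memory SQLite engine are hypothetical and exist only for this illustration.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

try:  # SQLAlchemy 1.4 and later
    from sqlalchemy.orm import declarative_base
except ImportError:  # older releases
    from sqlalchemy.ext.declarative import declarative_base

# Assumption: in-memory SQLite keeps the sketch self-contained.
engine = create_engine("sqlite://")
Session = scoped_session(sessionmaker(bind=engine))

Base = declarative_base()
# Every model derived from Base gets a .query attribute that is evaluated
# against the session of the current scope.
Base.query = Session.query_property()


class User(Base):  # hypothetical model
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


Base.metadata.create_all(engine)
Session.add(User(name="alice"))
Session.commit()

names = [user.name for user in User.query.order_by(User.name)]
Session.remove()
```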


Example 10: test_all_htmlpopups

    def test_all_htmlpopups(self):
        from chsdi.models import models_from_name
        from chsdi.models.bod import LayersConfig
        from sqlalchemy import distinct
        from sqlalchemy.orm import scoped_session, sessionmaker
        val = True
        DBSession = scoped_session(sessionmaker())
        valnone = None
        query = DBSession.query(distinct(LayersConfig.layerBodId)).filter(LayersConfig.staging == 'prod').filter(LayersConfig.queryable == val).filter(LayersConfig.parentLayerId == valnone)
        features = []
        try:
            for layer in getLayers(query):
                try:
                    FeatDBSession = scoped_session(sessionmaker())
                    models = models_from_name(layer)
                    self.assertTrue(models is not None and len(models) > 0, layer)
                    model = models[0]
                    query = FeatDBSession.query(model.primary_key_column()).limit(1)
                    ID = [q[0] for q in query]
                    if ID:
                        features.append((layer, str(ID[0])))
                finally:
                    FeatDBSession.close()
        finally:
            DBSession.close()

        for f in features:
            for lang in ('de', 'fr', 'it', 'rm', 'en'):
                link = '/rest/services/all/MapServer/' + f[0] + '/' + f[1] + '/htmlPopup?callback=cb&lang=' + lang
                resp = self.testapp.get(link)
                self.assertTrue(resp.status_int == 200, link)
Developer: qbns, Project: mf-chsdi3, Lines of code: 31, Source: test_mapservice.py


Example 11: connect

def connect(dbstr):
  engine = create_engine(dbstr, echo = False)
  connection = engine.connect()

  #handle case that the db was initialized before a version table existed yet
  if engine.dialect.has_table(connection, "program"):
    # if there are existing tables
    if not engine.dialect.has_table(connection, "_meta"):
      # if no version table, assume outdated db version and error
      connection.close()
      raise Exception("Your opentuner database is currently out of date. Save a back up and reinitialize")

  # else if we have the table already, make sure version matches
  if engine.dialect.has_table(connection, "_meta"):
    Session = scoped_session(sessionmaker(autocommit=False,
                                          autoflush=False,
                                          bind=engine))
    version = _Meta.get_version(Session)
    if not DB_VERSION == version:
      raise Exception('Your opentuner database version {} is out of date with the current version {}'.format(version, DB_VERSION))

  Base.metadata.create_all(engine)

  Session = scoped_session(sessionmaker(autocommit=False,
                                        autoflush=False,
                                        bind=engine))
  # mark database with current version
  _Meta.add_version(Session, DB_VERSION)
  Session.commit()

  return engine, Session
Developer: jansel, Project: opentuner, Lines of code: 31, Source: connect.py


Example 12: init_rdbms

def init_rdbms(engine):
    """Call me before using any RDBMSStore class in the model."""
    sm = orm.sessionmaker(autocommit=True, bind=engine)
    rdbms.sqlEngine = engine
    rdbms.sqlSession = orm.scoped_session(sm)
    sqlEngine = engine
    sqlSession = orm.scoped_session(sm)
    assert(sqlEngine)
    assert(sqlSession)
Developer: junkafarian, Project: opencore, Lines of code: 9, Source: __init__.py


Example 13: create_group

def create_group():
    '''
    Endpoint that allows the user to generate a group. GET request generates
    the form required to create the group. POST request makes the DB calls to
    create the group. 
    '''
    global USE_PROTOBUFF 
    if request.method == 'POST':

        if USE_PROTOBUFF:
            GrpCreateClient = message_pb2.UsrGrp()
            result = base64.b64decode(request.form['protoString'])
            GrpCreateClient.ParseFromString(result)

            name = GrpCreateClient.name
            user_ids = GrpCreateClient.user_ids
        else:
            name = request.form['name']
            user_ids = request.form['user_ids']
       
        #parse string of users into integer array
        user_ids = user_ids.split(',')
        user_ids = [int(user_id) for user_id in user_ids]
        user_ids.append(int(current_user.id))
        user_ids.sort()

        #make sure that the group has more than 2 people
        if len(user_ids) <= 2:
            flash('Groups must be made with more than 2 people')
            return json.dumps({}), 403, {'Content-Type':'application/json'}
        
        user_ids = str(user_ids)

        #make sure the group has not been created yet
        #(bound parameters instead of string concatenation, to avoid SQL injection)
        from sqlalchemy import text
        Session = scoped_session(sessionmaker(bind=engine))
        s = Session()
        result = s.execute(text('SELECT * FROM groups WHERE user_ids = :user_ids LIMIT 1'),
                           {'user_ids': user_ids}).fetchone()
        s.close()
        if result is not None:
            flash('That group has already been created')
            return json.dumps({}), 403, {'Content-Type':'application/json'}
        
        #create group
        try:
            Session = scoped_session(sessionmaker(bind=engine))
            s = Session()
            s.execute(text('INSERT INTO groups(groupname, user_ids) VALUES (:name, :user_ids)'),
                      {'name': name, 'user_ids': user_ids})
            s.commit()
            s.close()
            return json.dumps({}), 200, {'Content-Type':'application/json'}
        except Exception:
            return json.dumps({}), 403, {'Content-Type':'application/json'}
    else:
        return render_template(
            'create_group.html'
        )
Developer: michaelfarrell76, Project: CS262_Messaging_App, Lines of code: 57, Source: application.py
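
Queries that embed user input, like the ones in Example 13, are usually safer with bound parameters. The following is a minimal sketch under stated assumptions: the in-memory SQLite database and the `chat_groups` table are made up for the illustration and are not part of the original application.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# Assumption: in-memory SQLite and a hypothetical chat_groups table.
engine = create_engine("sqlite://")
Session = scoped_session(sessionmaker(bind=engine))

session = Session()
session.execute(text("CREATE TABLE chat_groups (groupname TEXT, user_ids TEXT)"))

# A name that would break a query built by string concatenation.
hostile_name = 'x"); DROP TABLE chat_groups; --'

# :name and :user_ids are bound parameters; the driver passes the values
# separately from the SQL text, so they are never parsed as SQL.
session.execute(
    text("INSERT INTO chat_groups (groupname, user_ids) VALUES (:name, :user_ids)"),
    {"name": hostile_name, "user_ids": "[1, 2, 3]"},
)
session.commit()

# Fetch the row before the session is closed.
row = session.execute(
    text("SELECT groupname FROM chat_groups WHERE user_ids = :user_ids LIMIT 1"),
    {"user_ids": "[1, 2, 3]"},
).fetchone()
stored_name = row[0]
Session.remove()
```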


Example 14: _rasp_session

def _rasp_session(engine = None):
    
    if engine is None:
        return scoped_session(sessionmaker(autoflush=False, 
                                                autocommit=False))
    else:
        return scoped_session(sessionmaker(autoflush=False,
                                                autocommit=False,
                                                bind=engine))
Developer: rasp-home, Project: rasp-home-backend, Lines of code: 9, Source: database.py


Example 15: __new__

    def __new__(cls, *args, **kwarg):
        if not cls._session:
            engine = create_engine(settings.DATABASE_ENGINE, pool_size=settings.DATABASE_POOL_SIZE, pool_recycle=300, proxy=TimerProxy())

            if hasattr(settings, 'CACHED_QUERY') and settings.CACHED_QUERY:
                from torneira.cache import CachedQuery
                cls._session = scoped_session(sessionmaker(autocommit=True, autoflush=False, expire_on_commit=False, query_cls=CachedQuery, bind=engine))
            else:
                cls._session = scoped_session(sessionmaker(autocommit=True, autoflush=False, expire_on_commit=False, bind=engine))

        return cls._session
Developer: Magic347, Project: torneira, Lines of code: 11, Source: meta.py


Example 16: setUp

 def setUp(self):
     self.app = TestApp(build_app().wsgifunc())
     self.db = orm.scoped_session(
         orm.sessionmaker(bind=engine, query_cls=NoCacheQuery)
     )()
     self.db2 = orm.scoped_session(
         orm.sessionmaker(bind=engine, query_cls=NoCacheQuery)
     )()
     self.default_headers = {
         "Content-Type": "application/json"
     }
     flush()
Developer: Zipfer, Project: fuel-web, Lines of code: 12, Source: test_db_refresh.py


Example 17: session

    def session(self):

        # keep one registry so that remove() below acts on the session yielded here
        registry = scoped_session(sessionmaker())
        registry.configure(bind=self.engine)
        session = registry()

        session.execute("SELECT 1").scalar()

        try:
            yield session
        finally:
            registry.remove()
Developer: herqles-io, Project: hq-lib, Lines of code: 12, Source: __init__.py
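
The yield/finally shape of Example 17 is commonly wrapped with `contextlib.contextmanager`. The sketch below shows one possible arrangement, assuming an in-memory SQLite engine; `session_scope` is a hypothetical helper name, and the commit/rollback handling is an addition that the original does not have.

```python
from contextlib import contextmanager

from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# Assumption: in-memory SQLite keeps the sketch self-contained.
engine = create_engine("sqlite://")
registry = scoped_session(sessionmaker(bind=engine))


@contextmanager
def session_scope():
    """Yield the current scoped session and always clean it up afterwards."""
    session = registry()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        # remove() must be called on the same registry that produced the session.
        registry.remove()


with session_scope() as session:
    value = session.execute(text("SELECT 41 + 1")).scalar()

# After the block, the registry no longer holds a session for this thread.
session_left_behind = registry.registry.has()
```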


Example 18: init_from_config

def init_from_config(settings, prefix="sqlalchemy.", use_transaction=True):
    """ As above but use a settings dict, eg from a Pyramid config
    """
    global Session, engine, Base
    engine = sqlalchemy.engine_from_config(settings, prefix)
    if use_transaction:
        from zope.sqlalchemy import ZopeTransactionExtension

        Session = scoped_session(sessionmaker(bind=engine, extension=ZopeTransactionExtension()))
    else:
        Session = scoped_session(sessionmaker(bind=engine))
    Base.metadata.bind = engine
    init_modules()
Developer: pythonpro-dev, Project: pp-db, Lines of code: 13, Source: dbsetup.py


Example 19: __init__

    def __init__(self, connection_string):

        self.log = logging.getLogger(__name__)

        self.sa_engine_online = None
        self.connection_string_online = connection_string.connectUrlonline
        self.session_online = scoped_session(sessionmaker(autoflush=True,
                                                   autocommit=False))

        self.sa_engine_offline = None
        self.connection_string_offline = connection_string.connectUrloffline
        self.session_offline = scoped_session(sessionmaker(autoflush=True,
                                                   autocommit=False))
Developer: cms-sw, Project: web-confdb, Lines of code: 13, Source: saengine.py


Example 20: create_security_group_rule_bulk_native

    def create_security_group_rule_bulk_native(self, context, security_group_rules):
        rules = security_group_rules["security_group_rules"]
        scoped_session(context.session)
        security_group_id = self._validate_security_group_rules(context, security_group_rules)
        with context.session.begin(subtransactions=True):
            if not self.get_security_group(context, security_group_id):
                raise ext_sg.SecurityGroupNotFound(id=security_group_id)

            self._check_for_duplicate_rules(context, rules)
            ret = []
            for rule_dict in rules:
                res_rule_dict = self._create_security_group_rule(context, rule_dict, validate=False)
                ret.append(res_rule_dict)
            return ret
Developer: sebrandon1, Project: neutron, Lines of code: 14, Source: securitygroups_db.py



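Note: The sqlalchemy.orm.scoped_session function examples in this article were compiled by 纯净天空 from source code and documentation platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects, and copyright of the source code belongs to the original authors; for distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.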

