• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python session.sessionmaker函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 sqlalchemy.orm.session.sessionmaker 函数的典型用法代码示例。如果您正苦于以下问题:Python sessionmaker 函数的具体用法?Python sessionmaker 怎么用?Python sessionmaker 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了sessionmaker函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: create_session

 def create_session(self, engine=None, shard=False):
     """Return a scoped session whose factory is bound to ``engine``.

     ``shard`` selects ``QKShardSession`` as the session class; otherwise
     ``QKSession`` is used.  The factory is created with autocommit and
     autoflush disabled, receives this object as ``db``, and is scoped
     with ``self.scopefunc``.
     """
     if shard:
         session_cls = QKShardSession
     else:
         session_cls = QKSession
     session_options = dict(
         db=self,
         bind=engine,
         class_=session_cls,
         autocommit=False,
         autoflush=False,
     )
     return _scoped_session(
         sessionmaker(**session_options), scopefunc=self.scopefunc
     )
开发者ID:qianka,项目名称:qianka-sqlalchemy,代码行数:7,代码来源:sqlalchemy.py


示例2: persist_bundle_sensor

    def persist_bundle_sensor(self):
        """Persist a SPOT5 bundle, verify its ``product`` row, then clean up.

        The bundle is persisted into ``SETTINGS.TEST_FOLDER``.  The test then
        asserts that the ``product`` table holds a row with the bundle's
        uuid, deletes the bundle's database object, and finally asserts that
        each bundle file was copied to the target folder before removing it.
        """
        from madmex.persistence.driver import persist_bundle
        from sqlalchemy import create_engine
        from sqlalchemy.orm.session import sessionmaker
        from madmex.mapper.bundle.spot5 import Bundle
        # NOTE(review): SETTINGS is not imported inside this method; it is
        # presumably provided at module level -- confirm.

        bundle_dir = '/LUSTRE/MADMEX/staging/madmex_antares/test_ingest/556_297_041114_dim_img_spot'
        bundle = Bundle(bundle_dir)
        target_url = SETTINGS.TEST_FOLDER
        print(target_url)
        # TODO: horrible hack -- the bundle target is overridden by hand.
        bundle.target = target_url
        persist_bundle(bundle)

        engine = create_engine(SETTINGS.ANTARES_TEST_DATABASE)
        session = sessionmaker(bind=engine)()
        query = "SELECT count(*) FROM product WHERE uuid='%s';" % bundle.uuid_id

        try:
            for row in session.execute(query):
                self.assertGreater(row['count'], 0)
            session.delete(bundle.get_database_object())
            session.commit()
            for file_name in bundle.get_files():
                ingested = os.path.join(target_url, os.path.basename(file_name))
                self.assertTrue(os.path.isfile(ingested))
                os.remove(ingested)
        except:
            # Undo the pending delete on any failure (including a failed
            # assertion) and let the error propagate to the test runner.
            session.rollback()
            raise
        finally:
            session.close()
开发者ID:CONABIO,项目名称:madmex-antares,代码行数:35,代码来源:test_database.py


示例3: __init__

	def __init__(self, database, config):
		"""Set up the engine, metadata and session factory for ``database``.

		The connection URL is produced by ``Database.build_url`` from the
		database name and ``config``; the metadata is bound to the engine
		and uses ``kNamingConvention``.
		"""
		self.name = database
		self._config = config
		engine = sa.create_engine(Database.build_url(database, config))
		self._engine = engine
		self._metadata = sa.MetaData(
			bind=engine,
			naming_convention=kNamingConvention,
		)
		self._sessionmaker = sessionmaker(bind=engine)
开发者ID:blindsightcorp,项目名称:rigor,代码行数:7,代码来源:database.py


示例4: db_searchreplace

def db_searchreplace(db_name, db_user, db_password, db_host, search, replace):
    """Replace ``search`` with ``replace`` in every column of every table.

    The MySQL database is reflected, an ad-hoc mapped class is built for
    each reflected table, and every column value of every row is passed
    through ``recursive_unserialize_replace(search, replace, value, False)``
    and written back to the row.

    Changes are committed once per table.  A failure while processing a
    table is printed, that table's pending changes are rolled back, and
    processing continues with the next table.

    :param db_name: name of the database to connect to
    :param db_user: user name for the connection
    :param db_password: password for the connection
    :param db_host: host of the MySQL server
    :param search: value to look for
    :param replace: value to substitute for ``search``
    """
    # NOTE(review): credentials are interpolated verbatim; a password that
    # contains URL-reserved characters would need to be quoted first.
    engine = create_engine(
        "mysql://%s:%s@%s/%s" % (db_user, db_password, db_host, db_name))
    meta = MetaData()
    meta.bind = engine
    meta.reflect()

    Session = sessionmaker(engine)
    Base = declarative_base(metadata=meta)
    session = Session()

    tableClassDict = {}
    try:
        # Snapshot the table mapping so that declaring classes below cannot
        # disturb the iteration.
        for table_name, table_obj in list(Base.metadata.tables.items()):
            try:
                tableClassDict[table_name] = type(
                    str(table_name),
                    (Base,),
                    {
                        '__tablename__': table_name,
                        '__table_args__': {
                            'autoload': True,
                            'extend_existing': True,
                        },
                    },
                )
                for row in session.query(tableClassDict[table_name]).all():
                    for column in table_obj._columns.keys():
                        data_to_fix = getattr(row, column)
                        fixed_data = recursive_unserialize_replace(
                            search, replace, data_to_fix, False)
                        setattr(row, column, fixed_data)
                # Without a commit the edited rows are discarded when the
                # session goes away, making the whole run a no-op.
                session.commit()
            except Exception as e:
                # Best effort: report the problem, drop this table's pending
                # changes so the session stays usable, and move on.
                session.rollback()
                print(e)
    finally:
        session.close()
开发者ID:johnraz,项目名称:sword,代码行数:31,代码来源:searchreplacedb.py


示例5: __init__

 def __init__(self, engine):
     '''
     Store ``engine`` and open a session bound to it.
     '''
     self.engine = engine
     self.session = sessionmaker(bind=self.engine)()
开发者ID:amaurs,项目名称:armed-conflict,代码行数:7,代码来源:query_database.py


示例6: session_cm

def session_cm():
    """Yield a new session bound to ``db_engine`` and always close it.

    This is a generator; it is presumably wrapped with
    ``contextlib.contextmanager`` outside the visible snippet -- confirm.
    Exceptions raised by the consumer propagate unchanged.  The session is
    closed when the generator finishes, whether normally or through an
    exception, so its connection is released instead of lingering until
    garbage collection.
    """
    session = sessionmaker(bind=db_engine)()
    try:
        yield session
    finally:
        # close() also ends any transaction still in progress.
        session.close()
开发者ID:depsi,项目名称:timefly,代码行数:7,代码来源:database_session.py


示例7: _run_indexer

def _run_indexer(options):
    """Configure a ``WaveformIndexer`` from ``options`` and serve until stopped.

    Steps, in order: resolve the data paths, optionally parse the mapping
    file, start ``options.number_of_cpus`` daemon worker processes sharing
    manager-backed queues, prepare the database schema (dropping it first
    when ``options.drop_database`` is set), attach everything to the service
    and call ``serve_forever``.  Returns early when no paths remain after
    preparation; a ``KeyboardInterrupt`` ends the process via ``quit()``.
    """
    logging.info("Starting indexer %s:%s ..." % (options.host, options.port))
    # initialize crawler
    service = WaveformIndexer((options.host, options.port), MyHandler)
    service.log = logging
    try:
        # a comma-separated value holds several paths, otherwise just one
        raw_paths = (options.data.split(',') if ',' in options.data
                     else [options.data])
        paths = service._prepare_paths(raw_paths)
        if not paths:
            return

        # optional mapping file
        mappings = {}
        if options.mapping_file:
            with open(options.mapping_file, 'r') as f:
                data = f.readlines()
            mappings = parse_mapping_data(data)
            logging.info("Parsed %d lines from mapping file %s" %
                         (len(data), options.mapping_file))

        # shared containers used to exchange work with the worker processes
        manager = multiprocessing.Manager()
        in_queue = manager.dict()
        work_queue = manager.list()
        out_queue = manager.list()
        log_queue = manager.list()

        # one daemon worker per requested CPU
        for worker_id in range(options.number_of_cpus):
            process = multiprocessing.Process(
                target=worker,
                args=(worker_id, in_queue, work_queue, out_queue, log_queue,
                      mappings),
            )
            process.daemon = True
            process.start()

        # database connection and schema
        engine = create_engine(options.db_uri, encoding=native_str('utf-8'),
                               convert_unicode=True)
        if options.drop_database:
            Base.metadata.drop_all(engine, checkfirst=True)
        Base.metadata.create_all(engine, checkfirst=True)

        # hand the session factory, options and queues over to the service
        service.session = sessionmaker(bind=engine)
        service.options = options
        service.mappings = mappings
        service.input_queue = in_queue
        service.work_queue = work_queue
        service.output_queue = out_queue
        service.log_queue = log_queue
        service.paths = paths

        service._reset_walker()
        service._step_walker()
        service.serve_forever(options.poll_interval)
    except KeyboardInterrupt:
        quit()
    logging.info("Indexer stopped.")
开发者ID:QuLogic,项目名称:obspy,代码行数:60,代码来源:indexer.py


示例8: setUp

 def setUp(self):
     """Open a session on an in-memory SQLite engine and create the tables."""
     print("setup..")
     # Use 'sqlite:///data/test.db' instead to keep the database on disk.
     engine = create_engine('sqlite://', echo=True)
     self.session = sessionmaker(bind=engine)()
     ImageModel.metadata.create_all(engine)
开发者ID:conchis,项目名称:image-tools,代码行数:7,代码来源:image_model_test.py


示例9: __init__

 def __init__(self):
     '''
     Create the declarative base, a session factory and one session.
     '''
     # NOTE(review): self.engine is not assigned here; presumably it is
     # defined on the class or by a subclass -- confirm.
     self.base = declarative_base()
     session_factory = sessionmaker(bind=self.engine)
     self.SessionFactory = session_factory
     self.session = session_factory()
开发者ID:elioth010,项目名称:lugama,代码行数:7,代码来源:Model.py


示例10: setUp

    def setUp(self):
        """Create an in-memory database, a ``Post`` model and an index service.

        A fresh declarative base and ``Post`` class are built for every test,
        the tables are created on the in-memory SQLite engine, and ``Post``
        is registered with a ``whooshalchemy.IndexService`` that uses the
        test session.
        """
        engine = create_engine("sqlite:///:memory:", echo=True)
        Session = sessionmaker(bind=engine)
        self.session = Session()

        Base = declarative_base()

        class Post(Base):
            __tablename__ = "objectA"
            __searchable__ = ["title", "body"]

            id = Column(Integer, primary_key=True)
            title = Column(Text)
            body = Column(UnicodeText)
            # Pass the callable itself: calling utcnow() here would freeze
            # one timestamp at class-definition time and reuse it for every
            # row instead of stamping each insert.
            created = Column(DateTime, default=datetime.datetime.utcnow)

            def __repr__(self):
                return "{0}(title={1})".format(self.__class__.__name__, self.title)

        self.Post = Post
        Base.metadata.create_all(engine)

        self.index_manager = whooshalchemy.IndexService(session=self.session)
        self.index_manager.register_class(Post)
开发者ID:sfermigier,项目名称:WhooshAlchemy,代码行数:25,代码来源:tests.py


示例11: _create_session

 def _create_session(self, engine):
     """Create the tables on ``engine`` and return a scoped session bound to it.

     The session factory carries a ``ZopeTransactionExtension``; the scoped
     session is bound to ``engine`` afterwards through ``configure``.
     """
     Base.metadata.create_all(engine)
     # Distinguish sessions by thread.
     factory = sessionmaker(extension=ZopeTransactionExtension())
     scoped = scoped_session(factory)
     scoped.configure(bind=engine)
     return scoped
开发者ID:Pylons,项目名称:repozitory,代码行数:7,代码来源:archive.py


示例12: __init__

 def __init__(self, name, url, scope_func):
     """Create the engine, session factories and declarative base.

     ``url`` is split by ``self._parse_url`` into the connection URL and the
     keyword arguments for ``sqlalchemy.create_engine``.  ``scope_func`` is
     the scope function of the scoped session registry.
     """
     self._name = name
     self._url, engine_params = self._parse_url(url)
     self._engine = sqlalchemy.create_engine(self._url, **engine_params)
     # NOTE(review): self.engine is presumably a property exposing
     # self._engine -- confirm against the class definition.
     factory = sessionmaker(
         self.engine,
         class_=Session,
         expire_on_commit=False,
     )
     self._session_maker = factory
     self._scoped_session_maker = scoped_session(factory, scopefunc=scope_func)
     self.Model = declarative_base()
开发者ID:viniciuschiele,项目名称:sqlalchemy-multidb,代码行数:7,代码来源:databases.py


示例13: main

def main(keep_saved, config_location):
    """Return the descriptions of saved Fever items, optionally un-saving them.

    :param keep_saved: when falsy, every item with ``is_saved=1`` is updated
        to ``is_saved=0`` and the change is committed after the
        descriptions have been read.
    :param config_location: path of the configuration file handed to
        ``make_connection``; when falsy, ``config.ini`` next to this file
        is used.
    :returns: list of the rows returned by the description query.
    """
    if not config_location:
        # os.path.join instead of a hard-coded backslash, so the default
        # path is also valid on non-Windows systems.
        config_location = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "config.ini")
    try:
        # Readability probe only; the handle is closed straight away.
        with open(config_location):
            pass
    except IOError:
        print("Problem opening config.ini at " + config_location)
    engine = create_engine(make_connection(config_location), echo=False)
    Session = sessionmaker(bind=engine)
    session = Session()

    Base = declarative_base()

    class FeverItems(Base):
        __tablename__ = 'fever_items'

        id = Column(INTEGER, primary_key=True)
        description = Column(LONGTEXT)
        is_saved = Column(TINYINT)

    try:
        records = session.query(FeverItems.description).filter_by(is_saved=1)
        contents = list(records)
        if not keep_saved:
            # Both statements belong to the branch: clear the saved flag and
            # persist that change.
            session.query(FeverItems).filter_by(is_saved=1).update({"is_saved": 0})
            session.commit()
        return contents
    finally:
        session.close()


if __name__ == "__main__":
    # Usage: <script> [keep_saved] [config_location]
    _cli_args = sys.argv[1:]
    # Non-destructive default: keep the saved flags when no value is given.
    _keep_saved = True
    if _cli_args:
        # bool("False") and bool("0") are both True, so interpret the text
        # explicitly; any other non-empty value still means "keep".
        _keep_saved = _cli_args[0].strip().lower() not in (
            "", "0", "false", "no", "n", "off")
    # An empty location makes main() fall back to its default config.ini.
    _config_location = _cli_args[1] if len(_cli_args) > 1 else ""
    main(_keep_saved, _config_location)
开发者ID:fuzzmz,项目名称:fever_saved_download,代码行数:32,代码来源:sql_connect.py


示例14: Session_Adapter_Add

def Session_Adapter_Add(data_information):
    """Add ``data_information`` to a new session on ``engine`` and commit it."""
    session = sessionmaker(bind=engine)()
    session.add(data_information)
    session.flush()
    session.commit()
开发者ID:zhoulunhao,项目名称:spider,代码行数:7,代码来源:data_option.py


示例15: _create_engine

    def _create_engine(self, url):
        """Return ``(engine, session_factory)`` for ``url``.

        The engine is built by ``self.dialect.create_engine`` with
        ``self.schema``.  A truthy ``'echo'`` entry in the configuration is
        passed on as ``'debug'``; a falsy one is passed through unchanged.
        """
        configured_echo = self.configuration.get('echo')
        echo = 'debug' if configured_echo else configured_echo

        engine = self.dialect.create_engine(url, self.schema, echo=echo)
        session_factory = sessionmaker(bind=engine)
        return engine, session_factory
开发者ID:omersaeed,项目名称:spire,代码行数:7,代码来源:schema.py


示例16: __init__

    def __init__(self, db_url, echo=False, drop_all=False):
        '''
        Connect to ``db_url`` and prepare the schema.

        For URLs containing ``'mysql'`` the engine additionally gets an
        encoding, a pool recycle time and an isolation level.  When
        ``drop_all`` is exactly ``True`` the schema is dropped before it is
        created, and two ``model.Issue`` rows are inserted afterwards.
        '''
        self._clients = []
        self._pending = []

        logging.info("connecting to %s",db_url)
        params = {'echo': echo}
        if 'mysql' in db_url:
            params.update(
                encoding='utf-8',
                pool_recycle=3600,
                isolation_level='READ COMMITTED',
            )
        self._engine = create_engine(db_url, **params)
        self._Session = sessionmaker(
            bind=self._engine,
            extension=self._SESSION_EXTENSIONS_,
            **self._SESSION_KWARGS_
        )

        # Only the literal True triggers a rebuild, not any truthy value.
        rebuild = drop_all is True
        if rebuild:
            with self.session as session:
                self._drop_all_(session)
        self._create_all_()
        if rebuild:
            seed_issues = ((1, "repeat idea", 0), (2, "speed", 1))
            with self.session as session:
                for issue_id, issue_name, type_index in seed_issues:
                    session.add(model.Issue(id=issue_id, name=issue_name,
                                            type=model.Issue.TYPE[type_index]))
                session.commit()
开发者ID:OskarBun,项目名称:lecturefb,代码行数:26,代码来源:control.py


示例17: tearDown

    def tearDown(self):
        """Close the test session, reset ``database.db_session``, drop the tables."""
        self.db_session.close()

        # Restore an unbound scoped session for the following test cases.
        unbound_factory = sessionmaker()
        database.db_session = scoped_session(unbound_factory)

        Base.metadata.drop_all(bind=self.db_engine)
开发者ID:toroettg,项目名称:SeriesMarker,代码行数:7,代码来源:memory_db_test_case.py


示例18: add_period

 def add_period(self, period):
     """Add ``period`` in a new session and commit it.

     Anything that is not a ``Period`` instance is ignored.
     """
     if not isinstance(period, Period):
         return
     session = sessionmaker(bind=self.engine)()
     session.add(period)
     session.commit()
开发者ID:LouisChen1905,项目名称:OneAnalyser,代码行数:7,代码来源:sqlite.py


示例19: get_session

 def get_session(self):
     """Return a session from the cached factory after rolling it back.

     The scoped session factory is created from ``self.get_db_engine()`` on
     first use and stored on ``self.sessionmaker``.
     """
     factory = self.sessionmaker
     if not factory:
         factory = scoped_session(sessionmaker(bind=self.get_db_engine()))
         self.sessionmaker = factory
     session = factory()
     session.rollback()
     return session
开发者ID:7scientists,项目名称:rouster,代码行数:7,代码来源:environment.py


示例20: init_database

 def init_database(self):
     '''
     Create the database tables and a session for accessing them.
     '''
     Base.metadata.create_all(self.get_engine())
     session_factory = sessionmaker(
         bind=self.get_engine(),
         autoflush=True,
         autocommit=False,
     )
     self.session = session_factory()
开发者ID:sunghwanJo,项目名称:ASAP,代码行数:7,代码来源:db.py



注:本文中的sqlalchemy.orm.session.sessionmaker函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python session.Session类代码示例发布时间:2022-05-27
下一篇:
Python session.object_session函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap