• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python pool.manage函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.pool.manage函数的典型用法代码示例。如果您正苦于以下问题:Python manage函数的具体用法?Python manage怎么用?Python manage使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了manage函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: testbadargs

    def testbadargs(self):
        manager = pool.manage(mock_dbapi)

        try:
            connection = manager.connect(None)
        except:
            pass
开发者ID:gajop,项目名称:springgrid,代码行数:7,代码来源:test_pool.py


示例2: __init__

    def __init__(self, host, database, user, password, debug=False):
        self.host = host
        self.database = database
        # 用于判断是否打印sql
        self.debug = debug

        args = {'database': database}
        if user is not None:
            args['user'] = user
        if password is not None:
            args['password'] = password

        pair = host.split(':')
        if len(pair) == 2:
            args['host'] = pair[0]
            args['port'] = int(pair[1])
        else:
            args['host'] = pair[0]
            args['port'] = 3388

        self.args = args
        self._db = None
        # 单位为秒
        self._db_operation_timeout = 300

        # 连接池
        self.conn_pool = pool.manage(
            MySQLdb, pool_size=20, max_overflow=40, timeout=self._db_operation_timeout, recycle=1800)

        self.reconnection()
开发者ID:wmroar,项目名称:datouyi,代码行数:30,代码来源:DB_Mysql.py


示例3: testnonthreadlocalmanager

    def testnonthreadlocalmanager(self):
        manager = pool.manage(mock_dbapi, use_threadlocal=False)

        connection = manager.connect("foo.db")
        connection2 = manager.connect("foo.db")

        self.assert_(connection.cursor() is not None)
        self.assert_(connection is not connection2)
开发者ID:chatch,项目名称:pinyin-toolkit,代码行数:8,代码来源:test_pool.py


示例4: test_non_thread_local_manager

    def test_non_thread_local_manager(self):
        manager = pool.manage(MockDBAPI(), use_threadlocal=False)

        connection = manager.connect('foo.db')
        connection2 = manager.connect('foo.db')

        self.assert_(connection.cursor() is not None)
        self.assert_(connection is not connection2)
开发者ID:kihon10,项目名称:sqlalchemy,代码行数:8,代码来源:test_pool.py


示例5: patch_postgresql

def patch_postgresql():
    try:
        from django.db.backends.postgresql_psycopg2 import base as pgsql_base
    except (ImproperlyConfigured, ImportError) as e:
        return

    if not hasattr(pgsql_base, "_Database"):
        pgsql_base._Database = pgsql_base.Database
        pgsql_base.Database = manage(pgsql_base._Database, **POOL_SETTINGS)
开发者ID:gto-web,项目名称:djorm-ext-pool,代码行数:9,代码来源:__init__.py


示例6: patch_oracle

def patch_oracle():
    try:
        from django.db.backends.oracle import base as oracle_base
    except (ImproperlyConfigured, ImportError) as e:
        return

    if not hasattr(oracle_base, "_Database"):
        oracle_base._Database = oracle_base.Database
        oracle_base.Database = manage(oracle_base._Database, **POOL_SETTINGS)
开发者ID:gto-web,项目名称:djorm-ext-pool,代码行数:9,代码来源:__init__.py


示例7: patch_sqlite3

def patch_sqlite3():
    try:
        from django.db.backends.sqlite3 import base as sqlite3_base
    except (ImproperlyConfigured, ImportError) as e:
        return

    if not hasattr(sqlite3_base, "_Database"):
        sqlite3_base._Database = sqlite3_base.Database
        sqlite3_base.Database = manage(sqlite3_base._Database, **POOL_SETTINGS)
开发者ID:gto-web,项目名称:djorm-ext-pool,代码行数:9,代码来源:__init__.py


示例8: testmanager

    def testmanager(self):
        manager = pool.manage(mock_dbapi, use_threadlocal=True)

        connection = manager.connect("foo.db")
        connection2 = manager.connect("foo.db")
        connection3 = manager.connect("bar.db")

        self.assert_(connection.cursor() is not None)
        self.assert_(connection is connection2)
        self.assert_(connection2 is not connection3)
开发者ID:chatch,项目名称:pinyin-toolkit,代码行数:10,代码来源:test_pool.py


示例9: testnonthreadlocalmanager

    def testnonthreadlocalmanager(self):
        manager = pool.manage(mock_dbapi, use_threadlocal = False)
        
        connection = manager.connect('foo.db')
        connection2 = manager.connect('foo.db')

        self.echo( "connection " + repr(connection))

        self.assert_(connection.cursor() is not None)
        self.assert_(connection is not connection2)
开发者ID:nakedible,项目名称:vpnease-l2tp,代码行数:10,代码来源:pool.py


示例10: testmanager

    def testmanager(self):
        manager = pool.manage(mock_dbapi, use_threadlocal=True)

        connection = manager.connect('foo.db')
        connection2 = manager.connect('foo.db')
        connection3 = manager.connect('bar.db')

        print "connection " + repr(connection)
        self.assert_(connection.cursor() is not None)
        self.assert_(connection is connection2)
        self.assert_(connection2 is not connection3)
开发者ID:gajop,项目名称:springgrid,代码行数:11,代码来源:test_pool.py


示例11: get_pool

def get_pool():
    "Creates one and only one pool using the configured settings."
    global MYSQLPOOL
    if MYSQLPOOL is None:
        backend = getattr(settings, 'MYSQLPOOL_BACKEND', MYSQLPOOL_BACKEND)
        backend = getattr(pool, backend)
        kwargs = getattr(settings, 'MYSQLPOOL_ARGUMENTS', {})
        kwargs.setdefault('poolclass', backend)
        # The user can override this, but set it by default for safety.
        kwargs.setdefault('recycle', MYSQLPOOL_TIMEOUT)
        MYSQLPOOL = pool.manage(OldDatabase, **kwargs)
    return MYSQLPOOL
开发者ID:namongk,项目名称:django-mysqlpool,代码行数:12,代码来源:base.py


示例12: _connect

 def _connect(self):
     if cfg['CFG_MISCUTIL_SQL_USE_SQLALCHEMY']:
         try:
             import sqlalchemy.pool as pool
             import MySQLdb as mysqldb
             mysqldb = pool.manage(mysqldb, use_threadlocal=True)
             connect = mysqldb.connect
         except ImportError:
             cfg['CFG_MISCUTIL_SQL_USE_SQLALCHEMY'] = False
             from MySQLdb import connect
     else:
         from MySQLdb import connect
     return connect
开发者ID:mhellmic,项目名称:b2share,代码行数:13,代码来源:dbquery.py


示例13: _get_pool

def _get_pool():
    """
    Private function to get the database pool instance, or create one if it's not available yet.

    :returns: The connection pool instance
    """
    global _pool

    if _pool is None:
        # FIXME: Use pool size and other parameters from config
        _pool = dbpool.manage(MySQLdb, pool_size=1, use_threadlocal=True)

    return _pool
开发者ID:alvabai,项目名称:trac-multiproject,代码行数:13,代码来源:db.py


示例14: test_manager_with_key

    def test_manager_with_key(self):
        class NoKws(object):
            def connect(self, arg):
                return MockConnection()

        manager = pool.manage(NoKws(), use_threadlocal=True)

        c1 = manager.connect('foo.db', sa_pool_key="a")
        c2 = manager.connect('foo.db', sa_pool_key="b")
        c3 = manager.connect('bar.db', sa_pool_key="a")

        assert c1.cursor() is not None
        assert c1 is not c2
        assert c1 is  c3
开发者ID:pshken,项目名称:sqlalchemy,代码行数:14,代码来源:test_pool.py


示例15: get_pool

def get_pool():
    """Create one and only one pool using the configured settings."""
    global MYSQLPOOL
    if MYSQLPOOL is None:
        backend_name = getattr(settings, "SHIELD_MYSQL_POOL_BACKEND", DEFAULT_BACKEND)
        backend = getattr(local_pool, backend_name)
        kwargs = getattr(settings, "SHIELD_MYSQL_POOL_ARGUMENTS", {})
        kwargs.setdefault("poolclass", backend)
        kwargs.setdefault("recycle", DEFAULT_POOL_TIMEOUT)
        MYSQLPOOL = sa_pool.manage(OldDatabase, **kwargs)
        setattr(MYSQLPOOL, "_pid", os.getpid())

    if getattr(MYSQLPOOL, "_pid", None) != os.getpid():
        sa_pool.clear_managers()
    return MYSQLPOOL
开发者ID:torpedoallen,项目名称:shield_mysql_pool,代码行数:15,代码来源:base.py


示例16: test_manager

    def test_manager(self):
        manager = pool.manage(MockDBAPI(), use_threadlocal=True)

        c1 = manager.connect('foo.db')
        c2 = manager.connect('foo.db')
        c3 = manager.connect('bar.db')
        c4 = manager.connect("foo.db", bar="bat")
        c5 = manager.connect("foo.db", bar="hoho")
        c6 = manager.connect("foo.db", bar="bat")

        assert c1.cursor() is not None
        assert c1 is c2
        assert c1 is not c3
        assert c4 is c6
        assert c4 is not c5
开发者ID:kihon10,项目名称:sqlalchemy,代码行数:15,代码来源:test_pool.py


示例17: get_pool

def get_pool():
    """Create one and only one pool using the configured settings."""
    global MYSQLPOOL
    if MYSQLPOOL is None:
        backend_name = getattr(settings, 'MYSQLPOOL_BACKEND', DEFAULT_BACKEND)
        backend = getattr(pool, backend_name)
        kwargs = getattr(settings, 'MYSQLPOOL_ARGUMENTS', {})
        kwargs.setdefault('poolclass', backend)
        kwargs.setdefault('recycle', DEFAULT_POOL_TIMEOUT)
        MYSQLPOOL = pool.manage(OldDatabase, **kwargs)
        setattr(MYSQLPOOL, '_pid', os.getpid())

    if getattr(MYSQLPOOL, '_pid', None) != os.getpid():
        pool.clear_managers()
    return MYSQLPOOL
开发者ID:samozihu,项目名称:django-mysqlpool,代码行数:15,代码来源:base.py


示例18: init_pool

def init_pool():
     if not globals().get('pool_initialized', False):
         global pool_initialized
         pool_initialized = True
         try:
             backendname = settings.DATABASES['default']['ENGINE']
             backend = load_backend(backendname)

             #replace the database object with a proxy.
             backend.Database = pool.manage(backend.Database)

             backend.DatabaseError = backend.Database.DatabaseError
             backend.IntegrityError = backend.Database.IntegrityError
             logging.info("Connection Pool initialized")
         except:
             logging.exception("Connection Pool initialization error")
开发者ID:zhwolf,项目名称:myproject,代码行数:16,代码来源:__init__.py


示例19: init_pool

def init_pool():
    """From http://blog.bootstraptoday.com/2012/07/11/django-connection-pooling-using-sqlalchemy-connection-pool/"""
    if not globals().get('pool_initialized', False):
        global pool_initialized
        pool_initialized = True
        try:
            backendname = settings.DATABASES['default']['ENGINE']
            backend = load_backend(backendname)
            
            #replace the database object with a proxy.
            backend.Database = pool.manage(backend.Database, pool_size=settings.DB_POOL_SIZE, max_overflow=-1)
            
            backend.DatabaseError = backend.Database.DatabaseError
            backend.IntegrityError = backend.Database.IntegrityError
            logging.info("Connection Pool initialized")
        except:
            logging.exception("Connection Pool initialization error")
开发者ID:di445,项目名称:ntucker,代码行数:17,代码来源:__init__.py


示例20: patch_mysql

def patch_mysql():
    class hashabledict(dict):
        def __hash__(self):
            return hash(tuple(sorted(self.items())))

    class hashablelist(list):
        def __hash__(self):
            return hash(tuple(sorted(self)))

    class ManagerProxy(object):
        def __init__(self, manager):
            self.manager = manager

        def __getattr__(self, key):
            return getattr(self.manager, key)

        def connect(self, *args, **kwargs):
            if 'conv' in kwargs:
                conv = kwargs['conv']
                if isinstance(conv, dict):
                    items = []
                    for k, v in conv.items():
                        if isinstance(v, list):
                            v = hashablelist(v)
                        items.append((k, v))
                    kwargs['conv'] = hashabledict(items)
            if 'ssl' in kwargs:
                ssl = kwargs['ssl']
                if isinstance(ssl, dict):
                    items = []
                    for k, v in ssl.items():
                        if isinstance(v, list):
                            v = hashablelist(v)
                        items.append((k, v))
                    kwargs['ssl'] = hashabledict(items)
            return self.manager.connect(*args, **kwargs)

    try:
        from django.db.backends.mysql import base as mysql_base
    except (ImproperlyConfigured, ImportError) as e:
        return

    if not hasattr(mysql_base, "_Database"):
        mysql_base._Database = mysql_base.Database
        mysql_base.Database = ManagerProxy(manage(mysql_base._Database, **POOL_SETTINGS))
开发者ID:gto-web,项目名称:djorm-ext-pool,代码行数:45,代码来源:__init__.py



注:本文中的sqlalchemy.pool.manage函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python processors.to_decimal_processor_factory函数代码示例发布时间:2022-05-27
下一篇:
Python pool.clear_managers函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap