• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python connection.get_connection函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mongoengine.connection.get_connection函数的典型用法代码示例。如果您正苦于以下问题:Python get_connection函数的具体用法?Python get_connection怎么用?Python get_connection使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_connection函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_connection_kwargs

    def test_connection_kwargs(self):
        """Ensure that connection kwargs get passed to pymongo."""
        connect('mongoenginetest', alias='t1', tz_aware=True)
        conn = get_connection('t1')

        self.assertTrue(get_tz_awareness(conn))

        connect('mongoenginetest2', alias='t2')
        conn = get_connection('t2')
        self.assertFalse(get_tz_awareness(conn))
开发者ID:MongoEngine,项目名称:mongoengine,代码行数:10,代码来源:test_connection.py


示例2: test_sharing_connections

    def test_sharing_connections(self):
        """Ensure that connections are shared when the connection settings are exactly the same
        """
        connect('mongoenginetest', alias='testdb1')

        expected_connection = get_connection('testdb1')

        connect('mongoenginetest', alias='testdb2')
        actual_connection = get_connection('testdb2')
        self.assertEqual(expected_connection, actual_connection)
开发者ID:ThisGuyCodes,项目名称:mongoengine,代码行数:10,代码来源:test_connection.py


示例3: test_connection_kwargs

    def test_connection_kwargs(self):
        """Ensure that connection kwargs get passed to pymongo.
        """
        connect("mongoenginetest", alias="t1", tz_aware=True)
        conn = get_connection("t1")

        self.assertTrue(conn.tz_aware)

        connect("mongoenginetest2", alias="t2")
        conn = get_connection("t2")
        self.assertFalse(conn.tz_aware)
开发者ID:satyanani40,项目名称:mongoengine,代码行数:11,代码来源:test_connection.py


示例4: test_connect

    def test_connect(self):
        """Ensure that the connect() method works properly."""
        connect('mongoenginetest')

        conn = get_connection()
        self.assertIsInstance(conn, pymongo.mongo_client.MongoClient)

        db = get_db()
        self.assertIsInstance(db, pymongo.database.Database)
        self.assertEqual(db.name, 'mongoenginetest')

        connect('mongoenginetest2', alias='testdb')
        conn = get_connection('testdb')
        self.assertIsInstance(conn, pymongo.mongo_client.MongoClient)
开发者ID:MongoEngine,项目名称:mongoengine,代码行数:14,代码来源:test_connection.py


示例5: disconnect

def disconnect(alias=DEFAULT_CONNECTION_NAME):
    """ To disconnect pymongo connection.

    Copied from mongoengine/connection.py to fix a bug in mongoengine source code,
    ('disconnect' method is removed from pymongo MongoClient in latest version.)
    """
    global _connections
    global _dbs

    if alias in _connections:
        get_connection(alias=alias).close()
        del _connections[alias]
    if alias in _dbs:
        del _dbs[alias]
开发者ID:hspandher,项目名称:django-test-addons,代码行数:14,代码来源:utils.py


示例6: test_connect_in_mocking

    def test_connect_in_mocking(self):
        """Ensure that the connect() method works properly in mocking.
        """
        try:
            import mongomock
        except ImportError:
            raise SkipTest('you need mongomock installed to run this testcase')

        connect('mongoenginetest', host='mongomock://localhost')
        conn = get_connection()
        self.assertTrue(isinstance(conn, mongomock.MongoClient))

        connect('mongoenginetest2', host='mongomock://localhost', alias='testdb')
        conn = get_connection('testdb')
        self.assertTrue(isinstance(conn, mongomock.MongoClient))
开发者ID:DavidBord,项目名称:mongoengine,代码行数:15,代码来源:test_connection.py


示例7: test_connect

    def test_connect(self):
        """Ensure that the connect() method works properly.
        """
        connect('mongoenginetest')

        conn = get_connection()
        self.assertTrue(isinstance(conn, pymongo.connection.Connection))

        db = get_db()
        self.assertTrue(isinstance(db, pymongo.database.Database))
        self.assertEqual(db.name, 'mongoenginetest')

        connect('mongoenginetest2', alias='testdb')
        conn = get_connection('testdb')
        self.assertTrue(isinstance(conn, pymongo.connection.Connection))
开发者ID:deignacio,项目名称:mongoengine,代码行数:15,代码来源:connection.py


示例8: test_sharing_connections

    def test_sharing_connections(self):
        """Ensure that connections are shared when the connection settings are exactly the same
        """
        connect('mongoenginetests', alias='testdb1')
        expected_connection = get_connection('testdb1')

        connect('mongoenginetests', alias='testdb2')
        actual_connection = get_connection('testdb2')

        # Handle PyMongo 3+ Async Connection
        if IS_PYMONGO_3:
            # Ensure we are connected, throws ServerSelectionTimeoutError otherwise.
            # Purposely not catching exception to fail test if thrown.
            expected_connection.server_info()

        self.assertEqual(expected_connection, actual_connection)
开发者ID:OJFord,项目名称:mongoengine,代码行数:16,代码来源:test_connection.py


示例9: teardown_databases

 def teardown_databases(self, *args, **kwargs):
     for alias, params in self._iter_test_databases():
         connection = get_connection(alias)
         print("Dropping test database for alias '%s': %s" % (alias, params['name']))
         connection.drop_database(params['name'])
         disconnect(alias)
     return super(TestRunner, self).teardown_databases(*args, **kwargs)
开发者ID:Miaodeli,项目名称:drf-mongo-filters,代码行数:7,代码来源:mongoutils.py


示例10: test_register_connection_defaults

    def test_register_connection_defaults(self):
        """Ensure that defaults are used when the host and port are None.
        """
        register_connection('testdb', 'mongoenginetest', host=None, port=None)

        conn = get_connection('testdb')
        self.assertTrue(isinstance(conn, pymongo.mongo_client.MongoClient))
开发者ID:ThisGuyCodes,项目名称:mongoengine,代码行数:7,代码来源:test_connection.py


示例11: teardown_databases

 def teardown_databases(self, old_config, **kwargs):
     from mongoengine.connection import get_connection, disconnect
     connection = get_connection()
     connection.drop_database(self.mongodb_name)
     print 'Dropping mongo test database: ' + self.mongodb_name
     disconnect()
     super(MongoTestRunner, self).teardown_databases(old_config, **kwargs)
开发者ID:mjhea0,项目名称:django-mongonaut,代码行数:7,代码来源:testrunner.py


示例12: setUp

 def setUp(self):
     # データベースに接続
     addr = '127.0.0.1'
     port = 27017
     connect('test', host=addr, port=port)
     self.conn = get_connection()
     self.db = get_db()
开发者ID:JFK,项目名称:python-tornado-site-template,代码行数:7,代码来源:test_user.py


示例13: test_connect_uri

    def test_connect_uri(self):
        """Ensure that the connect() method works properly with URIs."""
        c = connect(db='mongoenginetest', alias='admin')
        c.admin.system.users.remove({})
        c.mongoenginetest.system.users.remove({})

        c.admin.add_user("admin", "password")
        c.admin.authenticate("admin", "password")
        c.mongoenginetest.add_user("username", "password")

        if not IS_PYMONGO_3:
            self.assertRaises(
                MongoEngineConnectionError, connect, 'testdb_uri_bad',
                host='mongodb://test:[email protected]'
            )

        connect("testdb_uri", host='mongodb://username:[email protected]/mongoenginetest')

        conn = get_connection()
        self.assertTrue(isinstance(conn, pymongo.mongo_client.MongoClient))

        db = get_db()
        self.assertTrue(isinstance(db, pymongo.database.Database))
        self.assertEqual(db.name, 'mongoenginetest')

        c.admin.system.users.remove({})
        c.mongoenginetest.system.users.remove({})
开发者ID:mikeckennedy,项目名称:mongoengine,代码行数:27,代码来源:test_connection.py


示例14: get_mongodb_version

def get_mongodb_version():
    """Return the version of the connected mongoDB (first 2 digits)

    :return: tuple(int, int)
    """
    version_list = get_connection().server_info()['versionArray'][:2]     # e.g: (3, 2)
    return tuple(version_list)
开发者ID:MongoEngine,项目名称:mongoengine,代码行数:7,代码来源:mongodb_support.py


示例15: test_connect_uri

    def test_connect_uri(self):
        """Ensure that the connect() method works properly with URIs."""
        c = connect(db='mongoenginetest', alias='admin')
        c.admin.system.users.delete_many({})
        c.mongoenginetest.system.users.delete_many({})

        c.admin.command("createUser", "admin", pwd="password", roles=["root"])
        c.admin.authenticate("admin", "password")
        c.admin.command("createUser", "username", pwd="password", roles=["dbOwner"])

        if not IS_PYMONGO_3:
            self.assertRaises(
                MongoEngineConnectionError, connect, 'testdb_uri_bad',
                host='mongodb://test:[email protected]'
            )

        connect("testdb_uri", host='mongodb://username:[email protected]/mongoenginetest')

        conn = get_connection()
        self.assertIsInstance(conn, pymongo.mongo_client.MongoClient)

        db = get_db()
        self.assertIsInstance(db, pymongo.database.Database)
        self.assertEqual(db.name, 'mongoenginetest')

        c.admin.system.users.delete_many({})
        c.mongoenginetest.system.users.delete_many({})
开发者ID:MongoEngine,项目名称:mongoengine,代码行数:27,代码来源:test_connection.py


示例16: test_connect_uri_without_db

    def test_connect_uri_without_db(self):
        """Ensure that the connect() method works properly with uri's
        without database_name
        """
        c = connect(db='mongoenginetest', alias='admin')
        c.admin.system.users.remove({})
        c.mongoenginetest.system.users.remove({})

        c.admin.add_user("admin", "password")
        c.admin.authenticate("admin", "password")
        c.mongoenginetest.add_user("username", "password")

        self.assertRaises(ConnectionError, connect, "testdb_uri_bad", host='mongodb://test:[email protected]')

        connect("mongoenginetest", host='mongodb://localhost/')

        conn = get_connection()
        self.assertTrue(isinstance(conn, pymongo.mongo_client.MongoClient))

        db = get_db()
        self.assertTrue(isinstance(db, pymongo.database.Database))
        self.assertEqual(db.name, 'mongoenginetest')

        c.admin.system.users.remove({})
        c.mongoenginetest.system.users.remove({})
开发者ID:ThisGuyCodes,项目名称:mongoengine,代码行数:25,代码来源:test_connection.py


示例17: test_connect_uri

    def test_connect_uri(self):
        """Ensure that the connect() method works properly with uri's
        """
        c = connect(alias='admin')
        register_db('mongoenginetest', 'admin', 'admin')
        c.admin.system.users.remove({})
        c.mongoenginetest.system.users.remove({})

        c.admin.add_user("admin", "password")
        c.admin.authenticate("admin", "password")
        c.mongoenginetest.add_user("username", "password")

        self.assertRaises(
            ConnectionError, connect, "testdb_uri_bad",
            host='mongodb://test:[email protected]')

        # Whilst database names can be specified in the URI, they are ignored
        # in mongoengine since the DB/connection split
        connect(host='mongodb://username:[email protected]/mongoenginetest')
        register_db('testdb_uri')

        conn = get_connection()
        self.assertTrue(isinstance(conn, pymongo.connection.Connection))

        db = get_db()
        self.assertTrue(isinstance(db, pymongo.database.Database))
        self.assertEqual(db.name, 'testdb_uri')
开发者ID:aszwemin,项目名称:mongoengine,代码行数:27,代码来源:connection.py


示例18: test_ttl_indexes

    def test_ttl_indexes(self):

        class Log(Document):
            created = DateTimeField(default=datetime.now)
            meta = {
                'indexes': [
                    {'fields': ['created'], 'expireAfterSeconds': 3600}
                ]
            }

        Log.drop_collection()

        if pymongo.version_tuple[0] < 2 and pymongo.version_tuple[1] < 3:
            raise SkipTest('pymongo needs to be 2.3 or higher for this test')

        connection = get_connection()
        version_array = connection.server_info()['versionArray']
        if version_array[0] < 2 and version_array[1] < 2:
            raise SkipTest('MongoDB needs to be 2.2 or higher for this test')

        # Indexes are lazy so use list() to perform query
        list(Log.objects)
        info = Log.objects._collection.index_information()
        self.assertEqual(3600,
                         info['created_1']['expireAfterSeconds'])
开发者ID:korvyashka,项目名称:mongoengine,代码行数:25,代码来源:indexes.py


示例19: test_connect_with_db_name_external

    def test_connect_with_db_name_external(self):
        """Ensure that connect() works if db name is $external
        """
        """Ensure that the connect() method works properly."""
        connect('$external')

        conn = get_connection()
        self.assertIsInstance(conn, pymongo.mongo_client.MongoClient)

        db = get_db()
        self.assertIsInstance(db, pymongo.database.Database)
        self.assertEqual(db.name, '$external')

        connect('$external', alias='testdb')
        conn = get_connection('testdb')
        self.assertIsInstance(conn, pymongo.mongo_client.MongoClient)
开发者ID:MongoEngine,项目名称:mongoengine,代码行数:16,代码来源:test_connection.py


示例20: _post_teardown

 def _post_teardown(self):
     from mongoengine.connection import get_connection, disconnect
     for db_name, db_alias in settings.MONGO_DATABASES.items():
         connection = get_connection(db_alias)
         connection.drop_database(db_name)
         disconnect(db_alias)
     super(MongoTestCase, self)._post_teardown()
开发者ID:snormore,项目名称:django-mongotesting,代码行数:7,代码来源:testcases.py



注:本文中的mongoengine.connection.get_connection函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python connection.get_db函数代码示例发布时间:2022-05-27
下一篇:
Python connection.disconnect函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap