• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python test_client.get_client函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中test.test_client.get_client函数的典型用法代码示例。如果您正苦于以下问题:Python get_client函数的具体用法?Python get_client怎么用?Python get_client使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_client函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_auth_network_error

    def test_auth_network_error(self):
        # Make sure there's no semaphore leak if we get a network error
        # when authenticating a new socket with cached credentials.
        auth_client = get_client()

        auth_context.client.admin.add_user('admin', 'password')
        auth_client.admin.authenticate('admin', 'password')
        try:
            # Get a client with one socket so we detect if it's leaked.
            c = get_client(max_pool_size=1, waitQueueTimeoutMS=1)

            # Simulate an authenticate() call on a different socket.
            credentials = auth._build_credentials_tuple(
                'DEFAULT', 'admin',
                unicode('admin'), unicode('password'),
                {})

            c._cache_credentials('test', credentials, connect=False)

            # Cause a network error on the actual socket.
            pool = get_pool(c)
            socket_info = one(pool.sockets)
            socket_info.sock.close()

            # In __check_auth, the client authenticates its socket with the
            # new credential, but gets a socket.error. Should be reraised as
            # AutoReconnect.
            self.assertRaises(AutoReconnect, c.test.collection.find_one)

            # No semaphore leak, the pool is allowed to make a new socket.
            c.test.collection.find_one()
        finally:
            auth_client.admin.remove_user('admin')
开发者ID:arunbaabte,项目名称:DjangoAppService,代码行数:33,代码来源:test_auth.py


示例2: setUp

    def setUp(self):
        super(TestBulkWriteConcern, self).setUp()
        client = get_client()
        ismaster = client.test.command('ismaster')
        self.is_repl = bool(ismaster.get('setName'))
        self.w = len(ismaster.get("hosts", []))

        self.secondary = None
        if self.w > 1:
            for member in ismaster['hosts']:
                if member != ismaster['primary']:
                    host, port = _partition_node(member)
                    self.secondary = MongoClient(host, port)
                    break

        self.client = client
        self.coll = client.pymongo_test.test
        self.coll.remove()

        # We tested wtimeout errors by specifying a write concern greater than
        # the number of members, but in MongoDB 2.7.8+ this causes a different
        # sort of error, "Not enough data-bearing nodes". In recent servers we
        # use a failpoint to pause replication on a secondary.
        self.need_replication_stopped = version.at_least(self.client,
                                                         (2, 7, 8))

        self.test_commands_enabled = ("enableTestCommands=1"
                                      in get_command_line(self.client)["argv"])
开发者ID:balagopalraj,项目名称:clearlinux,代码行数:28,代码来源:test_bulk.py


示例3: test_authenticate_and_request

    def test_authenticate_and_request(self):
        if (is_mongos(self.client) and not
            version.at_least(self.client, (2, 0, 0))):
            raise SkipTest("Auth with sharding requires MongoDB >= 2.0.0")

        # Database.authenticate() needs to be in a request - check that it
        # always runs in a request, and that it restores the request state
        # (in or not in a request) properly when it's finished.
        self.assertFalse(self.client.auto_start_request)
        db = self.client.pymongo_test
        db.system.users.remove({})
        db.remove_user("mike")
        db.add_user("mike", "password")
        self.assertFalse(self.client.in_request())
        self.assertTrue(db.authenticate("mike", "password"))
        self.assertFalse(self.client.in_request())

        request_cx = get_client(auto_start_request=True)
        request_db = request_cx.pymongo_test
        self.assertTrue(request_cx.in_request())
        self.assertTrue(request_db.authenticate("mike", "password"))
        self.assertTrue(request_cx.in_request())

        # just make sure there are no exceptions here
        db.logout()
        db.collection.find_one()
        request_db.logout()
        request_db.collection.find_one()
开发者ID:Khefren,项目名称:mongo-python-driver,代码行数:28,代码来源:test_database.py


示例4: test_authenticate_and_request

    def test_authenticate_and_request(self):
        if (is_mongos(self.client) and not
            version.at_least(self.client, (2, 0, 0))):
            raise SkipTest("Auth with sharding requires MongoDB >= 2.0.0")
        if not server_started_with_auth(self.client):
            raise SkipTest('Authentication is not enabled on server')

        # Database.authenticate() needs to be in a request - check that it
        # always runs in a request, and that it restores the request state
        # (in or not in a request) properly when it's finished.
        self.assertFalse(self.client.auto_start_request)
        db = self.client.pymongo_test
        db.add_user("mike", "password",
                    roles=["userAdmin", "dbAdmin", "readWrite"])
        try:
            self.assertFalse(self.client.in_request())
            self.assertTrue(db.authenticate("mike", "password"))
            self.assertFalse(self.client.in_request())

            request_cx = get_client(auto_start_request=True)
            request_db = request_cx.pymongo_test
            self.assertTrue(request_cx.in_request())
            self.assertTrue(request_db.authenticate("mike", "password"))
            self.assertTrue(request_cx.in_request())
        finally:
            db.authenticate("mike", "password")
            db.remove_user("mike")
            db.logout()
            request_db.logout()
开发者ID:ArturFis,项目名称:mongo-python-driver,代码行数:29,代码来源:test_database.py


示例5: setUp

 def setUp(self):
     client = get_client()
     ismaster = client.test.command('ismaster')
     self.is_repl = bool(ismaster.get('setName'))
     self.w = len(ismaster.get("hosts", []))
     self.coll = client.pymongo_test.test
     self.coll.remove()
开发者ID:yangcol,项目名称:mongo-python-driver,代码行数:7,代码来源:test_bulk.py


示例6: test_only_secondary_ok_commands_have_read_prefs

    def test_only_secondary_ok_commands_have_read_prefs(self):
        c = get_client(read_preference=ReadPreference.SECONDARY)
        is_mongos = utils.is_mongos(c)
        if not is_mongos:
            raise SkipTest("Only mongos have read_prefs added to the spec")

        # Ensure secondary_ok_commands have readPreference
        for cmd in secondary_ok_commands:
            if cmd == 'mapreduce':  # map reduce is a special case
                continue
            command = SON([(cmd, 1)])
            cursor = c.pymongo_test["$cmd"].find(command.copy())
            # White-listed commands also have to be wrapped in $query
            command = SON([('$query', command)])
            command['$readPreference'] = {'mode': 'secondary'}
            self.assertEqual(command, cursor._Cursor__query_spec())

        # map_reduce inline should have read prefs
        command = SON([('mapreduce', 'test'), ('out', {'inline': 1})])
        cursor = c.pymongo_test["$cmd"].find(command.copy())
        # White-listed commands also have to be wrapped in $query
        command = SON([('$query', command)])
        command['$readPreference'] = {'mode': 'secondary'}
        self.assertEqual(command, cursor._Cursor__query_spec())

        # map_reduce that outputs to a collection shouldn't have read prefs
        command = SON([('mapreduce', 'test'), ('out', {'mrtest': 1})])
        cursor = c.pymongo_test["$cmd"].find(command.copy())
        self.assertEqual(command, cursor._Cursor__query_spec())

        # Other commands shouldn't be changed
        for cmd in ('drop', 'create', 'any-future-cmd'):
            command = SON([(cmd, 1)])
            cursor = c.pymongo_test["$cmd"].find(command.copy())
            self.assertEqual(command, cursor._Cursor__query_spec())
开发者ID:easonlin,项目名称:mongo-python-driver,代码行数:35,代码来源:test_read_preferences.py


示例7: test_uuid_queries

    def test_uuid_queries(self):
        if not should_test_uuid:
            raise SkipTest("No uuid module")

        c = get_client()
        coll = c.pymongo_test.test
        coll.drop()

        uu = uuid.uuid4()
        # Wrap uu.bytes in binary_type to work
        # around http://bugs.python.org/issue7380.
        coll.insert({'uuid': Binary(binary_type(uu.bytes), 3)})
        self.assertEqual(1, coll.count())

        # Test UUIDLegacy queries.
        coll.uuid_subtype = 4
        self.assertEqual(0, coll.find({'uuid': uu}).count())
        cur = coll.find({'uuid': UUIDLegacy(uu)})
        self.assertEqual(1, cur.count())
        retrieved = cur.next()
        self.assertEqual(uu, retrieved['uuid'])

        # Test regular UUID queries (using subtype 4).
        coll.insert({'uuid': uu})
        self.assertEqual(2, coll.count())
        cur = coll.find({'uuid': uu})
        self.assertEqual(1, cur.count())
        retrieved = cur.next()
        self.assertEqual(uu, retrieved['uuid'])

        # Test both.
        cur = coll.find({'uuid': {'$in': [uu, UUIDLegacy(uu)]}})
        self.assertEqual(2, cur.count())
        coll.drop()
开发者ID:Khefren,项目名称:mongo-python-driver,代码行数:34,代码来源:test_binary.py


示例8: test_server_disconnect

    def test_server_disconnect(self):
        # PYTHON-345, we need to make sure that threads' request sockets are
        # closed by disconnect().
        #
        # 1. Create a client with auto_start_request=True
        # 2. Start N threads and do a find() in each to get a request socket
        # 3. Pause all threads
        # 4. In the main thread close all sockets, including threads' request
        #       sockets
        # 5. In main thread, do a find(), which raises AutoReconnect and resets
        #       pool
        # 6. Resume all threads, do a find() in them
        #
        # If we've fixed PYTHON-345, then only one AutoReconnect is raised,
        # and all the threads get new request sockets.
        cx = get_client(auto_start_request=True)
        collection = cx.db.pymongo_test

        # acquire a request socket for the main thread
        collection.find_one()
        pool = get_pool(collection.database.connection)
        socket_info = pool._get_request_state()
        assert isinstance(socket_info, SocketInfo)
        request_sock = socket_info.sock

        state = FindPauseFind.create_shared_state(nthreads=40)

        threads = [FindPauseFind(collection, state) for _ in range(state.nthreads)]

        # Each thread does a find(), thus acquiring a request socket
        for t in threads:
            t.start()

        # Wait for the threads to reach the rendezvous
        FindPauseFind.wait_for_rendezvous(state)

        try:
            # Simulate an event that closes all sockets, e.g. primary stepdown
            for t in threads:
                t.request_sock.close()

            # Finally, ensure the main thread's socket's last_checkout is
            # updated:
            collection.find_one()

            # ... and close it:
            request_sock.close()

            # Doing an operation on the client raises an AutoReconnect and
            # resets the pool behind the scenes
            self.assertRaises(AutoReconnect, collection.find_one)

        finally:
            # Let threads do a second find()
            FindPauseFind.resume_after_rendezvous(state)

        joinall(threads)

        for t in threads:
            self.assertTrue(t.passed, "%s threw exception" % t)
开发者ID:helenseo,项目名称:mongo-python-driver,代码行数:60,代码来源:test_threads.py


示例9: _get_client

 def _get_client(self):
     """
     Intended for overriding in TestThreadsAuthReplicaSet. This method
     returns a MongoClient here, and a MongoReplicaSetClient in
     test_threads_replica_set_connection.py.
     """
     # Regular test client
     return get_client()
开发者ID:Parastou,项目名称:mongo-python-driver,代码行数:8,代码来源:test_threads.py


示例10: setUp

 def setUp(self):
     self.db = get_client().pymongo_test
     self.db.drop_collection("fs.files")
     self.db.drop_collection("fs.chunks")
     self.db.drop_collection("alt.files")
     self.db.drop_collection("alt.chunks")
     self.fs = gridfs.GridFS(self.db)
     self.alt = gridfs.GridFS(self.db, "alt")
开发者ID:ArturFis,项目名称:mongo-python-driver,代码行数:8,代码来源:test_gridfs.py


示例11: setUp

 def setUp(self):
     super(TestBulkWriteConcern, self).setUp()
     client = get_client()
     ismaster = client.test.command('ismaster')
     self.is_repl = bool(ismaster.get('setName'))
     self.w = len(ismaster.get("hosts", []))
     self.coll = client.pymongo_test.test
     self.coll.remove()
开发者ID:1060460048,项目名称:mongo-python-driver,代码行数:8,代码来源:test_bulk.py


示例12: test_authenticate_multiple

    def test_authenticate_multiple(self):
        client = get_client()
        if is_mongos(client) and not version.at_least(self.client, (2, 2, 0)):
            raise SkipTest("Need mongos >= 2.2.0")
        if not server_started_with_auth(client):
            raise SkipTest("Authentication is not enabled on server")

        # Setup
        users_db = client.pymongo_test
        admin_db = client.admin
        other_db = client.pymongo_test1
        users_db.test.remove()
        other_db.test.remove()

        admin_db.add_user("admin", "pass", roles=["userAdminAnyDatabase", "dbAdmin", "clusterAdmin", "readWrite"])
        try:
            self.assertTrue(admin_db.authenticate("admin", "pass"))

            if version.at_least(self.client, (2, 5, 3, -1)):
                admin_db.add_user("ro-admin", "pass", roles=["userAdmin", "readAnyDatabase"])
            else:
                admin_db.add_user("ro-admin", "pass", read_only=True)

            users_db.add_user("user", "pass", roles=["userAdmin", "readWrite"])

            admin_db.logout()
            self.assertRaises(OperationFailure, users_db.test.find_one)

            # Regular user should be able to query its own db, but
            # no other.
            users_db.authenticate("user", "pass")
            self.assertEqual(0, users_db.test.count())
            self.assertRaises(OperationFailure, other_db.test.find_one)

            # Admin read-only user should be able to query any db,
            # but not write.
            admin_db.authenticate("ro-admin", "pass")
            self.assertEqual(0, other_db.test.count())
            self.assertRaises(OperationFailure, other_db.test.insert, {})

            # Force close all sockets
            client.disconnect()

            # We should still be able to write to the regular user's db
            self.assertTrue(users_db.test.remove())
            # And read from other dbs...
            self.assertEqual(0, other_db.test.count())
            # But still not write to other dbs...
            self.assertRaises(OperationFailure, other_db.test.insert, {})

        # Cleanup
        finally:
            admin_db.logout()
            users_db.logout()
            admin_db.authenticate("admin", "pass")
            remove_all_users(users_db)
            remove_all_users(admin_db)
开发者ID:Tokutek,项目名称:mongo-python-driver,代码行数:57,代码来源:test_database.py


示例13: test_authenticate_multiple

    def test_authenticate_multiple(self):
        client = get_client()
        if (is_mongos(client) and not
            version.at_least(self.client, (2, 0, 0))):
            raise SkipTest("Auth with sharding requires MongoDB >= 2.0.0")
        if not server_started_with_auth(client):
            raise SkipTest("Authentication is not enabled on server")

        # Setup
        users_db = client.pymongo_test
        admin_db = client.admin
        other_db = client.pymongo_test1
        users_db.system.users.remove()
        admin_db.system.users.remove()
        users_db.test.remove()
        other_db.test.remove()

        admin_db.add_user('admin', 'pass')
        self.assertTrue(admin_db.authenticate('admin', 'pass'))

        admin_db.add_user('ro-admin', 'pass', read_only=True)
        users_db.add_user('user', 'pass')

        admin_db.logout()
        self.assertRaises(OperationFailure, users_db.test.find_one)

        # Regular user should be able to query its own db, but
        # no other.
        users_db.authenticate('user', 'pass')
        self.assertEqual(0, users_db.test.count())
        self.assertRaises(OperationFailure, other_db.test.find_one)

        # Admin read-only user should be able to query any db,
        # but not write.
        admin_db.authenticate('ro-admin', 'pass')
        self.assertEqual(0, other_db.test.count())
        self.assertRaises(OperationFailure,
                          other_db.test.insert, {})

        # Force close all sockets
        client.disconnect()

        # We should still be able to write to the regular user's db
        self.assertTrue(users_db.test.remove())
        # And read from other dbs...
        self.assertEqual(0, other_db.test.count())
        # But still not write to other dbs...
        self.assertRaises(OperationFailure,
                          other_db.test.insert, {})

        # Cleanup
        admin_db.logout()
        users_db.logout()
        self.assertTrue(admin_db.authenticate('admin', 'pass'))
        self.assertTrue(admin_db.system.users.remove())
        self.assertEqual(0, admin_db.system.users.count())
        self.assertTrue(users_db.system.users.remove())
开发者ID:Khefren,项目名称:mongo-python-driver,代码行数:57,代码来源:test_database.py


示例14: teardown

def teardown():
    c = get_client()

    c.drop_database("pymongo-pooling-tests")
    c.drop_database("pymongo_test")
    c.drop_database("pymongo_test1")
    c.drop_database("pymongo_test2")
    c.drop_database("pymongo_test_mike")
    c.drop_database("pymongo_test_bernie")
开发者ID:DaBbleR23,项目名称:mongo-python-driver,代码行数:9,代码来源:__init__.py


示例15: test_file_exists

    def test_file_exists(self):
        db = get_client(w=1).pymongo_test
        fs = gridfs.GridFS(db)

        oid = fs.put(b("hello"))
        self.assertRaises(FileExists, fs.put, b("world"), _id=oid)

        one = fs.new_file(_id=123)
        one.write(b("some content"))
        one.close()

        two = fs.new_file(_id=123)
        self.assertRaises(FileExists, two.write, b('x' * 262146))
开发者ID:ArturFis,项目名称:mongo-python-driver,代码行数:13,代码来源:test_gridfs.py


示例16: test_only_secondary_ok_commands_have_read_prefs

    def test_only_secondary_ok_commands_have_read_prefs(self):
        c = get_client(read_preference=ReadPreference.SECONDARY)
        ctx = catch_warnings()
        try:
            warnings.simplefilter("ignore", UserWarning)
            is_mongos = utils.is_mongos(c)
        finally:
            ctx.exit()
        if not is_mongos:
            raise SkipTest("Only mongos have read_prefs added to the spec")

        # Ensure secondary_ok_commands have readPreference
        for cmd in secondary_ok_commands:
            if cmd == "mapreduce":  # map reduce is a special case
                continue
            command = SON([(cmd, 1)])
            cursor = c.pymongo_test["$cmd"].find(command.copy())
            # White-listed commands also have to be wrapped in $query
            command = SON([("$query", command)])
            command["$readPreference"] = {"mode": "secondary"}
            self.assertEqual(command, cursor._Cursor__query_spec())

        # map_reduce inline should have read prefs
        command = SON([("mapreduce", "test"), ("out", {"inline": 1})])
        cursor = c.pymongo_test["$cmd"].find(command.copy())
        # White-listed commands also have to be wrapped in $query
        command = SON([("$query", command)])
        command["$readPreference"] = {"mode": "secondary"}
        self.assertEqual(command, cursor._Cursor__query_spec())

        # map_reduce that outputs to a collection shouldn't have read prefs
        command = SON([("mapreduce", "test"), ("out", {"mrtest": 1})])
        cursor = c.pymongo_test["$cmd"].find(command.copy())
        self.assertEqual(command, cursor._Cursor__query_spec())

        # Other commands shouldn't be changed
        for cmd in ("drop", "create", "any-future-cmd"):
            command = SON([(cmd, 1)])
            cursor = c.pymongo_test["$cmd"].find(command.copy())
            self.assertEqual(command, cursor._Cursor__query_spec())
开发者ID:balagopalraj,项目名称:clearlinux,代码行数:40,代码来源:test_read_preferences.py


示例17: test_authenticate_and_request

    def test_authenticate_and_request(self):
        # Database.authenticate() needs to be in a request - check that it
        # always runs in a request, and that it restores the request state
        # (in or not in a request) properly when it's finished.
        self.assertFalse(self.client.auto_start_request)
        db = self.client.pymongo_test
        auth_context.client.pymongo_test.add_user(
            "mike", "password",
            roles=["userAdmin", "dbAdmin", "readWrite"])
        try:
            self.assertFalse(self.client.in_request())
            self.assertTrue(db.authenticate("mike", "password"))
            self.assertFalse(self.client.in_request())

            request_cx = get_client(auto_start_request=True)
            request_db = request_cx.pymongo_test
            self.assertTrue(request_db.authenticate("mike", "password"))
            self.assertTrue(request_cx.in_request())
        finally:
            db.authenticate("mike", "password")
            db.remove_user("mike")
            db.logout()
            request_db.logout()
开发者ID:arunbaabte,项目名称:DjangoAppService,代码行数:23,代码来源:test_auth.py


示例18: get_client

 def get_client(self, *args, **kwargs):
     opts = kwargs.copy()
     opts['use_greenlets'] = self.use_greenlets
     return get_client(*args, **opts)
开发者ID:xowenx,项目名称:mongo-python-driver,代码行数:4,代码来源:test_pooling_base.py


示例19: _test_pool

    def _test_pool(self, use_request):
        """
        Test that the connection pool prevents both threads and greenlets from
        using a socket at the same time.

        Sequence:
        gr0: start a slow find()
        gr1: start a fast find()
        gr1: get results
        gr0: get results
        """
        cx = get_client(
            use_greenlets=self.use_greenlets,
            auto_start_request=False
        )

        db = cx.pymongo_test
        db.test.remove()
        db.test.insert({'_id': 1})

        history = []

        def find_fast():
            if use_request:
                cx.start_request()

            history.append('find_fast start')

            # With greenlets and the old connection._Pool, this would throw
            # AssertionError: "This event is already used by another
            # greenlet"
            self.assertEqual({'_id': 1}, db.test.find_one())
            history.append('find_fast done')

            if use_request:
                cx.end_request()

        def find_slow():
            if use_request:
                cx.start_request()

            history.append('find_slow start')

            # Javascript function that pauses N seconds per document
            fn = delay(10)
            if (is_mongos(db.connection) or not
                version.at_least(db.connection, (1, 7, 2))):
                # mongos doesn't support eval so we have to use $where
                # which is less reliable in this context.
                self.assertEqual(1, db.test.find({"$where": fn}).count())
            else:
                # 'nolock' allows find_fast to start and finish while we're
                # waiting for this to complete.
                self.assertEqual({'ok': 1.0, 'retval': True},
                                 db.command('eval', fn, nolock=True))

            history.append('find_slow done')

            if use_request:
                cx.end_request()

        if self.use_greenlets:
            gr0, gr1 = Greenlet(find_slow), Greenlet(find_fast)
            gr0.start()
            gr1.start_later(.1)
        else:
            gr0 = threading.Thread(target=find_slow)
            gr0.setDaemon(True)
            gr1 = threading.Thread(target=find_fast)
            gr1.setDaemon(True)

            gr0.start()
            time.sleep(.1)
            gr1.start()

        gr0.join()
        gr1.join()

        self.assertEqual([
            'find_slow start',
            'find_fast start',
            'find_fast done',
            'find_slow done',
        ], history)
开发者ID:xowenx,项目名称:mongo-python-driver,代码行数:84,代码来源:test_pooling_base.py


示例20: test_mongos_connection

    def test_mongos_connection(self):
        c = get_client()
        is_mongos = utils.is_mongos(c)

        # Test default mode, PRIMARY
        cursor = c.pymongo_test.test.find()
        if is_mongos:
            # We only set $readPreference if it's something other than
            # PRIMARY to avoid problems with mongos versions that don't
            # support read preferences.
            self.assertEqual(
                None,
                cursor._Cursor__query_spec().get('$readPreference')
            )
        else:
            self.assertFalse(
                '$readPreference' in cursor._Cursor__query_spec())

        # Copy these constants for brevity
        PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED
        SECONDARY = ReadPreference.SECONDARY
        SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED
        NEAREST = ReadPreference.NEAREST
        SLAVE_OKAY = _QUERY_OPTIONS['slave_okay']

        # Test non-PRIMARY modes which can be combined with tags
        for kwarg, value, mongos_mode in (
            ('read_preference', PRIMARY_PREFERRED, 'primaryPreferred'),
            ('read_preference', SECONDARY, 'secondary'),
            ('read_preference', SECONDARY_PREFERRED, 'secondaryPreferred'),
            ('read_preference', NEAREST, 'nearest'),
            ('slave_okay', True, 'secondaryPreferred'),
            ('slave_okay', False, 'primary')
        ):
            for tag_sets in (
                None, [{}]
            ):
                # Create a client e.g. with read_preference=NEAREST or
                # slave_okay=True
                c = get_client(tag_sets=tag_sets, **{kwarg: value})

                self.assertEqual(is_mongos, c.is_mongos)
                cursor = c.pymongo_test.test.find()
                if is_mongos:
                    # We don't set $readPreference for SECONDARY_PREFERRED
                    # unless tags are in use. slaveOkay has the same effect.
                    if mongos_mode == 'secondaryPreferred':
                        self.assertEqual(
                            None,
                            cursor._Cursor__query_spec().get('$readPreference'))

                        self.assertTrue(
                            cursor._Cursor__query_options() & SLAVE_OKAY)

                    # Don't send $readPreference for PRIMARY either
                    elif mongos_mode == 'primary':
                        self.assertEqual(
                            None,
                            cursor._Cursor__query_spec().get('$readPreference'))

                        self.assertFalse(
                            cursor._Cursor__query_options() & SLAVE_OKAY)
                    else:
                        self.assertEqual(
                            {'mode': mongos_mode},
                            cursor._Cursor__query_spec().get('$readPreference'))

                        self.assertTrue(
                            cursor._Cursor__query_options() & SLAVE_OKAY)
                else:
                    self.assertFalse(
                        '$readPreference' in cursor._Cursor__query_spec())

            for tag_sets in (
                [{'dc': 'la'}],
                [{'dc': 'la'}, {'dc': 'sf'}],
                [{'dc': 'la'}, {'dc': 'sf'}, {}],
            ):
                if kwarg == 'slave_okay':
                    # Can't use tags with slave_okay True or False, need a
                    # real read preference
                    self.assertRaises(
                        ConfigurationError,
                        get_client, tag_sets=tag_sets, **{kwarg: value})

                    continue

                c = get_client(tag_sets=tag_sets, **{kwarg: value})

                self.assertEqual(is_mongos, c.is_mongos)
                cursor = c.pymongo_test.test.find()
                if is_mongos:
                    self.assertEqual(
                        {'mode': mongos_mode, 'tags': tag_sets},
                        cursor._Cursor__query_spec().get('$readPreference'))
                else:
                    self.assertFalse(
                        '$readPreference' in cursor._Cursor__query_spec())
开发者ID:easonlin,项目名称:mongo-python-driver,代码行数:98,代码来源:test_read_preferences.py



注:本文中的test.test_client.get_client函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python test_connection.get_connection函数代码示例发布时间:2022-05-27
下一篇:
Python utils.run_briefly函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap