本文整理汇总了Python中pymongo.mongo_client.MongoClient类的典型用法代码示例。如果您正苦于以下问题:Python MongoClient类的具体用法?Python MongoClient怎么用?Python MongoClient使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了MongoClient类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_init_disconnected
def test_init_disconnected(self):
c = MongoClient(host, port, _connect=False)
ctx = catch_warnings()
try:
warnings.simplefilter("ignore", DeprecationWarning)
self.assertIsInstance(c.is_primary, bool)
self.assertIsInstance(c.is_mongos, bool)
self.assertIsInstance(c.max_pool_size, int)
self.assertIsInstance(c.use_greenlets, bool)
self.assertIsInstance(c.nodes, frozenset)
self.assertIsInstance(c.auto_start_request, bool)
self.assertEqual(dict, c.get_document_class())
self.assertIsInstance(c.tz_aware, bool)
self.assertIsInstance(c.max_bson_size, int)
self.assertIsInstance(c.min_wire_version, int)
self.assertIsInstance(c.max_wire_version, int)
self.assertIsInstance(c.max_write_batch_size, int)
self.assertEqual(None, c.host)
self.assertEqual(None, c.port)
finally:
ctx.exit()
c.pymongo_test.test.find_one() # Auto-connect.
self.assertEqual((host, port), c.address)
if version.at_least(c, (2, 5, 4, -1)):
self.assertTrue(c.max_wire_version > 0)
else:
self.assertEqual(c.max_wire_version, 0)
self.assertTrue(c.min_wire_version >= 0)
bad_host = "somedomainthatdoesntexist.org"
c = MongoClient(bad_host, port, connectTimeoutMS=1, _connect=False)
self.assertRaises(ConnectionFailure, c.pymongo_test.test.find_one)
开发者ID:hedgepigdaniel,项目名称:mongo-python-driver,代码行数:35,代码来源:test_client.py
示例2: test_unix_socket
def test_unix_socket(self):
if not hasattr(socket, "AF_UNIX"):
raise SkipTest("UNIX-sockets are not supported on this system")
mongodb_socket = '/tmp/mongodb-27017.sock'
encoded_socket = '%2Ftmp%2Fmongodb-27017.sock'
if not os.access(mongodb_socket, os.R_OK):
raise SkipTest("Socket file is not accessible")
if client_context.auth_enabled:
uri = "mongodb://%s:%[email protected]%s" % (db_user, db_pwd, encoded_socket)
else:
uri = "mongodb://%s" % encoded_socket
# Confirm we can do operations via the socket.
client = MongoClient(uri)
client.pymongo_test.test.insert_one({"dummy": "object"})
dbs = client.database_names()
self.assertTrue("pymongo_test" in dbs)
# Confirm it fails with a missing socket.
self.assertRaises(
ConnectionFailure,
connected, MongoClient("mongodb://%2Ftmp%2Fnon-existent.sock",
serverSelectionTimeoutMS=100))
开发者ID:gregbanks,项目名称:mongo-python-driver,代码行数:25,代码来源:test_client.py
示例3: test_ipv6
def test_ipv6(self):
c = MongoClient("mongodb://[::1]:%d" % (port,), replicaSet=self.name)
# Client switches to IPv4 once it has first ismaster response.
msg = 'discovered primary with IPv4 address "%r"' % (self.primary,)
wait_until(lambda: c.primary == self.primary, msg)
# Same outcome with both IPv4 and IPv6 seeds.
c = MongoClient("[::1]:%d,localhost:%d" % (port, port),
replicaSet=self.name)
wait_until(lambda: c.primary == self.primary, msg)
if client_context.auth_enabled:
auth_str = "%s:%[email protected]" % (db_user, db_pwd)
else:
auth_str = ""
uri = "mongodb://%slocalhost:%d,[::1]:%d" % (auth_str, port, port)
client = MongoClient(uri, replicaSet=self.name)
client.pymongo_test.test.insert_one({"dummy": u("object")})
client.pymongo_test_bernie.test.insert_one({"dummy": u("object")})
dbs = client.database_names()
self.assertTrue("pymongo_test" in dbs)
self.assertTrue("pymongo_test_bernie" in dbs)
client.close()
开发者ID:BiosPsucheZoe,项目名称:mongo-python-driver,代码行数:27,代码来源:test_replica_set_client.py
示例4: test_max_idle_time_checkout
def test_max_idle_time_checkout(self):
# Use high frequency to test _get_socket_no_auth.
with client_knobs(kill_cursor_frequency=99999999):
client = MongoClient(host, port, maxIdleTimeMS=.5)
server = client._get_topology().select_server(any_server_selector)
with server._pool.get_socket({}) as sock_info:
pass
self.assertEqual(1, len(server._pool.sockets))
time.sleep(1) # Sleep so that the socket becomes stale.
with server._pool.get_socket({}) as new_sock_info:
self.assertNotEqual(sock_info, new_sock_info)
self.assertEqual(1, len(server._pool.sockets))
self.assertFalse(sock_info in server._pool.sockets)
self.assertTrue(new_sock_info in server._pool.sockets)
# Test that sockets are reused if maxIdleTimeMS is not set.
client = MongoClient(host, port)
server = client._get_topology().select_server(any_server_selector)
with server._pool.get_socket({}) as sock_info:
pass
self.assertEqual(1, len(server._pool.sockets))
time.sleep(1)
with server._pool.get_socket({}) as new_sock_info:
self.assertEqual(sock_info, new_sock_info)
self.assertEqual(1, len(server._pool.sockets))
开发者ID:HermogenesBatista,项目名称:mongo-python-driver,代码行数:26,代码来源:test_client.py
示例5: stash
def stash(results):
"""
暂存到mongo数据库中。
"""
summary = {}
mongo = MongoClient(**config.mongo)
try:
for item_model, objs in results:
collection_name = item_model['name']
db = mongo.get_database('theforce')
collection = db.get_collection(collection_name)
collection.insert_many(objs)
summary[collection_name] = len(
objs) if collection_name not in summary else len(
objs) + summary[collection_name]
print
print "=" * 40
print ' ' * 15, u'Stash'
print "=" * 40
print
print u"数据已成功保存到MongoDB的theforce库中,其中新增数据:"
for name, length in summary.items():
print name, length
finally:
mongo.close()
开发者ID:gxsdfzmck,项目名称:theforce,代码行数:26,代码来源:main.py
示例6: test_unix_socket
def test_unix_socket(self):
if not hasattr(socket, "AF_UNIX"):
raise SkipTest("UNIX-sockets are not supported on this system")
client = MongoClient(host, port)
if (sys.platform == 'darwin' and
server_started_with_auth(client) and
not version.at_least(client, (2, 7, 1))):
raise SkipTest("SERVER-8492")
mongodb_socket = '/tmp/mongodb-27017.sock'
if not os.access(mongodb_socket, os.R_OK):
raise SkipTest("Socket file is not accessable")
self.assertTrue(MongoClient("mongodb://%s" % mongodb_socket))
client = MongoClient("mongodb://%s" % mongodb_socket)
client.pymongo_test.test.save({"dummy": "object"})
# Confirm we can read via the socket
dbs = client.database_names()
self.assertTrue("pymongo_test" in dbs)
# Confirm it fails with a missing socket
self.assertRaises(ConnectionFailure, MongoClient,
"mongodb:///tmp/none-existent.sock")
开发者ID:hedgepigdaniel,项目名称:mongo-python-driver,代码行数:25,代码来源:test_client.py
示例7: test_document_class
def test_document_class(self):
c = MongoClient(host, port)
db = c.pymongo_test
db.test.insert({"x": 1})
self.assertEqual(dict, c.document_class)
self.assertTrue(isinstance(db.test.find_one(), dict))
self.assertFalse(isinstance(db.test.find_one(), SON))
c.document_class = SON
self.assertEqual(SON, c.document_class)
self.assertTrue(isinstance(db.test.find_one(), SON))
self.assertFalse(isinstance(db.test.find_one(as_class=dict), SON))
c = MongoClient(host, port, document_class=SON)
db = c.pymongo_test
self.assertEqual(SON, c.document_class)
self.assertTrue(isinstance(db.test.find_one(), SON))
self.assertFalse(isinstance(db.test.find_one(as_class=dict), SON))
c.document_class = dict
self.assertEqual(dict, c.document_class)
self.assertTrue(isinstance(db.test.find_one(), dict))
self.assertFalse(isinstance(db.test.find_one(), SON))
开发者ID:quantopian,项目名称:mongo-python-driver,代码行数:27,代码来源:test_client.py
示例8: test_document_class
def test_document_class(self):
c = MongoClient(host, port)
db = c.pymongo_test
db.test.insert({"x": 1})
ctx = catch_warnings()
try:
warnings.simplefilter("ignore", DeprecationWarning)
self.assertEqual(dict, c.document_class)
self.assertTrue(isinstance(db.test.find_one(), dict))
self.assertFalse(isinstance(db.test.find_one(), SON))
c.document_class = SON
db = c.pymongo_test
self.assertEqual(SON, c.document_class)
self.assertTrue(isinstance(db.test.find_one(), SON))
self.assertFalse(isinstance(db.test.find_one(as_class=dict), SON))
c = MongoClient(host, port, document_class=SON)
db = c.pymongo_test
self.assertEqual(SON, c.document_class)
self.assertTrue(isinstance(db.test.find_one(), SON))
self.assertFalse(isinstance(db.test.find_one(as_class=dict), SON))
c.document_class = dict
db = c.pymongo_test
self.assertEqual(dict, c.document_class)
self.assertTrue(isinstance(db.test.find_one(), dict))
self.assertFalse(isinstance(db.test.find_one(), SON))
finally:
ctx.exit()
开发者ID:hedgepigdaniel,项目名称:mongo-python-driver,代码行数:34,代码来源:test_client.py
示例9: __init__
def __init__(self):
connection = MongoClient(
settings['MONGO_SERVER'],
settings['MONGO_PORT']
)
db = connection.get_database(settings['MONGO_DB'])
self.collection = db[settings['MONGO_COLLECTION']]
开发者ID:arimbr,项目名称:job-scraper,代码行数:7,代码来源:pipelines.py
示例10: open_spider
def open_spider(self, spider):
self._build_unique_key()
if self._replica_set is not None:
self.connection = MongoReplicaSetClient(
self._uri,
replicaSet=self._replica_set,
w=self._write_concern,
fsync=self._fsync,
read_preference=ReadPreference.PRIMARY_PREFERRED)
else:
self.connection = MongoClient(
self._uri,
fsync=self._fsync,
read_preference=ReadPreference.PRIMARY)
self.database = self.connection[self._database]
self.collection = self.database[self._collection]
log.msg('Connected to MongoDB "%s", using "%s/%s"' %
self._uri, self._database, self._collection)
# ensure index
if self._unique_key:
log.msg('Creating index for key %s' % self._unique_key)
self.collection.ensure_index(self._unique_key.items(), unique=True, sparse=True)
开发者ID:nyov,项目名称:scrapyext,代码行数:26,代码来源:mongo.py
示例11: test_init_disconnected
def test_init_disconnected(self):
c = MongoClient(host, port, _connect=False)
self.assertIsInstance(c.is_primary, bool)
self.assertIsInstance(c.is_mongos, bool)
self.assertIsInstance(c.max_pool_size, int)
self.assertIsInstance(c.use_greenlets, bool)
self.assertIsInstance(c.nodes, frozenset)
self.assertIsInstance(c.auto_start_request, bool)
self.assertEqual(dict, c.get_document_class())
self.assertIsInstance(c.tz_aware, bool)
self.assertIsInstance(c.max_bson_size, int)
self.assertIsInstance(c.min_wire_version, int)
self.assertIsInstance(c.max_wire_version, int)
self.assertEqual(None, c.host)
self.assertEqual(None, c.port)
c.pymongo_test.test.find_one() # Auto-connect.
self.assertEqual(host, c.host)
self.assertEqual(port, c.port)
if version.at_least(c, (2, 5, 4, -1)):
self.assertTrue(c.max_wire_version > 0)
else:
self.assertEqual(c.max_wire_version, 0)
self.assertTrue(c.min_wire_version >= 0)
bad_host = "somedomainthatdoesntexist.org"
c = MongoClient(bad_host, port, connectTimeoutMS=1, _connect=False)
self.assertRaises(ConnectionFailure, c.pymongo_test.test.find_one)
开发者ID:quantopian,项目名称:mongo-python-driver,代码行数:30,代码来源:test_client.py
示例12: test_read_with_failover
def test_read_with_failover(self):
c = MongoClient(
self.seed,
replicaSet=self.name,
serverSelectionTimeoutMS=self.server_selection_timeout)
wait_until(lambda: c.primary, "discover primary")
wait_until(lambda: len(c.secondaries) == 2, "discover secondaries")
def iter_cursor(cursor):
for _ in cursor:
pass
return True
w = len(c.secondaries) + 1
db = c.get_database("pymongo_test",
write_concern=WriteConcern(w=w))
db.test.delete_many({})
# Force replication
db.test.insert_many([{'foo': i} for i in xrange(10)])
self.assertEqual(10, db.test.count())
db.read_preference = SECONDARY_PREFERRED
cursor = db.test.find().batch_size(5)
next(cursor)
self.assertEqual(5, cursor._Cursor__retrieved)
self.assertTrue(cursor.address in c.secondaries)
ha_tools.kill_primary()
# Primary failure shouldn't interrupt the cursor
self.assertTrue(iter_cursor(cursor))
self.assertEqual(10, cursor._Cursor__retrieved)
开发者ID:BiosPsucheZoe,项目名称:mongo-python-driver,代码行数:30,代码来源:test_ha.py
示例13: mongo_text_test
def mongo_text_test():
cli = MongoClient()
test = cli.get_database("test").get_collection("test_search")
test.create_index(
[("super_type", ASCENDING), ("resource_state", ASCENDING),
("uuid", TEXT), ("name", TEXT), ("description", TEXT)],
background=True
)
docs = [{
"uuid": str(uuid.uuid4()),
"name": str(i),
"description": "Nature, \"time, and patience are "
"the three great -physicians.",
"super_type": "super_vol",
"type": "vol",
"create_time": int(
time.time()),
"resource_state": "inUse"
} for i in range(1, 11)]
test.insert_many(docs)
text = [" -physicians"]
print test.find({
"description": {
"$regex": "|".join([re.sub(
r"(\*|\.|\?|\+|\$|\^|\[|\]|\(|\)|\{|\}|\||\\|/)",
r"\\\1",
g
) for g in text]),
"$options": "i"
}
}).count()
开发者ID:woailuoli993,项目名称:ksxingtest,代码行数:32,代码来源:generate_fake_data.py
示例14: test_database_names
def test_database_names(self):
client = MongoClient(host, port)
client.pymongo_test.test.save({"dummy": u"object"})
client.pymongo_test_mike.test.save({"dummy": u"object"})
dbs = client.database_names()
self.assertTrue("pymongo_test" in dbs)
self.assertTrue("pymongo_test_mike" in dbs)
开发者ID:quantopian,项目名称:mongo-python-driver,代码行数:9,代码来源:test_client.py
示例15: doEverything
def doEverything():
# certfile = '/home/bryan/Downloads/baratheon.pem'
conn = MongoClient(url)
db = conn[database]
commands = []
collectionName = "pythonMongo"
commands.append("Creating collection " + collectionName)
collection = db[collectionName]
#insert 1
commands.append("# 1 Inserts")
commands.append("# 1.1 Insert a single document to a collection")
collection.insert({"name": "test1", "value": 1})
commands.append("Inserted {\"name\": \"test1\", \"value\": 1}")
#insert many
commands.append("#1.2 Inserting multiple entries into collection")
multiPost = [{"name": "test1", "value": 1},{"name": "test2", "value": 2}, {"name": "test3", "value": 3}]
collection.insert(multiPost)
commands.append("Inserted \n {\"name\": \"test1\", \"value\": 1} \n {\"name\": \"test2\", \"value\": 2} \n {\"name\": \"test3\", \"value\": 3}")
# Find
commands.append("#2 Queries")
commands.append("#2.1 Find one that matches a query condition")
commands.append(collection.find_one({"name": "test1"}))
# Find all
commands.append("#2.2 Find all that match a query condition")
for doc in collection.find({"name": "test1"}):
commands.append(doc)
# Display all documents
commands.append( "#2.3 Find all documents in collection")
for doc in collection.find():
commands.append(doc)
# update document
commands.append("#3 Updating Documents")
collection.update({"name": "test3"}, {"$set": { "value": 4}})
commands.append("Updated test3 with value 4")
# delete document
commands.append("#4 Delete Documents")
collection.remove({"name": "test2"})
commands.append("Deleted all with name test2")
# Display all collection names
commands.append("#5 Get a list of all of the collections")
commands.append( db.collection_names())
commands.append("#6 Drop a collection")
db.drop_collection(collectionName)
conn.close()
commands.append("Connection to database has been closed")
return commands
开发者ID:Bryan-Burkett,项目名称:informix,代码行数:56,代码来源:python_mongo_HelloWorld.py
示例16: test_alive
def test_alive(self):
ctx = catch_warnings()
try:
warnings.simplefilter("ignore", DeprecationWarning)
self.assertTrue(get_client().alive())
client = MongoClient('doesnt exist', _connect=False)
self.assertFalse(client.alive())
finally:
ctx.exit()
开发者ID:hedgepigdaniel,项目名称:mongo-python-driver,代码行数:10,代码来源:test_client.py
示例17: makeDBConnection
def makeDBConnection(port, **kwargs):
global _C
if not _C:
logging.info("establishing db connection at port %s ..." % port)
import pymongo
logging.info("using pymongo version %s" % pymongo.version)
from pymongo.mongo_client import MongoClient
_C = MongoClient(port = port, **kwargs)
mongo_info = _C.server_info()
logging.info("mongodb version: %s" % mongo_info["version"])
开发者ID:jwbober,项目名称:lmfdb,代码行数:10,代码来源:base.py
示例18: test_stale_getmore
def test_stale_getmore(self):
# A cursor is created, but its member goes down and is removed from
# the topology before the getMore message is sent. Test that
# MongoClient._send_message_with_response handles the error.
with self.assertRaises(AutoReconnect):
client = MongoClient(host, port, connect=False,
serverSelectionTimeoutMS=100,
replicaSet=client_context.replica_set_name)
client._send_message_with_response(
operation=message._GetMore('collection', 101, 1234),
address=('not-a-member', 27017))
开发者ID:big-data-datawarehouse,项目名称:mongo-python-driver,代码行数:11,代码来源:test_client.py
示例19: test_disconnect
def test_disconnect(self):
c = MongoClient(host, port)
coll = c.pymongo_test.bar
c.disconnect()
c.disconnect()
coll.count()
c.disconnect()
c.disconnect()
coll.count()
开发者ID:quantopian,项目名称:mongo-python-driver,代码行数:13,代码来源:test_client.py
示例20: test_mongo
def test_mongo(self):
client = MongoClient()
db = client.test_database
collection = db.test_collection
import datetime
post = {"author": "Mike",
"text": "My first blog post!",
"tags": ["mongodb", "python", "pymongo"],
"date": datetime.datetime.utcnow()}
posts = db.posts
post_id = posts.insert(post)
print(post_id)
print(client.server_info())
开发者ID:jagguli,项目名称:aioweb,代码行数:13,代码来源:mongodb_test.py
注:本文中的pymongo.mongo_client.MongoClient类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论