本文整理汇总了Python中pymongo.uri_parser.parse_uri函数的典型用法代码示例。如果您正苦于以下问题:Python parse_uri函数的具体用法?Python parse_uri怎么用?Python parse_uri使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了parse_uri函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, db_config):
    """Create one MongoDB client per oplog server and per profiler server.

    :param db_config: mapping of recorder settings. Keys read here are
        ``target_collections``, ``auto_config``, ``auto_config_options``,
        ``auth_db``, ``user``, ``password``, ``oplog_servers`` and
        ``profiler_servers``; every server entry must provide a
        ``mongodb_uri`` key.

    Exits the process with status 1 when either server list is empty.
    """
    self.config = db_config
    self.force_quit = False

    # Sanitize the options: strip whitespace and drop duplicate names.
    if self.config["target_collections"] is not None:
        self.config["target_collections"] = set(
            coll.strip() for coll in self.config["target_collections"])

    if 'auto_config' in self.config and self.config['auto_config'] is True:
        auto_options = self.config['auto_config_options']
        # Inherit top-level credentials that the auto-config section does
        # not set itself. An explicit membership test replaces the former
        # blanket ``except Exception: pass`` so that only a genuinely
        # missing key is tolerated.
        for key in ('auth_db', 'user', 'password'):
            if key not in auto_options and key in self.config:
                auto_options[key] = self.config[key]
        self.get_topology(self.config['auto_config_options'])
        oplog_servers = self.build_oplog_servers(
            self.config['auto_config_options'])
        profiler_servers = self.build_profiler_servers(
            self.config['auto_config_options'])
    else:
        oplog_servers = self.config["oplog_servers"]
        profiler_servers = self.config["profiler_servers"]

    if not oplog_servers or not profiler_servers:
        # NOTE(review): every other call in this method uses ``utils.LOG``;
        # confirm that ``utils.log`` really exists.
        utils.log.error("Detected either no profile or oplog servers, bailing")
        sys.exit(1)

    # Create a mongo client for each oplog server, keyed by "host:port" of
    # the first node named in its URI.
    self.oplog_clients = {}
    for index, server in enumerate(oplog_servers):
        host, port = uri_parser.parse_uri(server['mongodb_uri'])["nodelist"][0]
        server_string = "%s:%s" % (host, port)
        self.oplog_clients[server_string] = self.connect_mongo(server)
        utils.LOG.info("oplog server %d: %s", index, self.sanatize_server(server))

    # Create a mongo client for each profiler server, keyed the same way.
    self.profiler_clients = {}
    for index, server in enumerate(profiler_servers):
        host, port = uri_parser.parse_uri(server['mongodb_uri'])["nodelist"][0]
        server_string = "%s:%s" % (host, port)
        self.profiler_clients[server_string] = self.connect_mongo(server)
        utils.LOG.info("profiling server %d: %s", index, self.sanatize_server(server))
开发者ID:nderzhak,项目名称:flashback,代码行数:53,代码来源:record.py
示例2: run_test
def run_test(self):
    # Closure over ``test_case``: when the case lists seeds the URI must
    # parse to exactly those seeds (and options, if given); when it lists
    # none, parsing the URI must be rejected.
    if not _HAVE_DNSPYTHON:
        raise unittest.SkipTest("DNS tests require the dnspython module")

    uri = test_case['uri']
    seeds = test_case['seeds']
    hosts = test_case['hosts']
    options = test_case.get('options')

    if seeds:
        seeds = split_hosts(','.join(seeds))
    if hosts:
        hosts = frozenset(split_hosts(','.join(hosts)))
    if options:
        # Expected numbers / booleans are compared in their string form.
        for name, expected in options.items():
            if isinstance(expected, bool):
                options[name] = 'true' if expected else 'false'
            elif isinstance(expected, (int, float)):
                options[name] = str(expected)

    if not seeds:
        # Negative case: the URI has to be refused by the parser.
        try:
            parse_uri(uri)
        except (ConfigurationError, ValueError):
            pass
        else:
            self.fail("failed to raise an exception")
    else:
        parsed = parse_uri(uri, validate=False)
        self.assertEqual(sorted(parsed['nodelist']), sorted(seeds))
        if options:
            parsed_options = parsed['options']
            if 'readpreferencetags' in parsed_options:
                raw_tags = parsed_options.pop('readpreferencetags')
                parsed_options['readPreferenceTags'] = (
                    validate_read_preference_tags('readPreferenceTags', raw_tags))
            self.assertEqual(parsed['options'], options)

        first_node = next(iter(client_context.client.nodes))
        # The replica set members must be configured as 'localhost'.
        if first_node[0] == 'localhost':
            client_options = client_context.default_client_options.copy()
            if client_context.ssl is True:
                # Our test certs don't support the SRV hosts used in these tests.
                client_options['ssl_match_hostname'] = False
            client = MongoClient(uri, **client_options)
            # Force server selection
            client.admin.command('ismaster')
            wait_until(
                lambda: hosts == client.nodes,
                'match test hosts to client nodes')
开发者ID:ShaneHarvey,项目名称:mongo-python-driver,代码行数:51,代码来源:test_dns.py
示例3: __init__
def __init__(self, db_config):
    """Connect to every configured oplog and profiler MongoDB server.

    :param db_config: mapping of recorder settings. Keys read here are
        ``target_collections``, ``auto_config``, ``auto_config_options``,
        ``auth_db``, ``user``, ``password``, ``oplog_servers`` and
        ``profiler_servers``; every server entry must provide a
        ``mongodb_uri`` key.

    Exits the process with status 1 when either server list is empty.
    """
    self.config = db_config
    self.force_quit = False

    # Sanitize the config options. ``or []`` also covers the key being
    # present with a ``None`` value, which ``.get(key, [])`` alone does not
    # (iterating ``None`` would raise TypeError).
    self.config["target_collections"] = set(
        coll.strip() for coll in self.config.get("target_collections") or [])

    if self.config.get('auto_config'):
        if 'auto_config_options' not in self.config:
            self.config['auto_config_options'] = {}
        auto_options = self.config['auto_config_options']
        # Inherit top-level credentials the auto-config section omits.
        for key in ('auth_db', 'user', 'password'):
            if key not in auto_options and key in self.config:
                auto_options[key] = self.config[key]
        self.get_topology(self.config['auto_config_options'])
        oplog_servers = self.build_oplog_servers(
            self.config['auto_config_options'])
        profiler_servers = self.build_profiler_servers(
            self.config['auto_config_options'])
    else:
        oplog_servers = self.config["oplog_servers"]
        profiler_servers = self.config["profiler_servers"]

    if len(oplog_servers) < 1 or len(profiler_servers) < 1:
        # NOTE(review): every other call in this method uses ``utils.LOG``;
        # confirm that ``utils.log`` really exists.
        utils.log.error("Detected either no profile or oplog servers, bailing")
        sys.exit(1)

    # Connect to each MongoDB server that we want to get the oplog data from.
    # Clients are keyed by "host:port" of the first node in the server's URI.
    self.oplog_clients = {}
    for index, server in enumerate(oplog_servers):
        host, port = uri_parser.parse_uri(server['mongodb_uri'])["nodelist"][0]
        server_string = "%s:%s" % (host, port)
        self.oplog_clients[server_string] = self.connect_mongo(server)
        utils.LOG.info("oplog server %d: %s", index, self.sanitize_server(server))

    # Connect to each MongoDB server that we want to get the profile data from.
    self.profiler_clients = {}
    for index, server in enumerate(profiler_servers):
        host, port = uri_parser.parse_uri(server['mongodb_uri'])["nodelist"][0]
        server_string = "%s:%s" % (host, port)
        self.profiler_clients[server_string] = self.connect_mongo(server)
        utils.LOG.info("profiling server %d: %s", index, self.sanitize_server(server))

    utils.LOG.info(
        'Successfully connected to %d oplog server(s) and %d profiler server(s)',
        len(self.oplog_clients), len(self.profiler_clients))
开发者ID:aayjaa,项目名称:flashback,代码行数:49,代码来源:record.py
示例4: init_db
def init_db(mongo_uri):
    """Open the module-level connection and select the URI's database.

    Stores the client in the global ``_conn``, registers ``model.Instance``
    on it, and stores the database named in ``mongo_uri`` in the global
    ``_db``.
    """
    global _conn, _db
    database_name = uri_parser.parse_uri(mongo_uri)["database"]
    _conn = MongoClient(host=mongo_uri)
    _conn.register([model.Instance])
    _db = _conn[database_name]
开发者ID:leloulight,项目名称:nova-dynamips-driver,代码行数:7,代码来源:db.py
示例5: get_mongo_uri
def get_mongo_uri(dasconfig):
    """Return host and port of the first configured MongoDB node.

    Reads the first entry of ``dasconfig['mongodb']['dburi']``, parses it,
    and returns ``{'mongo_host': host, 'mongo_port': port}`` for the first
    node in that URI.
    """
    first_uri = dasconfig['mongodb']['dburi'][0]
    mongo_host, mongo_port = parse_uri(first_uri)['nodelist'][0]
    return {'mongo_host': mongo_host, 'mongo_port': mongo_port}
开发者ID:dmwm,项目名称:DAS,代码行数:7,代码来源:config_reader.py
示例6: __init__
def __init__(self, location, params, mongodb=None):
    """
    Configure the cache from a ``mongodb://`` location and its params.

    ``params['collection']`` names the collection; ``params['OPTIONS']``
    may carry ``WRITE_CONCERN``, ``VALUES_ARE_JSON_SERIALIZEABLE``,
    ``STRATEGY`` and ``replica_set``. Raises ``ImproperlyConfigured`` when
    the location lacks the ``mongodb://`` prefix or a database name.

    @type mongodb: a MongoDB connection instance
    """
    self.collection_name = params['collection']
    opts = params.get('OPTIONS', {})
    self.write_concern = opts.get('WRITE_CONCERN', 1)
    self.json_serializeable_values = opts.get(
        'VALUES_ARE_JSON_SERIALIZEABLE', True)
    read_strategy = opts.get('STRATEGY', 'NEAREST')

    assert self.write_concern > -1
    assert isinstance(self.collection_name, basestring)

    if not location.startswith('mongodb://'):
        raise ImproperlyConfigured('connection to mongo should start with mongodb://')
    db_name = uri_parser.parse_uri(location)['database']
    if not db_name:
        raise ImproperlyConfigured('Specify DB like that mongodb://hosts/database_name')

    # Build a wrapper only when the caller did not inject a connection.
    if mongodb:
        self.mongodb = mongodb
    else:
        self.mongodb = MongoDBWrapper(
            hosts=location,
            strategy=read_strategy,
            replica_set=opts['replica_set'],
            database_name=db_name
        )

    self.logger = logging.getLogger('mongo_requests')
    super(MongoDBCache, self).__init__(params)
    self._ensure_ttl_collection()
开发者ID:antonyc,项目名称:mongodb_cache,代码行数:27,代码来源:cache.py
示例7: test_parse_uri_unicode
def test_parse_uri_unicode(self):
    # Parsing a unicode URI must yield option names usable as keyword
    # arguments: in Python 2.4 keyword argument names must be ASCII, and
    # in all Pythons str is the type of valid keyword arg names.
    parsed = parse_uri(unicode("mongodb://localhost/?fsync=true"))
    for option_name in parsed['options']:
        self.assertTrue(isinstance(option_name, str))
开发者ID:hedgepigdaniel,项目名称:mongo-python-driver,代码行数:7,代码来源:test_uri_parser.py
示例8: __init__
def __init__(self, uri="mongodb://127.0.0.1:27017", pool_size=1, ssl_context_factory=None, **kwargs):
    """Parse ``uri``, build ``pool_size`` connections and start connecting.

    Write-concern options are taken from the URI options, overridden by
    ``kwargs`` and filtered to the names in ``__wc_possible_options``.
    When the URI carries database, username and password, authentication
    is requested. Every pooled connection is pointed at the first node of
    the URI, over SSL when ``ssl_context_factory`` is given.
    """
    assert isinstance(uri, basestring)
    assert isinstance(pool_size, int)
    assert pool_size >= 1

    if not uri.startswith("mongodb://"):
        uri = "mongodb://" + uri
    self.__uri = parse_uri(uri)

    # URI options first, keyword arguments win on conflict.
    merged = self.__uri['options'].copy()
    merged.update(kwargs)
    wc_options = {}
    for option, value in merged.items():
        if option in self.__wc_possible_options:
            wc_options[option] = value
    self.__write_concern = WriteConcern(**wc_options)

    self.__pool_size = pool_size
    self.__pool = [_Connection(self, self.__uri, i) for i in range(pool_size)]

    if self.__uri['database'] and self.__uri['username'] and self.__uri['password']:
        mechanism = self.__uri['options'].get('authmechanism', 'DEFAULT')
        self.authenticate(self.__uri['database'], self.__uri['username'],
                          self.__uri['password'], mechanism)

    host, port = self.__uri['nodelist'][0]
    for factory in self.__pool:
        if not ssl_context_factory:
            factory.connector = reactor.connectTCP(host, port, factory)
        else:
            factory.connector = reactor.connectSSL(host, port, factory, ssl_context_factory)
开发者ID:gaochunzy,项目名称:txmongo,代码行数:30,代码来源:connection.py
示例9: get_collection
def get_collection(uri):
    """Return the collection named in ``uri`` from the database of ``uri``."""
    collection_name = uri_parser.parse_uri(uri)["collection"]
    return get_database(uri)[collection_name]
开发者ID:dcrosta,项目名称:mongo-disco,代码行数:7,代码来源:mongo_util.py
示例10: __init__
def __init__(self, db_url, collection_name):
    """Bind ``self.c`` to ``collection_name`` in the database of ``db_url``.

    :param db_url: MongoDB connection URI; it must name a database.
    :param collection_name: name of the collection to expose as ``self.c``.
    :raises ValueError: if ``db_url`` does not name a database.
    """
    parsed = uri_parser.parse_uri(db_url)
    # parse_uri reports a missing database as a None value rather than an
    # absent key, so test the value instead of key membership.
    database = parsed.get('database')
    if not database:
        raise ValueError('database not in uri: {}'.format(db_url))
    # Create the client only after validation so that a rejected URI does
    # not leave an unused client behind.
    client = MongoClient(db_url)
    self.c = client[database][collection_name]
开发者ID:penmark,项目名称:ingest,代码行数:7,代码来源:mongo.py
示例11: __init__
def __init__(self, db_config):
    """Create the oplog client and one client per profiler server.

    Reads ``target_collections``, ``oplog_server`` and ``profiler_servers``
    from ``db_config``; each server entry must provide ``mongodb_uri``.
    Profiler clients are keyed by "host:port" of the first node in the
    server's URI.
    """
    self.config = db_config
    self.force_quit = False

    # Sanitize the options: strip whitespace and drop duplicate names.
    targets = self.config["target_collections"]
    if targets is not None:
        self.config["target_collections"] = set(
            name.strip() for name in targets)

    oplog_server = self.config["oplog_server"]
    profiler_servers = self.config["profiler_servers"]
    self.oplog_client = MongoClient(oplog_server["mongodb_uri"])

    # One mongo client per profiler server.
    self.profiler_clients = {}
    for position, profiler in enumerate(profiler_servers):
        profiler_uri = profiler['mongodb_uri']
        first_node = uri_parser.parse_uri(profiler_uri)["nodelist"][0]
        node_key = "%s:%s" % (first_node[0], first_node[1])
        self.profiler_clients[node_key] = MongoClient(profiler_uri,
                                                      slaveOk=True)
        utils.LOG.info("profiling server %d: %s", position, str(profiler))

    utils.LOG.info("oplog server: %s", str(oplog_server))
开发者ID:llvtt,项目名称:flashback,代码行数:25,代码来源:record.py
示例12: _get_db
def _get_db():
    """
    Returns the connection to the database using the settings.
    This function should not be called outside of this file.
    Use db instead.

    ``settings.MONGODB['URI']`` is used verbatim when set; otherwise a URI
    is assembled from ``USERNAME``/``PASSWORD``, ``HOSTS``/``PORTS`` (or
    ``HOST``/``PORT``, port defaulting to 27017), ``DATABASE`` and
    ``OPTIONS``.
    """
    from .settings import settings
    mongo = settings.MONGODB

    if mongo.get('URI'):
        uri = mongo['URI']
    else:
        uri = 'mongodb://'
        # NOTE(review): credentials are inserted verbatim; values with
        # reserved characters such as '@' or ':' presumably need to be
        # percent-escaped by whoever sets them -- confirm.
        if all(mongo.get(key) for key in ('USERNAME', 'PASSWORD')):
            uri += '{0}:{1}@'.format(mongo['USERNAME'], mongo['PASSWORD'])
        if mongo.get('HOSTS'):
            # No trailing comma after the generator expression: a bare
            # generator argument followed by a comma is a SyntaxError on
            # Python 3.7 and later.
            uri += ','.join(
                '{0}:{1}'.format(host, port)
                for (host, port) in zip(mongo['HOSTS'], mongo['PORTS'])
            )
        else:
            uri += '{0}:{1}'.format(mongo['HOST'], mongo.get('PORT', 27017))
        uri += '/' + mongo['DATABASE']
        if mongo.get('OPTIONS'):
            uri += '?{0}'.format('&'.join(mongo['OPTIONS']))

    return MongoClient(uri)[parse_uri(uri)['database']]
开发者ID:mathvaleriano,项目名称:mongorest,代码行数:31,代码来源:database.py
示例13: _create_connection
def _create_connection(conn_settings, testing=False):
    """Connect mongoengine from one settings dict or a list of them.

    A list yields a dict mapping each entry's ``alias`` to its connection.
    For a single dict, keys are lower-cased, ``None`` values are dropped,
    and a URI-style ``host`` must name the database to use.

    :raises ValueError: if a URI-style host carries no database name.
    """
    # Handle multiple connections recursively
    if isinstance(conn_settings, list):
        return dict(
            (entry.get('alias'), _create_connection(entry, testing))
            for entry in conn_settings)

    # dict() over a generator rather than a dict comprehension in order to
    # support python 2.6
    settings = dict(
        (key.lower(), value)
        for key, value in conn_settings.items()
        if value is not None)

    if 'replicaset' in settings:
        settings['replicaSet'] = settings.pop('replicaset')

    # mongomock hosts are passed through untouched when testing on a
    # mongoengine version that understands them.
    use_mongomock = (
        StrictVersion(mongoengine.__version__) >= StrictVersion('0.10.6') and
        testing and settings.get('host', '').startswith('mongomock://'))

    # Handle uri style connections
    if not use_mongomock and "://" in settings.get('host', ''):
        settings['db'] = uri_parser.parse_uri(settings['host'])['database']
        if settings['db'] is None:
            raise ValueError('Mongo host URI must contain database name')

    db_name = settings.pop('db', 'test')
    return mongoengine.connect(db_name, **settings)
开发者ID:ntcong,项目名称:flask-mongoengine,代码行数:26,代码来源:__init__.py
示例14: connect
def connect(mongodb_uri, alias=DEFAULT_CONNECTION_ALIAS):
    """Register a connection to MongoDB under an optional alias.

    :parameters:
      - `mongodb_uri`: A MongoDB connection string. Any option supported by
        PyMongo may be passed within the string. The string must specify a
        database, which will be used by any :class:`~pymodm.MongoModel`
        that uses this connection.
      - `alias`: An optional name for this connection, backed by a
        :class:`~pymongo.mongo_client.MongoClient` instance that is cached
        under this name. A MongoModel selects its connection through the
        `connection_alias` attribute inside its `Meta` class; switching is
        also possible with the
        :class:`~pymodm.context_managers.switch_connection` context
        manager. Calling `connect()` again with the same alias replaces
        the previous connection.
    """
    uri_parts = uri_parser.parse_uri(mongodb_uri)

    # Make sure the database is provided.
    database_name = uri_parts.get('database')
    if not database_name:
        raise ValueError('Connection must specify a database.')

    _CONNECTIONS[alias] = ConnectionInfo(
        parsed_uri=uri_parts,
        conn_string=mongodb_uri,
        database=MongoClient(mongodb_uri)[uri_parts['database']])
开发者ID:gitter-badger,项目名称:pymodm,代码行数:26,代码来源:connection.py
示例15: collect
def collect(self):
    """Open a connection to the MongoDB server named in the config.

    Reads ``server`` (a MongoDB URI) and ``slowms`` from ``self.config``.
    Reports through ``self.error`` and returns early when ``server`` is
    missing or when the connection attempt fails.
    """
    server = self.config.get('server')
    if server is None:
        self.error("Missing 'server' in mongo config")
        return

    parsed = uri_parser.parse_uri(server)
    username = parsed.get('username')
    password = parsed.get('password')
    db_name = parsed.get('database')
    # NOTE(review): slowms, db_name and do_auth are not used in the visible
    # part of this method; presumably later code relies on them -- confirm.
    slowms = self.config.get('slowms', 25)

    if not db_name:
        self.log.info('No MongoDB database found in URI. Defaulting to admin.')
        db_name = 'admin'

    do_auth = True
    if username is None or password is None:
        # NOTE(review): this writes the whole URI to the log.
        self.log.debug("Mongo: cannot extract username and password from config %s", server)
        do_auth = False

    try:
        self.conn = pymongo.Connection(server, network_timeout=self.DEFAULT_TIMEOUT)
    except Exception as e:  # "as" form is valid on Python 2.6+ and Python 3
        self.error(e)
        return
开发者ID:amonapp,项目名称:amon-plugins-legacy,代码行数:30,代码来源:mongo.py
示例16: register_connection
def register_connection(alias, name=None, host=None, port=None,
                        read_preference=READ_PREFERENCE,
                        username=None, password=None, authentication_source=None,
                        **kwargs):
    """Add a connection.

    :param alias: the name that will be used to refer to this connection
        throughout MongoEngine
    :param name: the name of the specific database to use
    :param host: the host name of the :program:`mongod` instance to connect to
    :param port: the port that the :program:`mongod` instance is running on
    :param read_preference: The read preference for the collection
        ** Added pymongo 2.1
    :param username: username to authenticate with
    :param password: password to authenticate with
    :param authentication_source: database to authenticate against
    :param is_mock: explicitly use mongomock for this connection
        (can also be done by using `mongomock://` as db host prefix)
    :param kwargs: allow ad-hoc parameters to be passed into the pymongo driver

    .. versionchanged:: 0.10.6 - added mongomock support
    """
    global _connection_settings

    conn_settings = {
        'name': name or 'test',
        'host': host or 'localhost',
        'port': port or 27017,
        'read_preference': read_preference,
        'username': username,
        'password': password,
        'authentication_source': authentication_source
    }

    # Handle uri style connections
    conn_host = conn_settings['host']
    if conn_host.startswith('mongomock://'):
        conn_settings['is_mock'] = True
        # `mongomock://` is not a valid url prefix and must be replaced by `mongodb://`
        conn_settings['host'] = conn_host.replace('mongomock://', 'mongodb://', 1)
    elif '://' in conn_host:
        uri_dict = uri_parser.parse_uri(conn_host)
        # Override only what the URI actually specifies. Copying the parsed
        # values unconditionally would replace explicitly passed
        # name/username/password arguments (and the 'test' default) with
        # None whenever the URI omits them.
        for setting, uri_key in (('name', 'database'),
                                 ('username', 'username'),
                                 ('password', 'password')):
            if uri_dict.get(uri_key):
                conn_settings[setting] = uri_dict[uri_key]
        uri_options = uri_dict['options']
        if 'replicaset' in uri_options:
            # Stored as a flag only; the replica set name is not copied.
            conn_settings['replicaSet'] = True
        if 'authsource' in uri_options:
            conn_settings['authentication_source'] = uri_options['authsource']

    # Deprecated parameters that should not be passed on
    kwargs.pop('slaves', None)
    kwargs.pop('is_slave', None)

    conn_settings.update(kwargs)
    _connection_settings[alias] = conn_settings
开发者ID:vintozver,项目名称:mongoengine,代码行数:60,代码来源:connection.py
示例17: config_celery_for_mongo
def config_celery_for_mongo(settings):
    """Build a Celery config dict that uses MongoDB as broker and backend.

    Reads ``mongodb.uri`` and ``celery.dbname`` from ``settings`` (quote
    characters around the values are stripped) and points both the broker
    and the result backend at the first node of the URI.
    """
    mongo_uri = settings['mongodb.uri'].strip('"\'')
    database = settings['celery.dbname'].strip('"\'')
    broker_host, broker_port = uri_parser.parse_uri(mongo_uri)['nodelist'][0]

    backend_settings = {
        'host': broker_host,
        'port': broker_port,
        'database': database
    }
    return {
        'CELERY_RESULT_BACKEND': 'mongodb',
        'BROKER_TRANSPORT': 'mongodb',
        'CELERY_IMPORTS': tuple(_modules_to_register),
        'BROKER_HOST': broker_host,
        'BROKER_PORT': broker_port,
        'BROKER_VHOST': database,
        'CELERY_MONGODB_BACKEND_SETTINGS': backend_settings,
        'CELERY_DISABLE_RATE_LIMITS': True,
        'CELERY_ROUTES': _celery_routes,
        'CELERYD_POOL': 'gevent',
    }
开发者ID:xflash96,项目名称:pyramid_celery,代码行数:27,代码来源:__init__.py
示例18: _initialize_
def _initialize_(self, do_connect):
    """Validate the adapter URI and set up MongoDB helpers and options.

    Raises ``SyntaxError`` when the URI names no database. Stores the
    database name in ``_driver_db``, exposes ``SON``, ``ObjectId`` and
    ``WriteConcern`` on the adapter, reads ``minimumreplication`` and
    ``safe`` from ``adapter_args``, then calls ``_mock_reconnect``.
    """
    super(Mongo, self)._initialize_(do_connect)

    #: uri parse
    from pymongo import uri_parser
    parsed = uri_parser.parse_uri(self.uri)
    if isinstance(parsed, tuple):
        # A tuple result carries the database name at index 1.
        parsed = {"database": parsed[1]}
    if parsed.get('database') is None:
        raise SyntaxError("Database is required!")
    self._driver_db = parsed['database']

    #: mongodb imports and utils
    from bson.objectid import ObjectId
    from bson.son import SON
    from pymongo.write_concern import WriteConcern
    self.epoch = datetime.fromtimestamp(0)
    self.SON = SON
    self.ObjectId = ObjectId
    self.WriteConcern = WriteConcern

    #: options
    self.db_codec = 'UTF-8'
    # minimum number of replicas to wait for on insert/update
    self.minimumreplication = self.adapter_args.get(
        'minimumreplication', 0)
    # inserts and selects are synchronous by default unless overruled by
    # this adapter argument or by a function parameter
    if self.adapter_args.get('safe', True):
        self.safe = 1
    else:
        self.safe = 0

    self._mock_reconnect()
开发者ID:web2py,项目名称:pydal,代码行数:30,代码来源:mongo.py
示例19: register_connection
def register_connection(
    alias,
    name,
    host="localhost",
    port=27017,
    is_slave=False,
    read_preference=False,
    slaves=None,
    username=None,
    password=None,
    **kwargs
):
    """Add a connection.

    :param alias: the name that will be used to refer to this connection
        throughout MongoEngine
    :param name: the name of the specific database to use
    :param host: the host name of the :program:`mongod` instance to connect to
    :param port: the port that the :program:`mongod` instance is running on
    :param is_slave: whether the connection can act as a slave
        ** Deprecated pymongo 2.0.1+
    :param read_preference: The read preference for the collection
        ** Added pymongo 2.1
    :param slaves: a list of aliases of slave connections; each of these must
        be a registered connection that has :attr:`is_slave` set to ``True``
    :param username: username to authenticate with
    :param password: password to authenticate with
    :param kwargs: allow ad-hoc parameters to be passed into the pymongo driver
    """
    global _connection_settings

    conn_settings = {
        "name": name,
        "host": host,
        "port": port,
        "is_slave": is_slave,
        "slaves": slaves or [],
        "username": username,
        "password": password,
        "read_preference": read_preference,
    }

    # Handle uri style connections
    if "://" in host:
        uri_dict = uri_parser.parse_uri(host)
        # Override only what the URI actually specifies. Copying the parsed
        # values unconditionally would replace explicitly passed
        # username/password arguments with None whenever the URI omits them.
        for setting, uri_key in (("name", "database"),
                                 ("username", "username"),
                                 ("password", "password")):
            if uri_dict.get(uri_key):
                conn_settings[setting] = uri_dict[uri_key]
        # Detect the replica-set option from the parsed options, ignoring
        # case, so that e.g. "replicaset=rs0" is recognised too. The
        # original substring test is kept for backward compatibility.
        uri_options = uri_dict.get("options") or {}
        has_replica_set = any(
            str(option).lower() == "replicaset" for option in uri_options)
        if has_replica_set or "replicaSet" in host:
            # Stored as a flag only; the replica set name is not copied.
            conn_settings["replicaSet"] = True

    conn_settings.update(kwargs)
    _connection_settings[alias] = conn_settings
开发者ID:mitar,项目名称:mongoengine,代码行数:60,代码来源:connection.py
示例20: _create_connection
def _create_connection(conn_settings):
    """Connect mongoengine from one settings dict or a list of them.

    A list yields a dict mapping each entry's ``alias`` to its connection.
    For a single dict, keys are lower-cased and ``None`` values dropped; a
    URI-style ``host`` supplies the database name.
    """
    # Handle multiple connections recursively
    if isinstance(conn_settings, list):
        return dict(
            (entry.get('alias'), _create_connection(entry))
            for entry in conn_settings)

    # dict() over a generator rather than a dict comprehension in order to
    # support python 2.6
    settings = dict(
        (key.lower(), value)
        for key, value in conn_settings.items()
        if value is not None)

    if 'replicaset' in settings:
        settings['replicaSet'] = settings.pop('replicaset')

    # mongomock hosts are passed through untouched when the app is in
    # TESTING mode on a mongoengine version that understands them.
    use_mongomock = (
        StrictVersion(mongoengine.__version__) >= StrictVersion('0.10.6') and
        current_app.config['TESTING'] == True and
        settings.get('host', '').startswith('mongomock://'))

    # Handle uri style connections
    if not use_mongomock and "://" in settings.get('host', ''):
        settings['db'] = uri_parser.parse_uri(settings['host'])['database']

    db_name = settings.pop('db', 'test')
    return mongoengine.connect(db_name, **settings)
开发者ID:Nobatek,项目名称:flask-mongoengine,代码行数:25,代码来源:__init__.py
注:本文中的pymongo.uri_parser.parse_uri函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论