本文整理汇总了Python中raven.utils.urlparse.urlparse函数的典型用法代码示例。如果您正苦于以下问题:Python urlparse函数的具体用法?Python urlparse怎么用?Python urlparse使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urlparse函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, dsn=None, **options):
o = options
# configure loggers first
cls = self.__class__
self.state = ClientState()
self.logger = logging.getLogger('%s.%s' % (cls.__module__,
cls.__name__))
self.error_logger = logging.getLogger('sentry.errors')
if dsn is None and os.environ.get('SENTRY_DSN'):
msg = "Configuring Raven from environment variable 'SENTRY_DSN'"
self.logger.debug(msg)
dsn = os.environ['SENTRY_DSN']
if dsn:
# TODO: should we validate other options werent sent?
urlparts = urlparse(dsn)
msg = "Configuring Raven for host: %s://%s:%s" % (urlparts.scheme,
urlparts.netloc, urlparts.path)
self.logger.debug(msg)
dsn_config = raven.load(dsn, transport_registry=self._registry)
servers = dsn_config['SENTRY_SERVERS']
project = dsn_config['SENTRY_PROJECT']
public_key = dsn_config['SENTRY_PUBLIC_KEY']
secret_key = dsn_config['SENTRY_SECRET_KEY']
else:
servers = o.get('servers')
project = o.get('project')
public_key = o.get('public_key')
secret_key = o.get('secret_key')
self.servers = servers
self.public_key = public_key
self.secret_key = secret_key
self.project = project or defaults.PROJECT
self.include_paths = set(o.get('include_paths') or [])
self.exclude_paths = set(o.get('exclude_paths') or [])
self.name = unicode(o.get('name') or defaults.NAME)
self.auto_log_stacks = bool(o.get('auto_log_stacks') or
defaults.AUTO_LOG_STACKS)
self.string_max_length = int(o.get('string_max_length') or
defaults.MAX_LENGTH_STRING)
self.list_max_length = int(o.get('list_max_length') or defaults.MAX_LENGTH_LIST)
self.site = o.get('site', defaults.SITE)
self.processors = o.get('processors')
if self.processors is None:
self.processors = defaults.PROCESSORS
context = o.get('context')
if context is None:
context = {'sys.argv': sys.argv[:]}
self.extra = context
self.module_cache = ModuleProxyCache()
# servers may be set to a NoneType (for Django)
if not self.is_enabled():
self.logger.info('Raven is not configured (disabled). Please see documentation for more information.')
开发者ID:CS76,项目名称:ipo-galaxy,代码行数:60,代码来源:base.py
示例2: _get_public_dsn
def _get_public_dsn(self):
url = urlparse(self.servers[0])
netloc = url.hostname
if url.port:
netloc += ":%s" % url.port
path = url.path.replace("api/%s/store/" % (self.project,), self.project)
return "//%[email protected]%s%s" % (self.public_key, netloc, path)
开发者ID:htomika,项目名称:gaeFeedFind,代码行数:7,代码来源:base.py
示例3: load
def load(dsn, scope=None, transport_registry=None):
"""
Parses a Sentry compatible DSN and loads it
into the given scope.
>>> import raven
>>> dsn = 'https://public_key:[email protected]/project_id'
>>> # Apply configuration to local scope
>>> raven.load(dsn, locals())
>>> # Return DSN configuration
>>> options = raven.load(dsn)
"""
if not transport_registry:
from raven.transport import TransportRegistry, default_transports
transport_registry = TransportRegistry(default_transports)
url = urlparse(dsn)
if not transport_registry.supported_scheme(url.scheme):
raise ValueError('Unsupported Sentry DSN scheme: %r' % url.scheme)
if scope is None:
scope = {}
scope_extras = transport_registry.compute_scope(url, scope)
scope.update(scope_extras)
return scope
开发者ID:Aaron1011,项目名称:domorereps-android,代码行数:31,代码来源:__init__.py
示例4: send_remote
def send_remote(self, url, data, headers=None):
# If the client is configured to raise errors on sending,
# the implication is that the backoff and retry strategies
# will be handled by the calling application
if headers is None:
headers = {}
if not self.raise_send_errors and not self.state.should_try():
message = self._get_log_message(data)
self.error_logger.error(message)
return
self.logger.debug('Sending message of length %d to %s', len(data), url)
def failed_send(e):
self._failed_send(e, url, data)
try:
parsed = urlparse(url)
transport = self._registry.get_transport(
parsed, **self.transport_options)
if transport.async:
transport.async_send(data, headers, self._successful_send,
failed_send)
else:
transport.send(data, headers)
self._successful_send()
except Exception as e:
if self.raise_send_errors:
raise
failed_send(e)
开发者ID:asvetlov,项目名称:raven-python,代码行数:30,代码来源:base.py
示例5: get_transport
def get_transport(self, parsed_url, **options):
full_url = parsed_url.geturl()
if full_url not in self._transports:
# Remove the options from the parsed_url
parsed_url = urlparse.urlparse(full_url.split('?')[0])
self._transports[full_url] = self._schemes[parsed_url.scheme](parsed_url, **options)
return self._transports[full_url]
开发者ID:LudoBike,项目名称:quodlibet,代码行数:7,代码来源:registry.py
示例6: _get_public_dsn
def _get_public_dsn(self):
url = urlparse(self.servers[0])
netloc = url.hostname
if url.port:
netloc += ':%s' % url.port
path = url.path.replace('api/store/', self.project)
return '//%[email protected]%s%s' % (self.public_key, netloc, path)
开发者ID:Archaeopteryx,项目名称:elmo,代码行数:7,代码来源:base.py
示例7: send_remote
def send_remote(self, url, data, headers=None):
if headers is None:
headers = {}
if not self.state.should_try():
message = self._get_log_message(data)
self.error_logger.error(message)
return
self.logger.debug('Sending message of length %d to %s', len(data), url)
def failed_send(e):
self._failed_send(e, url, data)
try:
parsed = urlparse(url)
transport = self._registry.get_transport(
parsed, **self.transport_options)
if transport.async:
transport.async_send(data, headers, self._successful_send,
failed_send)
else:
transport.send(data, headers)
self._successful_send()
except Exception as e:
failed_send(e)
开发者ID:lnielsen,项目名称:raven-python,代码行数:25,代码来源:base.py
示例8: run_from_argv
def run_from_argv(self, argv):
if len(argv) <= 2 or argv[2] in ['-h', '--help']:
print self.usage(argv[1])
sys.exit(1)
subcommand_class = self._get_subcommand_class(argv[2])
parser = self.create_parser(argv[0], argv[2], subcommand_class)
if hasattr(self, 'use_argparse') and self.use_argparse:
subcommand_class.add_arguments(parser)
options = parser.parse_args(argv[3:])
cmd_options = vars(options)
args = cmd_options.pop('args', ())
else:
options, args = parser.parse_args(argv[3:])
handle_default_options(options)
try:
subcommand_class.execute(*args, **options.__dict__)
except Exception as e:
if not isinstance(e, CommandError):
if hasattr(settings, 'SENTRY_DSN'):
dsn = settings.SENTRY_DSN
elif hasattr(settings, 'RAVEN_CONFIG'):
dsn = settings.RAVEN_CONFIG.get('dsn')
else:
raise
sentry = Client(dsn)
# Force sync transport to avoid race condition with the process exiting
for url in sentry.servers:
parsed = urlparse.urlparse(url)
transport = sentry._registry.get_transport(parsed)
transport.async = False
sentry.get_ident(sentry.captureException())
self._write_error_in_stderr(e)
开发者ID:maykinmedia,项目名称:django-maven,代码行数:34,代码来源:maven.py
示例9: get_transport
def get_transport(self):
if not self.store_endpoint:
return
if not hasattr(self, '_transport'):
parsed = urlparse(self.store_endpoint)
self._transport = self._transport_cls(parsed, **self.options)
return self._transport
开发者ID:hannosch,项目名称:raven-python,代码行数:8,代码来源:remote.py
示例10: get_transport
def get_transport(self, parsed_url):
full_url = parsed_url.geturl()
if full_url not in self._transports:
# Grab options from the querystring to pass to the transport
# e.g. ?timeout=30
if parsed_url.query:
options = dict(q.split('=', 1) for q in parsed_url.query.split('&'))
else:
options = dict()
# Remove the options from the parsed_url
parsed_url = urlparse.urlparse(full_url.split('?')[0])
self._transports[full_url] = self._schemes[parsed_url.scheme](parsed_url, **options)
return self._transports[full_url]
开发者ID:5Ecur1ty,项目名称:raven-python,代码行数:13,代码来源:registry.py
示例11: test_shutdown_waits_for_send
def test_shutdown_waits_for_send(self):
url = urlparse(self.url)
transport = DummyThreadedScheme(url)
transport.send_delay = 0.5
data = self.client.build_msg('raven.events.Message', message='foo')
transport.async_send(data, None, None, None)
time.sleep(0.1)
# this should wait for the message to get sent
transport.get_worker().main_thread_terminated()
self.assertEqual(len(transport.events), 1)
开发者ID:CGenie,项目名称:raven-python,代码行数:14,代码来源:test_threaded_requests.py
示例12: from_string
def from_string(cls, value, transport=None, transport_registry=None):
# in Python 2.x sending the DSN as a unicode value will eventually
# cause issues in httplib
if PY2:
value = to_string(value)
url = urlparse(value.strip())
if url.scheme not in ("http", "https"):
warnings.warn(
"Transport selection via DSN is deprecated. You should explicitly pass the transport class to Client() instead."
)
if transport is None:
if not transport_registry:
from raven.transport import TransportRegistry, default_transports
transport_registry = TransportRegistry(default_transports)
if not transport_registry.supported_scheme(url.scheme):
raise InvalidDsn(ERR_UNKNOWN_SCHEME.format(url.scheme, value))
transport = transport_registry.get_transport_cls(url.scheme)
netloc = url.hostname
if url.port:
netloc += ":%s" % url.port
path_bits = url.path.rsplit("/", 1)
if len(path_bits) > 1:
path = path_bits[0]
else:
path = ""
project = path_bits[-1]
if not all([netloc, project, url.username, url.password]):
raise InvalidDsn("Invalid Sentry DSN: %r" % url.geturl())
base_url = "%s://%s%s" % (url.scheme.rsplit("+", 1)[-1], netloc, path)
return cls(
base_url=base_url,
project=project,
public_key=url.username,
secret_key=url.password,
options=dict(parse_qsl(url.query)),
transport=transport,
)
开发者ID:theatlantic,项目名称:raven-python,代码行数:48,代码来源:remote.py
示例13: send_remote
def send_remote(self, url, data, headers={}):
if not self.state.should_try():
message = self._get_log_message(data)
self.error_logger.error(message)
return
def failed_send(e):
self._failed_send(e, url, data)
try:
parsed = urlparse(url)
transport = self._registry.get_transport(parsed)
if transport.async:
transport.async_send(data, headers, self._successful_send,
failed_send)
else:
transport.send(data, headers)
self._successful_send()
except Exception as e:
failed_send(e)
开发者ID:hodgestar,项目名称:raven-python,代码行数:20,代码来源:base.py
示例14: test_fork_with_active_worker
def test_fork_with_active_worker(self):
# Test threaded transport when forking with an active worker.
# Forking a process doesn't clone the worker thread - make sure
# logging from both processes still works.
event1 = self.client.build_msg('raven.events.Message', message='parent')
event2 = self.client.build_msg('raven.events.Message', message='child')
url = urlparse(self.url)
fd, filename = mkstemp()
try:
os.close(fd)
transport = LoggingThreadedScheme(filename)
# Log from the parent process - starts the worker thread
transport.async_send(url, event1, None, None, None)
childpid = os.fork()
if childpid == 0:
# Log from the child process
transport.async_send(url, event2, None, None, None)
# Ensure threaded worker has finished
transport.get_worker().stop()
os._exit(0)
# Wait for the child process to finish
os.waitpid(childpid, 0)
assert os.path.isfile(filename)
# Ensure threaded worker has finished
transport.get_worker().stop()
with open(filename, 'r') as logfile:
events = dict(x.strip().split() for x in logfile.readlines())
# Check parent and child both logged successfully
assert events == {
str(os.getpid()): 'parent',
str(childpid): 'child',
}
finally:
os.remove(filename)
开发者ID:ehfeng,项目名称:raven-python,代码行数:41,代码来源:tests.py
示例15: from_string
def from_string(cls, value, transport=None, transport_registry=None):
url = urlparse(value)
if url.scheme not in ('http', 'https'):
warnings.warn('Transport selection via DSN is deprecated. You should explicitly pass the transport class to Client() instead.')
if transport is None:
if not transport_registry:
from raven.transport import TransportRegistry, default_transports
transport_registry = TransportRegistry(default_transports)
if not transport_registry.supported_scheme(url.scheme):
raise InvalidDsn(ERR_UNKNOWN_SCHEME.format(url.scheme))
transport = transport_registry.get_transport_cls(url.scheme)
netloc = url.hostname
if url.port:
netloc += ':%s' % url.port
path_bits = url.path.rsplit('/', 1)
if len(path_bits) > 1:
path = path_bits[0]
else:
path = ''
project = path_bits[-1]
if not all([netloc, project, url.username, url.password]):
raise InvalidDsn('Invalid Sentry DSN: %r' % url.geturl())
base_url = '%s://%s%s' % (url.scheme.rsplit('+', 1)[-1], netloc, path)
return cls(
base_url=base_url,
project=project,
public_key=url.username,
secret_key=url.password,
options=dict(parse_qsl(url.query)),
transport=transport,
)
开发者ID:hannosch,项目名称:raven-python,代码行数:40,代码来源:remote.py
示例16: test_fork_spawns_anew
def test_fork_spawns_anew(self):
url = urlparse(self.url)
transport = DummyThreadedScheme()
transport.send_delay = 0.5
data = self.client.build_msg('raven.events.Message', message='foo')
pid = os.fork()
if pid == 0:
time.sleep(0.1)
transport.async_send(url, data, None, None, None)
# this should wait for the message to get sent
transport.get_worker().main_thread_terminated()
self.assertEqual(len(transport.events), 1)
# Use os._exit here so that py.test gets not confused about
# what the hell we're doing here.
os._exit(0)
else:
os.waitpid(pid, 0)
开发者ID:ehfeng,项目名称:raven-python,代码行数:22,代码来源:tests.py
示例17: set_dsn
def set_dsn(self, dsn=None, **options):
o = options
if dsn is None and os.environ.get('SENTRY_DSN'):
msg = "Configuring Raven from environment variable 'SENTRY_DSN'"
self.logger.debug(msg)
dsn = os.environ['SENTRY_DSN']
try:
servers, public_key, secret_key, project, transport_options = self.dsns[dsn]
except KeyError:
if dsn:
# TODO: should we validate other options weren't sent?
urlparts = urlparse(dsn)
self.logger.debug(
"Configuring Raven for host: %s://%s:%s" % (urlparts.scheme,
urlparts.netloc, urlparts.path))
dsn_config = raven.load(dsn, transport_registry=self._registry)
servers = dsn_config['SENTRY_SERVERS']
project = dsn_config['SENTRY_PROJECT']
public_key = dsn_config['SENTRY_PUBLIC_KEY']
secret_key = dsn_config['SENTRY_SECRET_KEY']
transport_options = dsn_config.get('SENTRY_TRANSPORT_OPTIONS', {})
else:
if o.get('servers'):
warnings.warn('Manually configured connections are deprecated. Switch to a DSN.', DeprecationWarning)
servers = o.get('servers')
project = o.get('project')
public_key = o.get('public_key')
secret_key = o.get('secret_key')
transport_options = {}
self.dsns[dsn] = servers, public_key, secret_key, project, transport_options
self.servers = servers
self.public_key = public_key
self.secret_key = secret_key
self.project = project or defaults.PROJECT
self.transport_options = transport_options
开发者ID:Danik1601,项目名称:Plex-Trakt-Scrobbler,代码行数:38,代码来源:base.py
示例18: __init__
def __init__(self, servers=None, include_paths=None, exclude_paths=None,
name=None, auto_log_stacks=None, key=None,
string_max_length=None, list_max_length=None, site=None,
public_key=None, secret_key=None, processors=None, project=None,
dsn=None, **kwargs):
# configure loggers first
cls = self.__class__
self.state = ClientState()
self.logger = logging.getLogger('%s.%s' % (cls.__module__,
cls.__name__))
self.error_logger = logging.getLogger('sentry.errors')
if isinstance(servers, basestring):
# must be a DSN:
if dsn:
# TODO: this should indicate what the caller can do to correct
# the constructor
msg = "You seem to be incorrectly instantiating the " + \
"raven Client class"
raise ValueError(msg)
dsn = servers
servers = None
if dsn is None and os.environ.get('SENTRY_DSN'):
msg = "Configuring Raven from environment variable 'SENTRY_DSN'"
self.logger.info(msg)
dsn = os.environ['SENTRY_DSN']
if dsn:
# TODO: should we validate other options werent sent?
urlparts = urlparse(dsn)
msg = "Configuring Raven for host: %s://%s:%s" % (
urlparts.scheme, urlparts.netloc, urlparts.path)
self.logger.info(msg)
options = raven.load(dsn, transport_registry=self._registry)
servers = options['SENTRY_SERVERS']
project = options['SENTRY_PROJECT']
public_key = options['SENTRY_PUBLIC_KEY']
secret_key = options['SENTRY_SECRET_KEY']
# servers may be set to a NoneType (for Django)
if servers and not (key or (secret_key and public_key)):
self.logger.info('Raven is not configured (disabled). Please see documentation for more information.')
if kwargs.get('timeout') is not None:
warnings.warn('The ``timeout`` option no longer does anything. Pass the option to your transport instead.')
self.servers = servers
self.include_paths = set(include_paths or defaults.INCLUDE_PATHS)
self.exclude_paths = set(exclude_paths or defaults.EXCLUDE_PATHS)
self.name = unicode(name or defaults.NAME)
self.auto_log_stacks = bool(auto_log_stacks or
defaults.AUTO_LOG_STACKS)
self.key = str(key or defaults.KEY)
self.string_max_length = int(string_max_length or
defaults.MAX_LENGTH_STRING)
self.list_max_length = int(list_max_length or defaults.MAX_LENGTH_LIST)
if (site or defaults.SITE):
self.site = unicode(site or defaults.SITE)
else:
self.site = None
self.public_key = public_key
self.secret_key = secret_key
self.project = project or defaults.PROJECT
self.processors = processors or defaults.PROCESSORS
self.module_cache = ModuleProxyCache()
开发者ID:akbargumbira,项目名称:inasafe,代码行数:67,代码来源:base.py
示例19: _send_remote
def _send_remote(self, url, data, headers={}):
parsed = urlparse(url)
transport = self._registry.get_transport(parsed)
return transport.send(data, headers)
开发者ID:Archaeopteryx,项目名称:elmo,代码行数:5,代码来源:base.py
示例20: setup_handlers
def setup_handlers():
if 'sentry_handler' not in __opts__:
log.debug('No \'sentry_handler\' key was found in the configuration')
return False
options = {}
dsn = get_config_value('dsn')
if dsn is not None:
try:
# support raven ver 5.5.0
from raven.transport import TransportRegistry, default_transports
from raven.utils.urlparse import urlparse
transport_registry = TransportRegistry(default_transports)
url = urlparse(dsn)
if not transport_registry.supported_scheme(url.scheme):
raise ValueError('Unsupported Sentry DSN scheme: {0}'.format(url.scheme))
dsn_config = {}
conf_extras = transport_registry.compute_scope(url, dsn_config)
dsn_config.update(conf_extras)
options.update({
'project': dsn_config['SENTRY_PROJECT'],
'servers': dsn_config['SENTRY_SERVERS'],
'public_key': dsn_config['SENTRY_PUBLIC_KEY'],
'secret_key': dsn_config['SENTRY_SECRET_KEY']
})
except ValueError as exc:
log.info(
'Raven failed to parse the configuration provided '
'DSN: {0}'.format(exc)
)
# Allow options to be overridden if previously parsed, or define them
for key in ('project', 'servers', 'public_key', 'secret_key'):
config_value = get_config_value(key)
if config_value is None and key not in options:
log.debug(
'The required \'sentry_handler\' configuration key, '
'\'{0}\', is not properly configured. Not configuring '
'the sentry logging handler.'.format(key)
)
return
elif config_value is None:
continue
options[key] = config_value
# site: An optional, arbitrary string to identify this client installation.
options.update({
# site: An optional, arbitrary string to identify this client
# installation
'site': get_config_value('site'),
# name: This will override the server_name value for this installation.
# Defaults to socket.gethostname()
'name': get_config_value('name'),
# exclude_paths: Extending this allow you to ignore module prefixes
# when sentry attempts to discover which function an error comes from
'exclude_paths': get_config_value('exclude_paths', ()),
# include_paths: For example, in Django this defaults to your list of
# INSTALLED_APPS, and is used for drilling down where an exception is
# located
'include_paths': get_config_value('include_paths', ()),
# list_max_length: The maximum number of items a list-like container
# should store.
'list_max_length': get_config_value('list_max_length'),
# string_max_length: The maximum characters of a string that should be
# stored.
'string_max_length': get_config_value('string_max_length'),
# auto_log_stacks: Should Raven automatically log frame stacks
# (including locals) all calls as it would for exceptions.
'auto_log_stacks': get_config_value('auto_log_stacks'),
# timeout: If supported, the timeout value for sending messages to
# remote.
'timeout': get_config_value('timeout', 1),
# processors: A list of processors to apply to events before sending
# them to the Sentry server. Useful for sending additional global state
# data or sanitizing data that you want to keep off of the server.
'processors': get_config_value('processors'),
# dsn: Ensure the DSN is passed into the client
'dsn': dsn
})
client = raven.Client(**options)
context = get_config_value('context')
context_dict = {}
if context is not None:
for tag in context:
tag_value = __salt__['grains.get'](tag)
if len(tag_value) > 0:
context_dict[tag] = tag_value
if len(context_dict) > 0:
client.context.merge({'tags': context_dict})
try:
handler = SentryHandler(client)
#.........这里部分代码省略.........
开发者ID:HowardMei,项目名称:saltstack,代码行数:101,代码来源:sentry_mod.py
注:本文中的raven.utils.urlparse.urlparse函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论