本文整理汇总了Python中sentry.utils.hashlib.md5函数的典型用法代码示例。如果您正苦于以下问题:Python md5函数的具体用法?Python md5怎么用?Python md5使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了md5函数的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: fetch_release_file
def fetch_release_file(filename, release):
cache_key = "releasefile:%s:%s" % (release.id, md5(filename).hexdigest())
logger.debug("Checking cache for release artifact %r (release_id=%s)", filename, release.id)
result = cache.get(cache_key)
if result is None:
logger.debug("Checking database for release artifact %r (release_id=%s)", filename, release.id)
ident = ReleaseFile.get_ident(filename)
try:
releasefile = (
ReleaseFile.objects.filter(release=release, ident=ident).select_related("file", "file__blob").get()
)
except ReleaseFile.DoesNotExist:
logger.debug("Release artifact %r not found in database (release_id=%s)", filename, release.id)
cache.set(cache_key, -1, 60)
return None
logger.debug("Found release artifact %r (id=%s, release_id=%s)", filename, releasefile.id, release.id)
try:
with releasefile.file.getfile() as fp:
body = fp.read()
except Exception as e:
logger.exception(unicode(e))
result = -1
else:
result = (releasefile.file.headers, body, 200)
cache.set(cache_key, result, 3600)
if result == -1:
result = None
return result
开发者ID:simudream,项目名称:sentry,代码行数:31,代码来源:processor.py
示例2: fetch_release_file
def fetch_release_file(filename, release):
cache_key = 'releasefile:%s:%s' % (
release.id,
md5(filename).hexdigest(),
)
logger.debug('Checking cache for release artfiact %r (release_id=%s)',
filename, release.id)
result = cache.get(cache_key)
if result is None:
logger.debug('Checking database for release artifact %r (release_id=%s)',
filename, release.id)
ident = ReleaseFile.get_ident(filename)
try:
releasefile = ReleaseFile.objects.filter(
release=release,
ident=ident,
).select_related('file').get()
except ReleaseFile.DoesNotExist:
logger.debug('Release artifact %r not found in database (release_id=%s)',
filename, release.id)
cache.set(cache_key, -1, 60)
return None
logger.debug('Found release artifact %r (id=%s, release_id=%s)',
filename, releasefile.id, release.id)
with releasefile.file.getfile() as fp:
body = fp.read()
result = (releasefile.file.headers, body, 200)
cache.set(cache_key, result, 300)
elif result == -1:
result = None
return result
开发者ID:noah-lee,项目名称:sentry,代码行数:33,代码来源:processor.py
示例3: _make_key
def _make_key(self, model, filters):
"""
Returns a Redis-compatible key for the model given filters.
"""
return 'b:k:%s:%s' % (
model._meta,
md5('&'.join('%s=%s' % (k, self._coerce_val(v))
for k, v in sorted(filters.iteritems()))).hexdigest(),
)
开发者ID:daevaorn,项目名称:sentry,代码行数:9,代码来源:redis.py
示例4: get_gravatar_url
def get_gravatar_url(email, size=None, default='mm'):
gravatar_url = "%s/avatar/%s" % (settings.SENTRY_GRAVATAR_BASE_URL,
md5(email.lower()).hexdigest())
properties = {}
if size:
properties['s'] = str(size)
if default:
properties['d'] = default
if properties:
gravatar_url += "?" + urllib.urlencode(properties)
return gravatar_url
开发者ID:Andy-hpliu,项目名称:sentry,代码行数:13,代码来源:avatar.py
示例5: fetch_release_file
def fetch_release_file(filename, release):
cache_key = 'releasefile:v1:%s:%s' % (
release.id,
md5(filename).hexdigest(),
)
logger.debug('Checking cache for release artifact %r (release_id=%s)',
filename, release.id)
result = cache.get(cache_key)
if result is None:
logger.debug('Checking database for release artifact %r (release_id=%s)',
filename, release.id)
ident = ReleaseFile.get_ident(filename)
try:
releasefile = ReleaseFile.objects.filter(
release=release,
ident=ident,
).select_related('file').get()
except ReleaseFile.DoesNotExist:
logger.debug('Release artifact %r not found in database (release_id=%s)',
filename, release.id)
cache.set(cache_key, -1, 60)
return None
logger.debug('Found release artifact %r (id=%s, release_id=%s)',
filename, releasefile.id, release.id)
try:
with releasefile.file.getfile() as fp:
z_body, body = compress_file(fp)
except Exception as e:
logger.exception(unicode(e))
cache.set(cache_key, -1, 3600)
result = None
else:
# Write the compressed version to cache, but return the deflated version
cache.set(cache_key, (releasefile.file.headers, z_body, 200), 3600)
result = (releasefile.file.headers, body, 200)
elif result == -1:
# We cached an error, so normalize
# it down to None
result = None
else:
# We got a cache hit, but the body is compressed, so we
# need to decompress it before handing it off
body = zlib.decompress(result[1])
result = (result[0], body, result[2])
return result
开发者ID:JJediny,项目名称:sentry,代码行数:47,代码来源:processor.py
示例6: fetch_file
def fetch_file(url, project=None, release=None, allow_scraping=True):
"""
Pull down a URL, returning a UrlResult object.
Attempts to fetch from the cache.
"""
if release:
result = fetch_release_file(url, release)
elif not allow_scraping or not url.startswith(('http:', 'https:')):
error = {
'type': EventError.JS_MISSING_SOURCE,
'url': url,
}
raise CannotFetchSource(error)
else:
result = None
cache_key = 'source:cache:v2:%s' % (
md5(url).hexdigest(),
)
if result is None:
logger.debug('Checking cache for url %r', url)
result = cache.get(cache_key)
if result is None:
# lock down domains that are problematic
domain = urlparse(url).netloc
domain_key = 'source:blacklist:v2:%s' % (
md5(domain).hexdigest(),
)
domain_result = cache.get(domain_key)
if domain_result:
domain_result['url'] = url
raise CannotFetchSource(domain_result)
headers = {}
if project and is_valid_origin(url, project=project):
token = project.get_option('sentry:token')
if token:
headers['X-Sentry-Token'] = token
logger.debug('Fetching %r from the internet', url)
http_session = http.build_session()
try:
response = http_session.get(
url,
allow_redirects=True,
verify=False,
headers=headers,
timeout=settings.SENTRY_SOURCE_FETCH_TIMEOUT,
)
except Exception as exc:
logger.debug('Unable to fetch %r', url, exc_info=True)
if isinstance(exc, SuspiciousOperation):
error = {
'type': EventError.SECURITY_VIOLATION,
'value': unicode(exc),
'url': url,
}
elif isinstance(exc, (RequestException, ZeroReturnError)):
error = {
'type': EventError.JS_GENERIC_FETCH_ERROR,
'value': str(type(exc)),
'url': url,
}
else:
logger.exception(unicode(exc))
error = {
'type': EventError.UNKNOWN_ERROR,
'url': url,
}
# TODO(dcramer): we want to be less aggressive on disabling domains
cache.set(domain_key, error or '', 300)
logger.warning('Disabling sources to %s for %ss', domain, 300,
exc_info=True)
raise CannotFetchSource(error)
# requests' attempts to use chardet internally when no encoding is found
# and we want to avoid that slow behavior
if not response.encoding:
response.encoding = 'utf-8'
result = (
{k.lower(): v for k, v in response.headers.items()},
response.text,
response.status_code,
)
cache.set(cache_key, result, 60)
if result[2] != 200:
logger.debug('HTTP %s when fetching %r', result[2], url,
exc_info=True)
error = {
'type': EventError.JS_INVALID_HTTP_CODE,
'value': result[2],
'url': url,
}
#.........这里部分代码省略.........
开发者ID:haojiang1,项目名称:sentry,代码行数:101,代码来源:processor.py
示例7: fetch_file
def fetch_file(url, project=None, release=None, allow_scraping=True):
"""
Pull down a URL, returning a UrlResult object.
Attempts to fetch from the cache.
"""
if release:
result = fetch_release_file(url, release)
else:
result = None
cache_key = 'source:cache:v3:%s' % (
md5(url).hexdigest(),
)
if result is None:
if not allow_scraping or not url.startswith(('http:', 'https:')):
error = {
'type': EventError.JS_MISSING_SOURCE,
'url': url,
}
raise CannotFetchSource(error)
logger.debug('Checking cache for url %r', url)
result = cache.get(cache_key)
if result is not None:
# We got a cache hit, but the body is compressed, so we
# need to decompress it before handing it off
body = zlib.decompress(result[1])
result = (result[0], force_text(body), result[2])
if result is None:
# lock down domains that are problematic
domain = urlparse(url).netloc
domain_key = 'source:blacklist:v2:%s' % (
md5(domain).hexdigest(),
)
domain_result = cache.get(domain_key)
if domain_result:
domain_result['url'] = url
raise CannotFetchSource(domain_result)
headers = {}
if project and is_valid_origin(url, project=project):
token = project.get_option('sentry:token')
if token:
headers['X-Sentry-Token'] = token
logger.debug('Fetching %r from the internet', url)
http_session = http.build_session()
try:
response = http_session.get(
url,
allow_redirects=True,
verify=False,
headers=headers,
timeout=settings.SENTRY_SOURCE_FETCH_TIMEOUT,
)
except Exception as exc:
logger.debug('Unable to fetch %r', url, exc_info=True)
if isinstance(exc, RestrictedIPAddress):
error = {
'type': EventError.RESTRICTED_IP,
'url': url,
}
elif isinstance(exc, SuspiciousOperation):
error = {
'type': EventError.SECURITY_VIOLATION,
'url': url,
}
elif isinstance(exc, (RequestException, ZeroReturnError)):
error = {
'type': EventError.JS_GENERIC_FETCH_ERROR,
'value': str(type(exc)),
'url': url,
}
else:
logger.exception(unicode(exc))
error = {
'type': EventError.UNKNOWN_ERROR,
'url': url,
}
# TODO(dcramer): we want to be less aggressive on disabling domains
cache.set(domain_key, error or '', 300)
logger.warning('Disabling sources to %s for %ss', domain, 300,
exc_info=True)
raise CannotFetchSource(error)
# requests' attempts to use chardet internally when no encoding is found
# and we want to avoid that slow behavior
if not response.encoding:
response.encoding = 'utf-8'
body = response.text
z_body = zlib.compress(force_bytes(body))
headers = {k.lower(): v for k, v in response.headers.items()}
cache.set(cache_key, (headers, z_body, response.status_code), 60)
#.........这里部分代码省略.........
开发者ID:JJediny,项目名称:sentry,代码行数:101,代码来源:processor.py
示例8: get_hash
def get_hash(self):
value = self.ident or self.username or self.email or self.ip_address
return md5(value).hexdigest()
开发者ID:GeekGalaxy,项目名称:sentry,代码行数:3,代码来源:eventuser.py
示例9: _make_cache_key
def _make_cache_key(self, key):
return 'o:{0}'.format(md5(key).hexdigest())
开发者ID:noah-lee,项目名称:sentry,代码行数:2,代码来源:manager.py
示例10: _make_cache_key
def _make_cache_key(key):
return 'o:%s' % md5(key).hexdigest()
开发者ID:daevaorn,项目名称:sentry,代码行数:2,代码来源:store.py
示例11: fetch_release_file
def fetch_release_file(filename, release):
cache_key = 'releasefile:v1:%s:%s' % (
release.id,
md5(filename).hexdigest(),
),
filename_path = None
if filename is not None:
# Reconstruct url without protocol + host
# e.g. http://example.com/foo?bar => ~/foo?bar
parsed_url = urlparse(filename)
filename_path = '~' + parsed_url.path
if parsed_url.query:
filename_path += '?' + parsed_url.query
logger.debug('Checking cache for release artifact %r (release_id=%s)',
filename, release.id)
result = cache.get(cache_key)
if result is None:
logger.debug('Checking database for release artifact %r (release_id=%s)',
filename, release.id)
filename_idents = [ReleaseFile.get_ident(filename)]
if filename_path is not None and filename_path != filename:
filename_idents.append(ReleaseFile.get_ident(filename_path))
possible_files = list(ReleaseFile.objects.filter(
release=release,
ident__in=filename_idents,
).select_related('file'))
if len(possible_files) == 0:
logger.debug('Release artifact %r not found in database (release_id=%s)',
filename, release.id)
cache.set(cache_key, -1, 60)
return None
elif len(possible_files) == 1:
releasefile = possible_files[0]
else:
# Prioritize releasefile that matches full url (w/ host)
# over hostless releasefile
target_ident = filename_idents[0]
releasefile = next((f for f in possible_files if f.ident == target_ident))
logger.debug('Found release artifact %r (id=%s, release_id=%s)',
filename, releasefile.id, release.id)
try:
with releasefile.file.getfile() as fp:
z_body, body = compress_file(fp)
except Exception as e:
logger.exception(unicode(e))
cache.set(cache_key, -1, 3600)
result = None
else:
# Write the compressed version to cache, but return the deflated version
cache.set(cache_key, (releasefile.file.headers, z_body, 200), 3600)
result = (releasefile.file.headers, body, 200)
elif result == -1:
# We cached an error, so normalize
# it down to None
result = None
else:
# We got a cache hit, but the body is compressed, so we
# need to decompress it before handing it off
body = zlib.decompress(result[1])
result = (result[0], body, result[2])
return result
开发者ID:dawnoble,项目名称:sentry,代码行数:69,代码来源:processor.py
示例12: test_simple
def test_simple(self):
md5('x').hexdigest() == '9dd4e461268c8034f5c8564e155c67a6'
sha1('x').hexdigest() == '11f6ad8ec52a2984abaafd7c3b516503785c2072'
开发者ID:280185386,项目名称:sentry,代码行数:3,代码来源:tests.py
示例13: test_unicode
def test_unicode(self):
md5(u'ü').hexdigest() == 'c03410a5204b21cd8229ff754688d743'
sha1(u'ü').hexdigest() == '94a759fd37735430753c7b6b80684306d80ea16e'
开发者ID:280185386,项目名称:sentry,代码行数:3,代码来源:tests.py
示例14: get_cache_key
def get_cache_key(cls, project_id, version):
return 'release:2:%s:%s' % (project_id, md5(version).hexdigest())
开发者ID:280185386,项目名称:sentry,代码行数:2,代码来源:release.py
示例15: get_cache_key
def get_cache_key(cls, project_id, name):
return "env:1:%s:%s" % (project_id, md5(name).hexdigest())
开发者ID:mitsuhiko,项目名称:sentry,代码行数:2,代码来源:environment.py
示例16: fetch_file
def fetch_file(url, project=None, release=None, allow_scraping=True):
"""
Pull down a URL, returning a UrlResult object.
Attempts to fetch from the cache.
"""
if release:
result = fetch_release_file(url, release)
elif not allow_scraping or not url.startswith(("http:", "https:")):
error = {"type": EventError.JS_MISSING_SOURCE, "url": url}
raise CannotFetchSource(error)
else:
result = None
cache_key = "source:cache:v3:%s" % (md5(url).hexdigest(),)
if result is None:
logger.debug("Checking cache for url %r", url)
result = cache.get(cache_key)
if result is not None:
# We got a cache hit, but the body is compressed, so we
# need to decompress it before handing it off
body = zlib.decompress(result[1])
result = (result[0], force_text(body), result[2])
if result is None:
# lock down domains that are problematic
domain = urlparse(url).netloc
domain_key = "source:blacklist:v2:%s" % (md5(domain).hexdigest(),)
domain_result = cache.get(domain_key)
if domain_result:
domain_result["url"] = url
raise CannotFetchSource(domain_result)
headers = {}
if project and is_valid_origin(url, project=project):
token = project.get_option("sentry:token")
if token:
headers["X-Sentry-Token"] = token
logger.debug("Fetching %r from the internet", url)
http_session = http.build_session()
try:
response = http_session.get(
url, allow_redirects=True, verify=False, headers=headers, timeout=settings.SENTRY_SOURCE_FETCH_TIMEOUT
)
except Exception as exc:
logger.debug("Unable to fetch %r", url, exc_info=True)
if isinstance(exc, RestrictedIPAddress):
error = {"type": EventError.RESTRICTED_IP, "url": url}
elif isinstance(exc, SuspiciousOperation):
error = {"type": EventError.SECURITY_VIOLATION, "url": url}
elif isinstance(exc, (RequestException, ZeroReturnError)):
error = {"type": EventError.JS_GENERIC_FETCH_ERROR, "value": str(type(exc)), "url": url}
else:
logger.exception(unicode(exc))
error = {"type": EventError.UNKNOWN_ERROR, "url": url}
# TODO(dcramer): we want to be less aggressive on disabling domains
cache.set(domain_key, error or "", 300)
logger.warning("Disabling sources to %s for %ss", domain, 300, exc_info=True)
raise CannotFetchSource(error)
# requests' attempts to use chardet internally when no encoding is found
# and we want to avoid that slow behavior
if not response.encoding:
response.encoding = "utf-8"
body = response.text
z_body = zlib.compress(force_bytes(body))
headers = {k.lower(): v for k, v in response.headers.items()}
cache.set(cache_key, (headers, z_body, response.status_code), 60)
result = (headers, body, response.status_code)
if result[2] != 200:
logger.debug("HTTP %s when fetching %r", result[2], url, exc_info=True)
error = {"type": EventError.JS_INVALID_HTTP_CODE, "value": result[2], "url": url}
raise CannotFetchSource(error)
return UrlResult(url, result[0], result[1])
开发者ID:jasonbeverage,项目名称:sentry,代码行数:82,代码来源:processor.py
注:本文中的sentry.utils.hashlib.md5函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论