本文整理汇总了Python中sentry.http.safe_urlread函数的典型用法代码示例。如果您正苦于以下问题：Python safe_urlread函数的具体用法？Python safe_urlread怎么用？Python safe_urlread使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了safe_urlread函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _send_webhook
def _send_webhook(self):
    """Send the prepared request to the Sentry app's webhook URL.

    The request body and headers come from ``self.request``; the call uses
    a 5 second timeout. The response is read through ``safe_urlread`` and
    discarded, so this method returns ``None``.
    """
    webhook_response = safe_urlopen(
        url=self.sentry_app.webhook_url,
        data=self.request.body,
        headers=self.request.headers,
        timeout=5,
    )
    safe_urlread(webhook_response)
开发者ID:Kayle009,项目名称:sentry,代码行数:9,代码来源:installation_notifier.py
示例2: _make_request
def _make_request(self):
    """Request the URL from ``_build_url`` and return the formatted response.

    Any exception raised while performing the request, reading the body or
    decoding it as JSON is logged under ``select-requester.error`` and the
    response is treated as an empty dict, which is then validated like any
    other response.

    Returns:
        The result of ``self._format_response`` applied to the decoded
        response.

    Raises:
        APIError: if ``self._validate_response`` rejects the response
            (including the empty fallback used after a failure).
    """
    try:
        body = safe_urlread(
            safe_urlopen(
                url=self._build_url(),
                headers=self._build_headers(),
            )
        )
        response = json.loads(body)
    except Exception as e:
        # ``BaseException.message`` only exists on Python 2. Reading it
        # unconditionally raises AttributeError on Python 3, which would
        # turn this best-effort handler into a crash. Prefer the attribute
        # when present and fall back to the string form otherwise.
        error_message = getattr(e, 'message', None)
        if error_message is None:
            error_message = str(e)
        logger.info(
            'select-requester.error',
            extra={
                'sentry_app': self.sentry_app.slug,
                'install': self.install.uuid,
                'project': self.project and self.project.slug,
                'uri': self.uri,
                'error_message': error_message,
            }
        )
        response = {}
    if not self._validate_response(response):
        raise APIError()
    return self._format_response(response)
开发者ID:getsentry,项目名称:sentry,代码行数:27,代码来源:select_requester.py
示例3: create_issue
def create_issue(self, request, group, form_data, **kwargs):
    """Create a story through the ``stories`` API endpoint and return its id.

    The title and description from *form_data* are coerced to text
    (UTF-8, undecodable bytes replaced) before being sent.

    Raises:
        PluginError: if the request fails, the body is not valid JSON, or
            the response status code is above 399.
    """
    def _as_text(value):
        # Same coercion for every free-text field sent to the API.
        return force_text(value, encoding='utf-8', errors='replace')

    payload = {
        'story_type': 'bug',
        'name': _as_text(form_data['title']),
        'description': _as_text(form_data['description']),
        'labels': ['sentry'],
    }
    try:
        stories_url = self.build_api_url(group, 'stories')
        req = self.make_api_request(group.project, stories_url, json_data=payload)
        body = safe_urlread(req)
    except requests.RequestException as exc:
        raise PluginError('Error communicating with Pivotal: %s' % (six.text_type(exc), ))
    try:
        json_resp = json.loads(body)
    except ValueError as exc:
        raise PluginError('Error communicating with Pivotal: %s' % (six.text_type(exc), ))
    if req.status_code >= 400:
        raise PluginError(json_resp['error'])
    return json_resp['id']
开发者ID:getsentry,项目名称:sentry-plugins,代码行数:26,代码来源:plugin.py
示例4: dispatch
def dispatch(self, request, helper):
    """Fetch the user's details with the stored access token and validate them.

    The access token is read from the helper's ``data`` state and sent as a
    query parameter to ``USER_DETAILS_ENDPOINT``. On success the ``domain``
    and ``user`` states are bound and the helper advances to its next step.

    Returns:
        ``helper.error(...)`` when the response lacks a ``data`` entry or an
        email, or when the email's domain is blocklisted or differs from
        ``self.domain``; otherwise ``helper.next_step()``.
    """
    token = helper.fetch_state('data')['access_token']
    query_string = urlencode({
        'access_token': token,
    })
    endpoint = '{0}?{1}&alt=json'.format(USER_DETAILS_ENDPOINT, query_string)
    body = safe_urlread(safe_urlopen(endpoint))
    data = json.loads(body)

    # A missing payload and a missing email are reported identically.
    user = data.get('data')
    if not user or not user.get('email'):
        logger.error('Invalid response: %s' % body)
        return helper.error(ERR_INVALID_RESPONSE)

    domain = extract_domain(user.get('email'))
    # Blocklisted domains and domains that differ from the configured one
    # produce the same error.
    rejected = domain in DOMAIN_BLOCKLIST or (self.domain and self.domain != domain)
    if rejected:
        return helper.error(ERR_INVALID_DOMAIN % (domain,))

    helper.bind_state('domain', domain)
    helper.bind_state('user', user)
    return helper.next_step()
开发者ID:dieswaytoofast,项目名称:sentry-auth-google,代码行数:32,代码来源:views.py
示例5: get_issue_title_by_id
def get_issue_title_by_id(self, request, group, issue_id):
    """Return the ``name`` field of the story identified by *issue_id*.

    The story is requested from ``<stories API url>/<issue_id>`` for the
    group's project and the response body is decoded as JSON.
    """
    stories_url = self.build_api_url(group, 'stories')
    story_url = '%s/%s' % (stories_url, issue_id)
    response = self.make_api_request(group.project, story_url)
    story = json.loads(safe_urlread(response))
    return story['name']
开发者ID:getsentry,项目名称:sentry-plugins,代码行数:7,代码来源:plugin.py
示例6: refresh_identity
def refresh_identity(self, auth_identity):
    """Refresh the OAuth data stored on *auth_identity* using its refresh token.

    On a 200 response, the identity's data is updated with the result of
    ``self.get_oauth_data(payload)`` and saved via ``auth_identity.update``.

    Raises:
        IdentityNotValid: if no refresh token is stored, the server answers
            401, or it answers 400 with an ``invalid_grant`` error.
        Exception: for any other non-200 response.
    """
    refresh_token = auth_identity.data.get("refresh_token")
    if not refresh_token:
        raise IdentityNotValid("Missing refresh token")

    params = self.get_refresh_token_params(refresh_token=refresh_token)
    req = safe_urlopen(self.get_refresh_token_url(), data=params)
    try:
        payload = json.loads(safe_urlread(req))
    except Exception:
        # An unreadable or non-JSON body is handled like an empty payload.
        payload = {}

    status = req.status_code
    error = payload.get("error", "unknown_error")
    error_description = payload.get("error_description", "no description available")
    formatted_error = "HTTP {} ({}): {}".format(status, error, error_description)

    # A 400 with invalid_grant may not be common, but at the very least
    # Google will return an invalid grant when a user is suspended.
    if status == 401 or (status == 400 and error == "invalid_grant"):
        raise IdentityNotValid(formatted_error)
    if status != 200:
        raise Exception(formatted_error)

    auth_identity.data.update(self.get_oauth_data(payload))
    auth_identity.update(data=auth_identity.data)
开发者ID:AyrtonRicardo,项目名称:sentry,代码行数:34,代码来源:oauth2.py
示例7: exchange_token
def exchange_token(self, request, helper, code):
    """Exchange an authorization *code* at ``self.access_token_url``.

    Returns the response body decoded as JSON.
    """
    # TODO: this needs the auth yet
    redirect_uri = absolute_uri(helper.get_redirect_url())
    params = self.get_token_params(code=code, redirect_uri=redirect_uri)
    response = safe_urlopen(self.access_token_url, data=params)
    return json.loads(safe_urlread(response))
开发者ID:Juraldinio,项目名称:sentry,代码行数:7,代码来源:oauth2.py
示例8: view
def view(self, request, group, **kwargs):
    """Serve issue autocomplete results, or defer to the parent view.

    When the ``autocomplete_query`` GET parameter is set, the ``q``
    parameter is searched within the configured ``repo`` through the GitHub
    issue search API and the matches are returned as a JSON response. API
    or JSON decoding failures are reported through ``handle_api_error`` and
    answered with an empty JSON object and status 502.
    """
    if not request.GET.get('autocomplete_query'):
        return super(GitHubPlugin, self).view(request, group, **kwargs)

    query = request.GET.get('q')
    if not query:
        return JSONResponse({'issues': []})

    repo = self.get_option('repo', group.project)
    search = 'repo:%s %s' % (repo, query)
    url = 'https://api.github.com/search/issues?%s' % (urlencode({'q': search}),)

    try:
        req = self.make_api_request(request.user, url)
        body = safe_urlread(req)
    except requests.RequestException as exc:
        self.handle_api_error(request, unicode(exc))
        return JSONResponse({}, status=502)

    try:
        json_resp = json.loads(body)
    except ValueError as exc:
        self.handle_api_error(request, unicode(exc))
        return JSONResponse({}, status=502)

    issues = []
    for item in json_resp.get('items', []):
        issues.append({
            'text': '(#%s) %s' % (item['number'], item['title']),
            'id': item['number'],
        })
    return JSONResponse({'issues': issues})
开发者ID:getsentry,项目名称:sentry-github,代码行数:31,代码来源:plugin.py
示例9: exchange_token
def exchange_token(self, request, pipeline, code):
    """Exchange an authorization *code* at ``self.access_token_url``.

    SSL verification follows the pipeline's ``verify_ssl`` config value
    (default ``True``).

    Returns:
        The parsed response: a dict built from the form-encoded body when
        the response's Content-Type starts with
        ``application/x-www-form-urlencoded``, otherwise the JSON-decoded
        body. On an SSL or JSON decoding failure, a dict with ``error`` and
        ``error_description`` keys is returned instead of raising.
    """
    # TODO: this needs the auth yet
    redirect_uri = absolute_uri(pipeline.redirect_url())
    data = self.get_token_params(
        code=code,
        redirect_uri=redirect_uri,
    )
    verify_ssl = pipeline.config.get('verify_ssl', True)
    try:
        req = safe_urlopen(self.access_token_url, data=data, verify_ssl=verify_ssl)
        body = safe_urlread(req)
        content_type = req.headers.get('Content-Type', '')
        if content_type.startswith('application/x-www-form-urlencoded'):
            return dict(parse_qsl(body))
        return json.loads(body)
    except SSLError:
        ssl_context = {
            'url': self.access_token_url,
            'verify_ssl': verify_ssl,
        }
        logger.info('identity.oauth2.ssl-error', extra=ssl_context)
        url = self.access_token_url
        return {
            'error': 'Could not verify SSL certificate',
            'error_description': u'Ensure that {} has a valid SSL certificate'.format(url)
        }
    except JSONDecodeError:
        json_context = {
            'url': self.access_token_url,
        }
        logger.info('identity.oauth2.json-error', extra=json_context)
        return {
            'error': 'Could not decode a JSON Response',
            'error_description': u'We were not able to parse a JSON response, please try again.'
        }
开发者ID:Kayle009,项目名称:sentry,代码行数:31,代码来源:oauth2.py
示例10: _make_request
def _make_request(self):
    """POST ``self.body`` to the URL from ``_build_url`` and return the JSON.

    A failure while reading or decoding the response is logged under
    ``issue-link-requester.error`` and treated as an empty dict.

    Raises:
        APIError: if ``self._validate_response`` rejects the response
            (including the empty fallback used after a failure).
    """
    req = safe_urlopen(
        url=self._build_url(),
        headers=self._build_headers(),
        method='POST',
        data=self.body,
    )
    try:
        response = json.loads(safe_urlread(req))
    except Exception:
        log_context = {
            'sentry_app': self.sentry_app.slug,
            'install': self.install.uuid,
            'project': self.group.project.slug,
            'group': self.group.id,
            'uri': self.uri,
        }
        logger.info('issue-link-requester.error', extra=log_context)
        response = {}

    if self._validate_response(response):
        return response
    raise APIError()
开发者ID:getsentry,项目名称:sentry,代码行数:28,代码来源:issue_link_requester.py
示例11: github_request
def github_request(self, request, url, **kwargs):
    """
    Perform a GitHub API request with the logged in user's access token.

    Extra keyword arguments are passed to ``safe_urlopen``; a ``headers``
    argument, if given, is extended with the ``Authorization`` header.
    Returns the decoded JSON response. Raises forms.ValidationError when
    the user has no GitHub association, the request fails, the body is not
    valid JSON, or the status code is above 399.
    """
    auth = self.get_auth_for_user(user=request.user)
    if auth is None:
        raise forms.ValidationError(_("You have not yet associated GitHub with your account."))

    headers = kwargs.pop("headers", None) or {}
    headers["Authorization"] = "token %s" % auth.tokens["access_token"]

    try:
        req = safe_urlopen(url, headers=headers, **kwargs)
        body = safe_urlread(req)
    except requests.RequestException as exc:
        raise forms.ValidationError(_("Error communicating with GitHub: %s") % (unicode(exc),))

    try:
        json_resp = json.loads(body)
    except ValueError as exc:
        raise forms.ValidationError(_("Error communicating with GitHub: %s") % (unicode(exc),))

    if req.status_code >= 400:
        raise forms.ValidationError(json_resp["message"])
    return json_resp
开发者ID:imankulov,项目名称:sentry-github,代码行数:28,代码来源:plugin.py
示例12: get_issue_title_by_id
def get_issue_title_by_id(self, request, group, issue_id):
    """Return the ``title`` field of the issue identified by *issue_id*.

    The issue is requested from ``<issues API url>/<issue_id>`` on behalf of
    ``request.user`` and the response body is decoded as JSON.
    """
    issues_url = self.build_api_url(group, 'issues')
    issue_url = '%s/%s' % (issues_url, issue_id)
    response = self.make_api_request(request.user, issue_url)
    issue = json.loads(safe_urlread(response))
    return issue['title']
开发者ID:getsentry,项目名称:sentry-github,代码行数:7,代码来源:plugin.py
示例13: create_issue
def create_issue(self, request, group, form_data, **kwargs):
    """Create an issue through the ``issues`` API endpoint and return its number.

    The title, description and optional assignee are taken from
    *form_data*; the request is made on behalf of ``request.user``.

    Raises:
        forms.ValidationError: if the request fails, the body is not valid
            JSON, or the response status code is above 399.
    """
    # TODO: support multiple identities via a selection input in the form?
    payload = {
        "title": form_data['title'],
        "body": form_data['description'],
        "assignee": form_data.get('assignee'),
    }
    try:
        issues_url = self.build_api_url(group, 'issues')
        req = self.make_api_request(request.user, issues_url, json_data=payload)
        body = safe_urlread(req)
    except requests.RequestException as exc:
        raise forms.ValidationError(_('Error communicating with GitHub: %s') % (unicode(exc),))

    try:
        json_resp = json.loads(body)
    except ValueError as exc:
        raise forms.ValidationError(_('Error communicating with GitHub: %s') % (unicode(exc),))

    if req.status_code >= 400:
        raise forms.ValidationError(json_resp['message'])
    return json_resp['number']
开发者ID:getsentry,项目名称:sentry-github,代码行数:26,代码来源:plugin.py
示例14: exchange_token
def exchange_token(self, request, helper, code):
    """Exchange an authorization *code* at ``self.access_token_url``.

    Returns:
        A dict built from the form-encoded body when the response's
        Content-Type starts with ``application/x-www-form-urlencoded``,
        otherwise the JSON-decoded body.
    """
    # TODO: this needs the auth yet
    data = self.get_token_params(code=code, redirect_uri=absolute_uri(helper.get_redirect_url()))
    req = safe_urlopen(self.access_token_url, data=data)
    body = safe_urlread(req)
    # A response is not guaranteed to carry a Content-Type header; look it
    # up with a default so a missing header falls through to JSON decoding
    # instead of raising KeyError.
    if req.headers.get("Content-Type", "").startswith("application/x-www-form-urlencoded"):
        return dict(parse_qsl(body))
    return json.loads(body)
开发者ID:AyrtonRicardo,项目名称:sentry,代码行数:8,代码来源:oauth2.py
示例15: test_simple
def test_simple(self):
    """safe_urlopen/safe_urlread return the registered body and send default headers."""
    responses.add(responses.GET, 'http://example.com', body='foo bar')

    body = safe_urlread(safe_urlopen('http://example.com'))
    assert body == 'foo bar'

    # The outgoing request must identify itself and accept gzip.
    sent = responses.calls[0].request
    assert 'User-Agent' in sent.headers
    assert 'gzip' in sent.headers.get('Accept-Encoding', '')
开发者ID:280185386,项目名称:sentry,代码行数:10,代码来源:test_http.py
示例16: fetch_url
def fetch_url(url, project=None):
    """
    Pull down a URL, returning a UrlResult object.

    The cache is consulted first. On a miss the URL is fetched, sending the
    project's ``sentry:token`` option as ``X-Sentry-Token`` when the URL is
    a valid origin for *project*, and the outcome is cached for 60 seconds.
    A domain whose fetch fails with an unexpected exception is pinned as
    bad for 300 seconds. Returns BAD_SOURCE when the source could not be
    retrieved.
    """
    def _source_key(value):
        # Cache keys are the md5 hex digest of the UTF-8 encoded value.
        return 'source:%s' % (
            hashlib.md5(value.encode('utf-8')).hexdigest(),)

    cache_key = _source_key(url)
    result = cache.get(cache_key)

    if result is None:
        # lock down domains that are problematic
        domain = urlparse(url).netloc
        domain_key = _source_key(domain)
        if cache.get(domain_key):
            return BAD_SOURCE

        headers = []
        if project and is_valid_origin(url, project=project):
            token = project.get_option('sentry:token')
            if token:
                headers.append(('X-Sentry-Token', token))

        try:
            request = safe_urlopen(
                url,
                allow_redirects=True,
                headers=headers,
                timeout=settings.SENTRY_SOURCE_FETCH_TIMEOUT,
            )
        except HTTPError:
            result = BAD_SOURCE
        except Exception:
            # it's likely we've failed due to a timeout, dns, etc so let's
            # ensure we can't cascade the failure by pinning this for 5 minutes
            cache.set(domain_key, 1, 300)
            logger.warning('Disabling sources to %s for %ss', domain, 300,
                           exc_info=True)
            return BAD_SOURCE
        else:
            try:
                body = safe_urlread(request)
            except Exception:
                result = BAD_SOURCE
            else:
                result = (dict(request.headers), body)

        # Both successful and bad results are cached under the URL key.
        cache.set(cache_key, result, 60)

    if result == BAD_SOURCE:
        return result
    return UrlResult(url, *result)
开发者ID:uber,项目名称:sentry,代码行数:54,代码来源:fetch_source.py
示例17: test_simple
def test_simple(self, mock_getaddrinfo):
    """safe_urlopen/safe_urlread return the registered body and send default headers."""
    # NOTE(review): mock_getaddrinfo is presumably injected by a patch
    # decorator outside this snippet; confirm.
    mock_getaddrinfo.return_value = [(2, 1, 6, '', ('81.0.0.1', 0))]
    responses.add(responses.GET, 'http://example.com', body='foo bar')

    raw_body = http.safe_urlread(http.safe_urlopen('http://example.com'))
    assert raw_body.decode('utf-8') == 'foo bar'

    # The outgoing request must identify itself and accept gzip.
    sent = responses.calls[0].request
    assert 'User-Agent' in sent.headers
    assert 'gzip' in sent.headers.get('Accept-Encoding', '')
开发者ID:Kayle009,项目名称:sentry,代码行数:11,代码来源:test_http.py
示例18: test_simple
def test_simple(self, mock_gethostbyname):
    """safe_urlopen/safe_urlread return the registered body and send default headers."""
    # NOTE(review): mock_gethostbyname is presumably injected by a patch
    # decorator outside this snippet; confirm.
    mock_gethostbyname.return_value = '81.0.0.1'
    responses.add(responses.GET, 'http://example.com', body='foo bar')

    body = http.safe_urlread(http.safe_urlopen('http://example.com'))
    assert body == 'foo bar'

    # The outgoing request must identify itself and accept gzip.
    sent = responses.calls[0].request
    assert 'User-Agent' in sent.headers
    assert 'gzip' in sent.headers.get('Accept-Encoding', '')
开发者ID:BrunoAsato,项目名称:sentry,代码行数:11,代码来源:test_http.py
示例19: send_beacon
def send_beacon():
    """
    Send a Beacon to a remote server operated by the Sentry team.
    See the documentation for more details.

    Does nothing when ``settings.SENTRY_BEACON`` is falsy. An installation
    ID is generated and stored on first use. The payload carries the
    installation ID, version, admin email and object/event counts. If the
    response contains a ``version`` entry, its ``stable`` value is stored
    as ``sentry:latest_version``. Request failures are logged and swallowed.
    """
    from sentry import options
    from sentry.models import Organization, Project, Team, User
    if not settings.SENTRY_BEACON:
        logger.info('Not sending beacon (disabled)')
        return
    install_id = options.get('sentry:install-id')
    if not install_id:
        install_id = sha1(uuid4().hex).hexdigest()
        # Log after generating the ID; logging first would always report
        # the empty value that triggered this branch.
        logger.info('Generated installation ID: %s', install_id)
        options.set('sentry:install-id', install_id)
    end = timezone.now()
    events_24h = tsdb.get_sums(
        model=tsdb.models.internal,
        keys=['events.total'],
        start=end - timedelta(hours=24),
        end=end,
    )['events.total']
    payload = {
        'install_id': install_id,
        'version': sentry.get_version(),
        'admin_email': settings.SENTRY_ADMIN_EMAIL,
        'data': {
            # TODO(dcramer): we'd also like to get an idea about the throughput
            # of the system (i.e. events in 24h)
            'users': User.objects.count(),
            'projects': Project.objects.count(),
            'teams': Team.objects.count(),
            'organizations': Organization.objects.count(),
            'events.24h': events_24h,
        }
    }
    # TODO(dcramer): relay the response 'notices' as admin broadcasts
    try:
        request = safe_urlopen(BEACON_URL, json=payload, timeout=5)
        response = safe_urlread(request)
    except Exception:
        logger.warning('Failed sending beacon', exc_info=True)
        return
    data = json.loads(response)
    if 'version' in data:
        options.set('sentry:latest_version', data['version']['stable'])
开发者ID:carriercomm,项目名称:sentry-1,代码行数:53,代码来源:beacon.py
示例20: exchange_token
def exchange_token(self, request, pipeline, code):
    """Exchange an authorization *code* at ``self.access_token_url``.

    SSL verification follows the pipeline's ``verify_ssl`` config value
    (default ``True``).

    Returns:
        A dict built from the form-encoded body when the response's
        Content-Type starts with ``application/x-www-form-urlencoded``,
        otherwise the JSON-decoded body.
    """
    # TODO: this needs the auth yet
    data = self.get_token_params(
        code=code,
        redirect_uri=absolute_uri(pipeline.redirect_url()),
    )
    verify_ssl = pipeline.config.get('verify_ssl', True)
    req = safe_urlopen(self.access_token_url, data=data, verify_ssl=verify_ssl)
    body = safe_urlread(req)
    # A response is not guaranteed to carry a Content-Type header; look it
    # up with a default so a missing header falls through to JSON decoding
    # instead of raising KeyError.
    if req.headers.get('Content-Type', '').startswith('application/x-www-form-urlencoded'):
        return dict(parse_qsl(body))
    return json.loads(body)
开发者ID:binlee1990,项目名称:sentry,代码行数:12,代码来源:oauth2.py
注：本文中的sentry.http.safe_urlread函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论