本文整理汇总了Python中sentry.utils.json.loads函数的典型用法代码示例。如果您正苦于以下问题：Python loads函数的具体用法？Python loads怎么用？Python loads使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了loads函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _kwarg_value
def _kwarg_value(self, key, **kwargs):
"""
Support a dot notation shortcut for deeply nested dicts or just look
up the value if passed a normal key.
>>> self.kwargs = {'foo': {'bar': {'baz': 1}}}
>>> self._kwarg_value('foo.bar.baz')
1
>>> self._kwarg_value('foo')
{'bar': {'baz': 1}}
"""
if '.' in key:
keys = deque(key.split('.'))
else:
kwarg = self.kwargs[key]
if kwargs.get('format') == 'json':
return json.loads(kwarg)
return kwarg
kwarg = dict(self.kwargs)
if kwargs.get('format') == 'json':
kwarg = json.loads(kwarg[keys.popleft()])
while keys:
kwarg = kwarg[keys.popleft()]
return kwarg
开发者ID:yaoqi,项目名称:sentry,代码行数:30,代码来源:faux.py
示例2: test_compare_commits_no_start
def test_compare_commits_no_start(self):
    """Comparing commits with no start SHA yields well-shaped commits."""
    # (URL template, JSON payload) pairs, registered in this exact order.
    stubbed_endpoints = (
        (
            'https://example.gitlab.com/api/v4/projects/%s/repository/commits/xyz',
            {'created_at': '2018-09-19T13:14:15Z'},
        ),
        (
            'https://example.gitlab.com/api/v4/projects/%s/repository/commits?until=2018-09-19T13:14:15Z',
            json.loads(COMMIT_LIST_RESPONSE),
        ),
        (
            'https://example.gitlab.com/api/v4/projects/%s/repository/commits/ed899a2f4b50b4370feeea94676502b42383c746/diff',
            json.loads(COMMIT_DIFF_RESPONSE),
        ),
        (
            'https://example.gitlab.com/api/v4/projects/%s/repository/commits/6104942438c14ec7bd21c6cd5bd995272b3faff6/diff',
            json.loads(COMMIT_DIFF_RESPONSE),
        ),
    )
    for url_template, payload in stubbed_endpoints:
        responses.add(
            responses.GET,
            url_template % self.gitlab_id,
            json=payload,
        )

    created = self.create_repository(
        self.default_repository_config,
        self.integration.id,
    )
    repository = Repository.objects.get(pk=created.data['id'])

    # ``None`` as the start SHA exercises the "no start" code path.
    for commit in self.provider.compare_commits(repository, None, 'xyz'):
        assert_commit_shape(commit)
开发者ID:alexandrul,项目名称:sentry,代码行数:28,代码来源:test_repository.py
示例3: put
def put(self, request):
    """
    Verify a User
    `````````````

    This endpoint verifies the currently authenticated user (for example, to gain superuser).

    Verification uses a U2F challenge/response pair when both are
    submitted, and the account password otherwise.

    :auth: required
    """
    if not request.user.is_authenticated():
        return Response(status=status.HTTP_401_UNAUTHORIZED)

    validator = AuthVerifyValidator(data=request.DATA)
    if not validator.is_valid():
        return self.respond(validator.errors, status=status.HTTP_400_BAD_REQUEST)

    submitted = validator.object
    verified = False

    if 'challenge' in submitted and 'response' in submitted:
        # U2F path: undecodable JSON or a missing enrollment simply leaves
        # the user unverified.
        try:
            u2f = Authenticator.objects.get_interface(request.user, 'u2f')
            if not u2f.is_enrolled:
                raise LookupError()

            challenge = json.loads(submitted['challenge'])
            response = json.loads(submitted['response'])
            verified = u2f.validate_response(request, challenge, response)
        except (ValueError, LookupError):
            pass
    else:
        # Password path.
        verified = request.user.check_password(submitted['password'])

    if not verified:
        # The UI redirects on 401, so a 403 carrying an "ignore" code is
        # returned instead.
        return Response({'detail': {'code': 'ignore'}}, status=status.HTTP_403_FORBIDDEN)

    try:
        # Django's login needs the underlying request object, not the wrapper.
        auth.login(request._request, request.user)
    except auth.AuthUserPasswordExpired:
        expired_payload = {
            'code': 'password-expired',
            'message': 'Cannot sign-in with basic auth because password has expired.',
        }
        return Response(expired_payload, status=status.HTTP_403_FORBIDDEN)

    request.user = request._request.user
    return self.get(request)
开发者ID:Kayle009,项目名称:sentry,代码行数:56,代码来源:auth_index.py
示例4: test_resolve_issue
def test_resolve_issue(self):
    """Resolving via the Slack dialog opens the dialog, then updates the message."""
    # Both stubbed Slack endpoints answer with the same minimal OK body.
    ok_reply = {
        'body': '{"ok": true}',
        'status': 200,
        'content_type': 'application/json',
    }
    resolve_action = {'name': 'resolve_dialog', 'value': 'resolve_dialog'}

    # Step 1: the action must ask Slack to open a dialog.
    responses.add(
        method=responses.POST,
        url='https://slack.com/api/dialog.open',
        **ok_reply
    )

    resp = self.post_webhook(action_data=[resolve_action])
    assert resp.status_code == 200, resp.content

    # Opening the dialog must leave the current message untouched.
    assert resp.content == ''

    open_request = parse_qs(responses.calls[0].request.body)
    assert open_request['token'][0] == self.integration.metadata['access_token']
    assert open_request['trigger_id'][0] == self.trigger_id
    assert 'dialog' in open_request

    dialog = json.loads(open_request['dialog'][0])
    callback = json.loads(dialog['callback_id'])
    assert int(callback['issue']) == self.group1.id
    assert callback['orig_response_url'] == self.response_url

    # Step 2: submitting the dialog updates the original message.
    responses.add(
        method=responses.POST,
        url=self.response_url,
        **ok_reply
    )

    resp = self.post_webhook(
        type='dialog_submission',
        callback_id=dialog['callback_id'],
        data={'submission': {'resolve_type': 'resolved'}}
    )
    # Reload the group so the status assertion sees the persisted state.
    self.group1 = Group.objects.get(id=self.group1.id)

    assert resp.status_code == 200, resp.content
    assert self.group1.get_status() == GroupStatus.RESOLVED

    posted_update = json.loads(responses.calls[1].request.body)
    expected_suffix = u'*Issue resolved by <@{}>*'.format(self.identity.external_id)
    assert posted_update['text'].endswith(expected_suffix)
开发者ID:Kayle009,项目名称:sentry,代码行数:54,代码来源:test_action_endpoint.py
示例5: enroll
def enroll(self, request, interface, insecure=False):
    """
    Handle one step of U2F enrollment.

    A posted ``challenge`` is decoded and stored on the interface as
    ``enrollment_data``. When a ``response`` is posted as well, enrollment
    is attempted and control passes to ``TwoFactorSettingsView.enroll``;
    otherwise the U2F enrollment template is rendered.
    """
    form_data = request.POST

    raw_challenge = form_data.get('challenge')
    if raw_challenge:
        interface.enrollment_data = json.loads(raw_challenge)

    raw_response = form_data.get('response')
    if not raw_response:
        # Nothing to verify yet: show the enrollment page.
        return render_to_response(
            'sentry/account/twofactor/enroll_u2f.html',
            self.make_context(request, interface),
            request,
        )

    interface.try_enroll(json.loads(raw_response))
    return TwoFactorSettingsView.enroll(self, request, interface)
开发者ID:JJediny,项目名称:sentry,代码行数:13,代码来源:accounts_twofactor.py
示例6: handle
def handle(self, request):
    """
    Drive the second step of a two-factor sign-in.

    Redirects to the login URL when there is no pending 2FA user or the
    request is already authenticated, signs in directly when the user has
    no 2FA interfaces, validates a posted OTP or challenge/response, and
    otherwise renders the two-factor template for the negotiated interface.
    """
    pending_user = auth.get_pending_2fa_user(request)
    if pending_user is None or request.user.is_authenticated():
        return HttpResponseRedirect(auth.get_login_url())

    interfaces = Authenticator.objects.all_interfaces_for_user(pending_user)

    # Reaching this view without any 2FA interface enabled is treated as a
    # successful sign-in.
    if not interfaces:
        return self.perform_signin(request, pending_user)

    challenge = None
    activation = None
    interface = self.negotiate_interface(request, interfaces)

    if request.method == "GET":
        activation = interface.activate(request)
        if activation is not None and activation.type == "challenge":
            challenge = activation.challenge
    elif "challenge" in request.POST:
        challenge = json.loads(request.POST["challenge"])

    form = TwoFactorForm()

    # A supplied OTP is tried first.
    otp = request.POST.get("otp")
    if otp:
        matched_interface = self.validate_otp(otp, interface, interfaces)
        if matched_interface is not None:
            return self.perform_signin(request, pending_user, matched_interface)
        self.fail_signin(request, pending_user, form)

    # Then a challenge/response pair, when both are present.
    if challenge:
        raw_response = request.POST.get("response")
        if raw_response:
            decoded_response = json.loads(raw_response)
            if interface.validate_response(request, challenge, decoded_response):
                return self.perform_signin(request, pending_user, interface)
            self.fail_signin(request, pending_user, form)

    template_names = [
        "sentry/twofactor_%s.html" % interface.interface_id,
        "sentry/twofactor.html",
    ]
    template_context = {
        "form": form,
        "interface": interface,
        "other_interfaces": self.get_other_interfaces(interface, interfaces),
        "activation": activation,
    }
    return render_to_response(template_names, template_context, request, status=200)
开发者ID:ForkRepo,项目名称:sentry,代码行数:51,代码来源:twofactor.py
示例7: handle_sudo
def handle_sudo(self, request, redirect_to, context):
    """
    Try to satisfy a sudo prompt through U2F before the default handling.

    Returns ``True`` as soon as a posted U2F response validates. Otherwise
    the current challenge is placed in ``context['u2f_challenge']`` (only
    when the interface is available and enrolled) and the call is delegated
    to ``BaseSudoView.handle_sudo``.
    """
    u2f = Authenticator.objects.get_interface(request.user, 'u2f')

    if u2f.is_available and u2f.is_enrolled:
        challenge = u2f.activate(request).challenge

        if request.method == 'POST':
            posted = request.POST
            # A challenge echoed back by the form replaces the fresh one.
            if 'challenge' in posted:
                challenge = json.loads(posted['challenge'])
            if 'response' in posted:
                u2f_response = json.loads(posted['response'])
                if u2f.validate_response(request, challenge, u2f_response):
                    return True

        context['u2f_challenge'] = challenge

    return BaseSudoView.handle_sudo(self, request, redirect_to, context)
开发者ID:zhangmuxi,项目名称:sentry,代码行数:15,代码来源:sudo.py
示例8: test_compare_commits_no_start
def test_compare_commits_no_start(self):
    """Comparing commits with no start SHA yields well-shaped commits."""
    stub_installation_token()

    # (URL, canned JSON document) pairs, registered in this exact order.
    stubbed_endpoints = (
        (
            'https://api.github.com/repos/getsentry/example-repo/commits?sha=abcdef',
            GET_LAST_COMMITS_EXAMPLE,
        ),
        (
            'https://api.github.com/repos/getsentry/example-repo/commits/6dcb09b5b57875f334f61aebed695e2e4193db5e',
            GET_COMMIT_EXAMPLE,
        ),
    )
    for url, raw_payload in stubbed_endpoints:
        responses.add(responses.GET, url, json=json.loads(raw_payload))

    # ``None`` as the start SHA exercises the "no start" code path.
    commits = self.provider.compare_commits(self.repository, None, 'abcdef')
    for commit in commits:
        assert_commit_shape(commit)
开发者ID:Kayle009,项目名称:sentry,代码行数:15,代码来源:test_repository.py
示例9: handle
def handle(self, request):
    """
    Drive the second step of a two-factor sign-in.

    Redirects to the ``sentry`` URL when there is no pending 2FA user or
    the request is already authenticated, signs in directly when the user
    has no 2FA interfaces, validates a posted OTP or challenge/response,
    and otherwise renders the two-factor template for the negotiated
    interface.
    """
    pending_user = auth.get_pending_2fa_user(request)
    if pending_user is None or request.user.is_authenticated():
        return HttpResponseRedirect(reverse('sentry'))

    interfaces = Authenticator.objects.all_interfaces_for_user(pending_user)

    # Reaching this view without any 2FA interface enabled is treated as a
    # successful sign-in.
    if not interfaces:
        return self.perform_signin(request, pending_user)

    challenge = None
    activation = None
    interface = self.negotiate_interface(request, interfaces)

    if request.method == 'GET':
        activation = interface.activate(request)
        if activation is not None and activation.type == 'challenge':
            challenge = activation.challenge
    elif 'challenge' in request.POST:
        challenge = json.loads(request.POST['challenge'])

    form = TwoFactorForm()

    # A supplied OTP is tried first.
    otp = request.POST.get('otp')
    if otp:
        matched_interface = self.validate_otp(otp, interface, interfaces)
        if matched_interface is not None:
            return self.perform_signin(request, pending_user, matched_interface)
        self.fail_signin(request, pending_user, form)

    # Then a challenge/response pair, when both are present.
    if challenge:
        raw_response = request.POST.get('response')
        if raw_response:
            decoded_response = json.loads(raw_response)
            if interface.validate_response(request, challenge, decoded_response):
                return self.perform_signin(request, pending_user, interface)
            self.fail_signin(request, pending_user, form)

    template_names = [
        'sentry/twofactor_%s.html' % interface.interface_id,
        'sentry/twofactor.html',
    ]
    template_context = {
        'form': form,
        'interface': interface,
        'other_interfaces': self.get_other_interfaces(interface, interfaces),
        'activation': activation,
    }
    return render_to_response(template_names, template_context, request, status=200)
开发者ID:JJediny,项目名称:sentry,代码行数:48,代码来源:twofactor.py
示例10: exchange_token
def exchange_token(self, request, pipeline, code):
    """
    Exchange an OAuth2 authorization ``code`` for token data.

    Returns the decoded token response: a dict built from the query string
    for form-encoded replies, otherwise the parsed JSON body. On an SSL
    verification failure or an undecodable JSON body, a dict with ``error``
    and ``error_description`` keys is returned instead of raising.
    """
    # TODO: this needs the auth yet
    redirect_uri = absolute_uri(pipeline.redirect_url())
    token_params = self.get_token_params(
        code=code,
        redirect_uri=redirect_uri,
    )
    verify_ssl = pipeline.config.get('verify_ssl', True)

    try:
        req = safe_urlopen(self.access_token_url, data=token_params, verify_ssl=verify_ssl)
        body = safe_urlread(req)
        content_type = req.headers.get('Content-Type', '')
        if content_type.startswith('application/x-www-form-urlencoded'):
            return dict(parse_qsl(body))
        return json.loads(body)
    except SSLError:
        log_extra = {
            'url': self.access_token_url,
            'verify_ssl': verify_ssl,
        }
        logger.info('identity.oauth2.ssl-error', extra=log_extra)
        return {
            'error': 'Could not verify SSL certificate',
            'error_description': u'Ensure that {} has a valid SSL certificate'.format(
                self.access_token_url
            )
        }
    except JSONDecodeError:
        log_extra = {
            'url': self.access_token_url,
        }
        logger.info('identity.oauth2.json-error', extra=log_extra)
        return {
            'error': 'Could not decode a JSON Response',
            'error_description': u'We were not able to parse a JSON response, please try again.'
        }
开发者ID:Kayle009,项目名称:sentry,代码行数:31,代码来源:oauth2.py
示例11: github_request
def github_request(self, request, url, **kwargs):
    """
    Make a GitHub request on behalf of the logged in user. Return JSON
    response on success or raise forms.ValidationError on any exception

    :param request: the current request; ``request.user`` selects the
        stored GitHub credentials.
    :param url: the GitHub API URL to call.
    :param kwargs: extra arguments forwarded to ``safe_urlopen``. An
        optional ``headers`` mapping is copied, never mutated, before the
        ``Authorization`` header is added.
    :raises forms.ValidationError: when the user has no GitHub association,
        the request fails, the body is not valid JSON, or GitHub answers
        with a status code above 399.
    """
    auth = self.get_auth_for_user(user=request.user)
    if auth is None:
        raise forms.ValidationError(_("You have not yet associated GitHub with your account."))

    # Copy the caller's headers so adding the token does not leak the
    # Authorization header into a dict the caller may reuse.
    headers = dict(kwargs.pop("headers", None) or {})
    headers["Authorization"] = "token %s" % auth.tokens["access_token"]

    try:
        req = safe_urlopen(url, headers=headers, **kwargs)
        body = safe_urlread(req)
    except requests.RequestException as e:
        msg = unicode(e)
        raise forms.ValidationError(_("Error communicating with GitHub: %s") % (msg,))

    try:
        json_resp = json.loads(body)
    except ValueError as e:
        msg = unicode(e)
        raise forms.ValidationError(_("Error communicating with GitHub: %s") % (msg,))

    if req.status_code > 399:
        # An error body without a "message" field (or one that is not an
        # object at all) must still surface as a ValidationError rather
        # than a KeyError/TypeError.
        message = json_resp.get("message") if isinstance(json_resp, dict) else None
        if not message:
            message = _("Error communicating with GitHub: %s") % (req.status_code,)
        raise forms.ValidationError(message)

    return json_resp
开发者ID:imankulov,项目名称:sentry-github,代码行数:28,代码来源:plugin.py
示例12: sourcemap_to_index
def sourcemap_to_index(sourcemap):
    """
    Parse a raw JSON source map and build a ``SourceMapIndex`` from it.

    :param sourcemap: the source map document as a JSON string.
    :returns: ``SourceMapIndex(state_list, key_list, src_list, content)``
        where ``state_list`` holds every state yielded by
        ``parse_sourcemap``, ``key_list`` the matching
        ``(dst_line, dst_col)`` tuples, ``src_list`` the set of
        non-``None`` ``state.src`` values, and ``content`` either ``None``
        (no ``sourcesContent`` key) or a dict mapping each root-joined
        source name to its list of lines.
    """
    smap = json.loads(sourcemap)

    state_list = []
    key_list = []
    src_list = set()
    content = None
    root = smap.get('sourceRoot')

    if 'sourcesContent' in smap:
        # ``sourcesContent`` may be null or shorter than ``sources``.
        # Sources without embedded text are indexed as empty, exactly like
        # null entries, instead of raising IndexError/TypeError.
        embedded = smap['sourcesContent'] or []
        content = {}
        for idx, source in enumerate(smap['sources']):
            # Apply the root to the source before shoving into the index
            # so we can look it up correctly later
            source = urljoin(root, source)
            text = embedded[idx] if idx < len(embedded) else None
            content[source] = text.splitlines() if text else []

    for state in parse_sourcemap(smap):
        state_list.append(state)
        key_list.append((state.dst_line, state.dst_col))

        # Apparently it's possible to not have a src
        # specified in the vlq segments
        if state.src is not None:
            src_list.add(state.src)

    return SourceMapIndex(state_list, key_list, src_list, content)
开发者ID:CrazyLionHeart,项目名称:sentry,代码行数:30,代码来源:sourcemaps.py
示例13: _handle_builtin
def _handle_builtin(self, request, project):
    """
    Forward a release webhook body to the project's releases endpoint.

    The request body is decoded as JSON and posted through the internal API
    client. The client's status code and data (or, on ``client.ApiError``,
    the error's status code and body) are relayed as a JSON
    ``HttpResponse``.
    """
    endpoint = '/projects/{}/{}/releases/'.format(
        project.organization.slug,
        project.slug,
    )

    try:
        # Ideally the API client would support some kind of god-mode here
        # as we've already confirmed credentials and simply want to execute
        # the view code. Instead we hack around it with an ApiKey instance
        god = ApiKey(
            organization=project.organization,
            scopes=getattr(ApiKey.scopes, 'project:write'),
        )
        api_response = client.post(
            endpoint,
            data=json.loads(request.body),
            auth=god,
        )
    except client.ApiError as exc:
        status_code = exc.status_code
        content = exc.body
    else:
        status_code = api_response.status_code
        content = json.dumps(api_response.data)

    return HttpResponse(
        status=status_code,
        content=content,
        content_type='application/json',
    )
开发者ID:AyrtonRicardo,项目名称:sentry,代码行数:30,代码来源:release_webhook.py
示例14: test_simple_notification
def test_simple_notification(self):
    """Notifying through the plugin POSTs the expected alert payload."""
    responses.add(
        'POST',
        'https://alert.victorops.com/integrations/generic/20131114/alert/secret-api-key/everyone',
        body=SUCCESS
    )
    for option, value in (('api_key', 'secret-api-key'), ('routing_key', 'everyone')):
        self.plugin.set_option(option, value, self.project)

    group = self.create_group(message='Hello world', culprit='foo.bar')
    event = self.create_event(group=group, message='Hello world', tags={'level': 'warning'})
    rule = Rule.objects.create(project=self.project, label='my rule')

    with self.options({'system.url-prefix': 'http://example.com'}):
        self.plugin.notify(Notification(event=event, rule=rule))

    sent_payload = json.loads(responses.calls[0].request.body)
    expected_payload = {
        'message_type': 'WARNING',
        'entity_id': group.id,
        'entity_display_name': 'Hello world',
        'monitoring_tool': 'sentry',
        'state_message': 'Stacktrace\n-----------\n\nStacktrace (most recent call last):\n\n File "sentry/models/foo.py", line 29, in build_msg\n string_max_length=self.string_max_length)\n\nMessage\n-----------\n\nHello world',
        'timestamp': int(event.datetime.strftime('%s')),
        'issue_url': 'http://example.com/baz/bar/issues/%s/' % group.id,
    }
    assert expected_payload == sent_payload
开发者ID:getsentry,项目名称:sentry-plugins,代码行数:30,代码来源:test_plugin.py
示例15: view
def view(self, request, group, **kwargs):
    """
    Serve issue autocomplete lookups, deferring everything else to the base view.

    When ``autocomplete_query`` is present in the query string, the ``q``
    term is searched within the configured repository through the GitHub
    issue search API and the matches are returned as
    ``{'issues': [{'text': ..., 'id': ...}, ...]}``. A failed request or
    an undecodable body is reported via ``handle_api_error`` and answered
    with an empty 502 JSON response.
    """
    if not request.GET.get('autocomplete_query'):
        return super(GitHubPlugin, self).view(request, group, **kwargs)

    search_term = request.GET.get('q')
    if not search_term:
        return JSONResponse({'issues': []})

    repo = self.get_option('repo', group.project)
    scoped_query = 'repo:%s %s' % (repo, search_term)
    url = 'https://api.github.com/search/issues?%s' % (urlencode({'q': scoped_query}),)

    try:
        req = self.make_api_request(request.user, url)
        body = safe_urlread(req)
    except requests.RequestException as e:
        self.handle_api_error(request, unicode(e))
        return JSONResponse({}, status=502)

    try:
        json_resp = json.loads(body)
    except ValueError as e:
        self.handle_api_error(request, unicode(e))
        return JSONResponse({}, status=502)

    issues = []
    for item in json_resp.get('items', []):
        issues.append({
            'text': '(#%s) %s' % (item['number'], item['title']),
            'id': item['number']
        })
    return JSONResponse({'issues': issues})
开发者ID:getsentry,项目名称:sentry-github,代码行数:31,代码来源:plugin.py
示例16: exchange_token
def exchange_token(self, request, helper, code):
    """
    Exchange an OAuth2 authorization ``code`` for token data.

    Posts the token parameters to ``self.access_token_url`` and returns the
    JSON-decoded response body.
    """
    # TODO: this needs the auth yet
    redirect_uri = absolute_uri(helper.get_redirect_url())
    token_params = self.get_token_params(code=code, redirect_uri=redirect_uri)
    token_response = safe_urlopen(self.access_token_url, data=token_params)
    return json.loads(safe_urlread(token_response))
开发者ID:Juraldinio,项目名称:sentry,代码行数:7,代码来源:oauth2.py
示例17: test_create_issue
def test_create_issue(self, mock_get_jwt):
    """Creating an issue authenticates with GitHub and posts the expected payload."""
    # NOTE(review): ``mock_get_jwt`` is presumably injected by a patch
    # decorator that is not visible here; it is unused in the body.
    token_reply = {'token': 'token_1', 'expires_at': '2018-10-11T22:14:10Z'}
    issue_reply = {'number': 321, 'title': 'hello', 'body': 'This is the description',
                   'html_url': 'https://github.com/getsentry/sentry/issues/231'}

    responses.add(
        responses.POST,
        'https://api.github.com/installations/github_external_id/access_tokens',
        json=token_reply
    )
    responses.add(
        responses.POST,
        'https://api.github.com/repos/getsentry/sentry/issues',
        json=issue_reply
    )

    form_data = {
        'repo': 'getsentry/sentry',
        'title': 'hello',
        'description': 'This is the description',
    }
    expected_issue = {
        'key': 321,
        'description': 'This is the description',
        'title': 'hello',
        'url': 'https://github.com/getsentry/sentry/issues/231',
        'repo': 'getsentry/sentry',
    }
    assert self.integration.create_issue(form_data) == expected_issue

    # First call: the installation token request.
    token_request = responses.calls[0].request
    assert token_request.headers['Authorization'] == 'Bearer jwt_token_1'

    # Second call: the issue creation request.
    issue_request = responses.calls[1].request
    assert issue_request.headers['Authorization'] == 'token token_1'

    sent_payload = json.loads(issue_request.body)
    assert sent_payload == {'body': 'This is the description', 'assignee': None, 'title': 'hello'}
开发者ID:yogeshmangaj,项目名称:sentry,代码行数:34,代码来源:test_issues.py
示例18: load_data
def load_data(platform):
    """
    Load the bundled sample event for ``platform`` and decorate it.

    Reads ``<DATA_ROOT>/samples/<platform>.json`` and overlays the platform,
    an example message, and fixed user, tag and HTTP interface data.
    Returns ``None`` when no sample file exists for the platform.
    """
    sample_file = '%s.json' % (platform.encode('utf-8'),)
    json_path = os.path.join(DATA_ROOT, 'samples', sample_file)

    if not os.path.exists(json_path):
        return

    with open(json_path) as fp:
        data = json.loads(fp.read())

    platform_title = PLATFORM_TITLES.get(platform, platform.title())

    # NOTE(review): the email value looks like scraper redaction residue;
    # it is runtime text, so it is kept as-is. Confirm against upstream.
    sample_user = {
        "username": "getsentry",
        "id": "1671",
        "email": "[email protected]"
    }
    sample_http = {
        "cookies": {},
        "url": "http://example.com/foo",
        "headers": {
            "Referer": "http://example.com",
            "User-Agent": "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.72 Safari/537.36"
        },
        "env": {},
        "query_string": "",
        "data": {},
        "method": "GET"
    }

    data['platform'] = platform
    data['message'] = 'This is an example %s exception' % (platform_title,)
    data['sentry.interfaces.User'] = sample_user
    data['tags'] = [
        ('foo', 'bar'),
        ('version', '1.0'),
    ]
    data['sentry.interfaces.Http'] = sample_http

    return data
开发者ID:CaseCommonsDevOps,项目名称:sentry,代码行数:35,代码来源:samples.py
示例19: dispatch
def dispatch(self, request, helper):
    """
    Fetch the user's details and bind the email domain and user to the flow.

    Uses the access token stored in the helper state to query
    ``USER_DETAILS_ENDPOINT``. A response without ``data`` or without an
    email is rejected as invalid; a blocklisted domain, or one differing
    from ``self.domain`` when that is set, is rejected as an invalid
    domain. Otherwise ``domain`` and ``user`` are bound and the flow
    advances.
    """
    access_token = helper.fetch_state('data')['access_token']
    query_string = urlencode({
        'access_token': access_token,
    })

    req = safe_urlopen('{0}?{1}&alt=json'.format(USER_DETAILS_ENDPOINT, query_string))
    body = safe_urlread(req)
    data = json.loads(body)

    user_details = data.get('data')
    if not user_details or not user_details.get('email'):
        logger.error('Invalid response: %s' % body)
        return helper.error(ERR_INVALID_RESPONSE)

    domain = extract_domain(user_details.get('email'))

    blocklisted = domain in DOMAIN_BLOCKLIST
    if blocklisted or (self.domain and self.domain != domain):
        return helper.error(ERR_INVALID_DOMAIN % (domain,))

    helper.bind_state('domain', domain)
    helper.bind_state('user', user_details)

    return helper.next_step()
开发者ID:dieswaytoofast,项目名称:sentry-auth-google,代码行数:32,代码来源:views.py
示例20: request
def request(self, method, path, user, auth=None, params=None, data=None,
            is_sudo=False):
    """
    Invoke an API view in-process and return its response.

    The path (prefixed with ``self.prefix``) is resolved to a view callback,
    a mock request is built for ``method`` carrying ``user``, ``auth`` and
    the sudo flag, and ``params`` / ``data`` are merged into its GET / POST
    dicts. Responses with a status in ``[200, 400)`` are returned; any
    other status raises ``self.ApiError(status_code, data)``.
    """
    def _merge(querydict, values):
        # Temporarily unlock the query dict to merge the extra values.
        querydict._mutable = True
        querydict.update(values)
        querydict._mutable = False

    full_path = self.prefix + path
    callback, callback_args, callback_kwargs = resolve(full_path)

    if data:
        # we encode to ensure compatibility
        data = json.loads(json.dumps(data))

    factory = APIRequestFactory()
    build_request = getattr(factory, method.lower())
    mock_request = build_request(full_path, data)
    mock_request.auth = auth
    mock_request.user = user
    mock_request.is_sudo = lambda: is_sudo

    if params:
        _merge(mock_request.GET, params)

    if data:
        _merge(mock_request.POST, data)

    response = callback(mock_request, *callback_args, **callback_kwargs)

    if not (200 <= response.status_code < 400):
        raise self.ApiError(response.status_code, response.data)
    return response
开发者ID:KinKir,项目名称:sentry,代码行数:31,代码来源:client.py
注：本文中的sentry.utils.json.loads函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论