• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python http.safe_urlread函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sentry.http.safe_urlread函数的典型用法代码示例。如果您正苦于以下问题:Python safe_urlread函数的具体用法?Python safe_urlread怎么用?Python safe_urlread使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了safe_urlread函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _send_webhook

 def _send_webhook(self):
     """Send ``self.request``'s body and headers to the app's webhook URL.

     The request uses a 5 second timeout. The response is passed through
     ``safe_urlread`` and the resulting body is discarded.
     """
     response = safe_urlopen(
         url=self.sentry_app.webhook_url,
         data=self.request.body,
         headers=self.request.headers,
         timeout=5,
     )
     safe_urlread(response)
开发者ID:Kayle009,项目名称:sentry,代码行数:9,代码来源:installation_notifier.py


示例2: _make_request

    def _make_request(self):
        """Fetch the URL from ``_build_url`` and return the formatted response.

        Any exception raised while fetching or JSON-decoding the body is
        logged and replaced by an empty dict, which is then validated like
        any other response.

        Returns:
            The result of ``self._format_response`` applied to the decoded
            response.

        Raises:
            APIError: if ``self._validate_response`` rejects the response
                (including the empty-dict fallback).
        """
        try:
            body = safe_urlread(
                safe_urlopen(
                    url=self._build_url(),
                    headers=self._build_headers(),
                )
            )

            response = json.loads(body)
        except Exception as e:
            logger.info(
                'select-requester.error',
                extra={
                    'sentry_app': self.sentry_app.slug,
                    'install': self.install.uuid,
                    'project': self.project and self.project.slug,
                    'uri': self.uri,
                    # Exceptions have no ``message`` attribute on Python 3;
                    # reading it here would raise AttributeError from inside
                    # the handler and mask the original failure.
                    'error_message': str(e),
                }
            )
            response = {}

        if not self._validate_response(response):
            raise APIError()

        return self._format_response(response)
开发者ID:getsentry,项目名称:sentry,代码行数:27,代码来源:select_requester.py


示例3: create_issue

    def create_issue(self, request, group, form_data, **kwargs):
        """Create a ``bug`` story from ``form_data`` and return its id.

        The title and description are coerced to text (UTF-8, replacing
        undecodable bytes) and sent to the ``stories`` API URL for ``group``.

        Returns:
            The ``id`` field of the decoded JSON response.

        Raises:
            PluginError: if the request raises ``requests.RequestException``,
                if the body is not valid JSON, or if the status code is
                above 399 (using the response's ``error`` field).
        """
        story = {
            'story_type': 'bug',
            'name': force_text(form_data['title'], encoding='utf-8', errors='replace'),
            'description': force_text(form_data['description'], encoding='utf-8', errors='replace'),
            'labels': ['sentry'],
        }

        try:
            stories_url = self.build_api_url(group, 'stories')
            req = self.make_api_request(group.project, stories_url, json_data=story)
            raw_body = safe_urlread(req)
        except requests.RequestException as exc:
            raise PluginError('Error communicating with Pivotal: %s' % (six.text_type(exc), ))

        try:
            parsed = json.loads(raw_body)
        except ValueError as exc:
            raise PluginError('Error communicating with Pivotal: %s' % (six.text_type(exc), ))

        if req.status_code >= 400:
            raise PluginError(parsed['error'])

        return parsed['id']
开发者ID:getsentry,项目名称:sentry-plugins,代码行数:26,代码来源:plugin.py


示例4: dispatch

    def dispatch(self, request, helper):
        """Look up the user's details with the stored access token.

        The user payload is read from the ``data`` key of the decoded JSON
        response. A missing payload or missing ``email`` is logged and
        reported as ``ERR_INVALID_RESPONSE``. The email's domain must not be
        in ``DOMAIN_BLOCKLIST`` and, when ``self.domain`` is set, must equal
        it; otherwise ``ERR_INVALID_DOMAIN`` is reported.

        On success the domain and user payload are bound to the helper state
        and the result of ``helper.next_step()`` is returned.
        """
        token = helper.fetch_state('data')['access_token']

        query_string = urlencode({
            'access_token': token,
        })
        req = safe_urlopen('{0}?{1}&alt=json'.format(USER_DETAILS_ENDPOINT, query_string))
        body = safe_urlread(req)
        data = json.loads(body)

        user = data.get('data')
        if not user or not user.get('email'):
            logger.error('Invalid response: %s' % body)
            return helper.error(ERR_INVALID_RESPONSE)

        domain = extract_domain(user.get('email'))

        blocked = domain in DOMAIN_BLOCKLIST
        if blocked or (self.domain and self.domain != domain):
            return helper.error(ERR_INVALID_DOMAIN % (domain,))

        helper.bind_state('domain', domain)
        helper.bind_state('user', user)

        return helper.next_step()
开发者ID:dieswaytoofast,项目名称:sentry-auth-google,代码行数:32,代码来源:views.py


示例5: get_issue_title_by_id

    def get_issue_title_by_id(self, request, group, issue_id):
        """Return the ``name`` field of the story identified by ``issue_id``.

        The story is requested from the ``stories`` API URL for ``group``
        with ``issue_id`` appended as a path segment.
        """
        story_url = '%s/%s' % (self.build_api_url(group, 'stories'), issue_id)
        response = self.make_api_request(group.project, story_url)
        story = json.loads(safe_urlread(response))
        return story['name']
开发者ID:getsentry,项目名称:sentry-plugins,代码行数:7,代码来源:plugin.py


示例6: refresh_identity

    def refresh_identity(self, auth_identity):
        """Refresh ``auth_identity`` using its stored refresh token.

        On a 200 response the identity's data is updated with
        ``self.get_oauth_data(payload)`` and persisted via
        ``auth_identity.update``. A body that cannot be read or decoded is
        treated as an empty payload.

        Raises:
            IdentityNotValid: if there is no refresh token, if the response
                status is 401, or if it is 400 with an ``invalid_grant``
                error.
            Exception: for any other non-200 status.
        """
        token = auth_identity.data.get("refresh_token")
        if not token:
            raise IdentityNotValid("Missing refresh token")

        params = self.get_refresh_token_params(refresh_token=token)
        req = safe_urlopen(self.get_refresh_token_url(), data=params)

        try:
            payload = json.loads(safe_urlread(req))
        except Exception:
            payload = {}

        error = payload.get("error", "unknown_error")
        description = payload.get("error_description", "no description available")
        status = req.status_code
        formatted_error = "HTTP {} ({}): {}".format(status, error, description)

        # this may not be common, but at the very least Google will return
        # an invalid grant (HTTP 400) when a user is suspended
        identity_rejected = status == 401 or (status == 400 and error == "invalid_grant")
        if identity_rejected:
            raise IdentityNotValid(formatted_error)

        if status != 200:
            raise Exception(formatted_error)

        auth_identity.data.update(self.get_oauth_data(payload))
        auth_identity.update(data=auth_identity.data)
开发者ID:AyrtonRicardo,项目名称:sentry,代码行数:34,代码来源:oauth2.py


示例7: exchange_token

    def exchange_token(self, request, helper, code):
        """Exchange ``code`` at the access token URL; return the decoded JSON.

        The redirect URI sent with the request is the absolute form of
        ``helper.get_redirect_url()``.
        """
        # TODO: this needs the auth yet
        redirect_uri = absolute_uri(helper.get_redirect_url())
        params = self.get_token_params(code=code, redirect_uri=redirect_uri)
        response = safe_urlopen(self.access_token_url, data=params)
        return json.loads(safe_urlread(response))
开发者ID:Juraldinio,项目名称:sentry,代码行数:7,代码来源:oauth2.py


示例8: view

    def view(self, request, group, **kwargs):
        """Serve issue autocomplete results, or defer to the parent view.

        When the ``autocomplete_query`` GET parameter is set, the ``q``
        parameter is searched within the configured repo through the GitHub
        issue search URL and the matches are returned as
        ``{'issues': [{'text': ..., 'id': ...}, ...]}``. An empty ``q``
        yields an empty issue list. A request failure or an undecodable body
        is passed to ``handle_api_error`` and answered with an empty JSON
        object and status 502.
        """
        if not request.GET.get('autocomplete_query'):
            return super(GitHubPlugin, self).view(request, group, **kwargs)

        term = request.GET.get('q')
        if not term:
            return JSONResponse({'issues': []})

        repo = self.get_option('repo', group.project)
        search = 'repo:%s %s' % (repo, term)
        url = 'https://api.github.com/search/issues?%s' % (urlencode({'q': search}),)

        try:
            req = self.make_api_request(request.user, url)
            body = safe_urlread(req)
        except requests.RequestException as exc:
            self.handle_api_error(request, unicode(exc))
            return JSONResponse({}, status=502)

        try:
            results = json.loads(body)
        except ValueError as exc:
            self.handle_api_error(request, unicode(exc))
            return JSONResponse({}, status=502)

        matches = [
            {'text': '(#%s) %s' % (item['number'], item['title']), 'id': item['number']}
            for item in results.get('items', [])
        ]
        return JSONResponse({'issues': matches})
开发者ID:getsentry,项目名称:sentry-github,代码行数:31,代码来源:plugin.py


示例9: exchange_token

 def exchange_token(self, request, pipeline, code):
     """Exchange ``code`` at the access token URL and return the parsed body.

     A response whose ``Content-Type`` starts with
     ``application/x-www-form-urlencoded`` is parsed as a query string;
     anything else is decoded as JSON. SSL verification follows the
     pipeline's ``verify_ssl`` config value (default ``True``).

     An ``SSLError`` or ``JSONDecodeError`` is logged and converted into a
     dict with ``error`` and ``error_description`` keys instead of being
     raised.
     """
     # TODO: this needs the auth yet
     redirect_uri = absolute_uri(pipeline.redirect_url())
     data = self.get_token_params(code=code, redirect_uri=redirect_uri)
     verify_ssl = pipeline.config.get('verify_ssl', True)
     try:
         req = safe_urlopen(self.access_token_url, data=data, verify_ssl=verify_ssl)
         body = safe_urlread(req)
         content_type = req.headers.get('Content-Type', '')
         if content_type.startswith('application/x-www-form-urlencoded'):
             return dict(parse_qsl(body))
         return json.loads(body)
     except SSLError:
         url = self.access_token_url
         logger.info('identity.oauth2.ssl-error', extra={
             'url': url,
             'verify_ssl': verify_ssl,
         })
         description = u'Ensure that {} has a valid SSL certificate'.format(url)
         return {
             'error': 'Could not verify SSL certificate',
             'error_description': description,
         }
     except JSONDecodeError:
         logger.info('identity.oauth2.json-error', extra={
             'url': self.access_token_url,
         })
         return {
             'error': 'Could not decode a JSON Response',
             'error_description': u'We were not able to parse a JSON response, please try again.'
         }
开发者ID:Kayle009,项目名称:sentry,代码行数:31,代码来源:oauth2.py


示例10: _make_request

    def _make_request(self):
        """POST ``self.body`` to the built URL and return the decoded JSON.

        A failure while reading or decoding the body is logged and replaced
        by an empty dict, which is then validated like any other response.

        Raises:
            APIError: if ``self._validate_response`` rejects the response.
        """
        req = safe_urlopen(
            url=self._build_url(),
            headers=self._build_headers(),
            method='POST',
            data=self.body,
        )

        try:
            response = json.loads(safe_urlread(req))
        except Exception:
            log_context = {
                'sentry_app': self.sentry_app.slug,
                'install': self.install.uuid,
                'project': self.group.project.slug,
                'group': self.group.id,
                'uri': self.uri,
            }
            logger.info('issue-link-requester.error', extra=log_context)
            response = {}

        if self._validate_response(response):
            return response

        raise APIError()
开发者ID:getsentry,项目名称:sentry,代码行数:28,代码来源:issue_link_requester.py


示例11: github_request

    def github_request(self, request, url, **kwargs):
        """
        Call ``url`` with the logged-in user's GitHub token and return the
        decoded JSON body.

        Extra keyword arguments are forwarded to ``safe_urlopen``; any
        ``headers`` passed in gain an ``Authorization`` entry.

        Raises forms.ValidationError when the user has no associated GitHub
        auth, when the request raises ``requests.RequestException``, when the
        body is not valid JSON, or when the status code is above 399 (using
        the response's ``message`` field).
        """
        auth = self.get_auth_for_user(user=request.user)
        if auth is None:
            raise forms.ValidationError(_("You have not yet associated GitHub with your account."))

        request_headers = kwargs.pop("headers", None) or {}
        request_headers["Authorization"] = "token %s" % auth.tokens["access_token"]
        try:
            req = safe_urlopen(url, headers=request_headers, **kwargs)
            raw_body = safe_urlread(req)
        except requests.RequestException as exc:
            raise forms.ValidationError(_("Error communicating with GitHub: %s") % (unicode(exc),))

        try:
            parsed = json.loads(raw_body)
        except ValueError as exc:
            raise forms.ValidationError(_("Error communicating with GitHub: %s") % (unicode(exc),))

        if req.status_code >= 400:
            raise forms.ValidationError(parsed["message"])

        return parsed
开发者ID:imankulov,项目名称:sentry-github,代码行数:28,代码来源:plugin.py


示例12: get_issue_title_by_id

    def get_issue_title_by_id(self, request, group, issue_id):
        """Return the ``title`` field of the issue identified by ``issue_id``.

        The issue is requested on behalf of ``request.user`` from the
        ``issues`` API URL for ``group`` with ``issue_id`` appended.
        """
        issue_url = '%s/%s' % (self.build_api_url(group, 'issues'), issue_id)
        response = self.make_api_request(request.user, issue_url)
        issue = json.loads(safe_urlread(response))
        return issue['title']
开发者ID:getsentry,项目名称:sentry-github,代码行数:7,代码来源:plugin.py


示例13: create_issue

    def create_issue(self, request, group, form_data, **kwargs):
        """Create an issue from ``form_data`` and return its ``number``.

        The title, description and optional assignee are sent on behalf of
        ``request.user`` to the ``issues`` API URL for ``group``.

        Raises:
            forms.ValidationError: if the request raises
                ``requests.RequestException``, if the body is not valid JSON,
                or if the status code is above 399 (using the response's
                ``message`` field).
        """
        # TODO: support multiple identities via a selection input in the form?
        issue_payload = {
            "title": form_data['title'],
            "body": form_data['description'],
            "assignee": form_data.get('assignee'),
        }

        try:
            issues_url = self.build_api_url(group, 'issues')
            req = self.make_api_request(request.user, issues_url, json_data=issue_payload)
            raw_body = safe_urlread(req)
        except requests.RequestException as exc:
            raise forms.ValidationError(_('Error communicating with GitHub: %s') % (unicode(exc),))

        try:
            parsed = json.loads(raw_body)
        except ValueError as exc:
            raise forms.ValidationError(_('Error communicating with GitHub: %s') % (unicode(exc),))

        if req.status_code >= 400:
            raise forms.ValidationError(parsed['message'])

        return parsed['number']
开发者ID:getsentry,项目名称:sentry-github,代码行数:26,代码来源:plugin.py


示例14: exchange_token

 def exchange_token(self, request, helper, code):
     """Exchange ``code`` at the access token URL and return the parsed body.

     A response whose ``Content-Type`` starts with
     ``application/x-www-form-urlencoded`` is parsed as a query string and
     returned as a dict; anything else is decoded as JSON.
     """
     # TODO: this needs the auth yet
     data = self.get_token_params(code=code, redirect_uri=absolute_uri(helper.get_redirect_url()))
     req = safe_urlopen(self.access_token_url, data=data)
     body = safe_urlread(req)
     # A response without a Content-Type header must fall through to the
     # JSON branch rather than raise KeyError on the header lookup.
     if req.headers.get("Content-Type", "").startswith("application/x-www-form-urlencoded"):
         return dict(parse_qsl(body))
     return json.loads(body)
开发者ID:AyrtonRicardo,项目名称:sentry,代码行数:8,代码来源:oauth2.py


示例15: test_simple

    def test_simple(self):
        """A mocked GET is read back verbatim and default headers are sent."""
        responses.add(responses.GET, 'http://example.com', body='foo bar')

        data = safe_urlread(safe_urlopen('http://example.com'))
        assert data == 'foo bar'

        sent_headers = responses.calls[0].request.headers
        assert 'User-Agent' in sent_headers
        assert 'gzip' in sent_headers.get('Accept-Encoding', '')
开发者ID:280185386,项目名称:sentry,代码行数:10,代码来源:test_http.py


示例16: fetch_url

def fetch_url(url, project=None):
    """
    Fetch ``url`` and return a ``UrlResult``, or ``BAD_SOURCE`` on failure.

    The cache is consulted first, keyed by the MD5 of the URL. On a miss the
    URL is fetched and the outcome (headers and body, or ``BAD_SOURCE``) is
    cached for 60 seconds. When ``project`` is given, the URL is a valid
    origin for it and it has a ``sentry:token`` option, that token is sent in
    an ``X-Sentry-Token`` header.

    A non-``HTTPError`` failure while opening the URL pins the URL's domain
    for 300 seconds; while a domain is pinned, uncached URLs on it return
    ``BAD_SOURCE`` without being fetched.
    """
    url_digest = hashlib.md5(url.encode('utf-8')).hexdigest()
    cache_key = 'source:%s' % (url_digest,)
    result = cache.get(cache_key)
    if result is None:
        # lock down domains that are problematic
        domain = urlparse(url).netloc
        domain_digest = hashlib.md5(domain.encode('utf-8')).hexdigest()
        domain_key = 'source:%s' % (domain_digest,)
        if cache.get(domain_key):
            return BAD_SOURCE

        headers = []
        if project and is_valid_origin(url, project=project):
            token = project.get_option('sentry:token')
            if token:
                headers.append(('X-Sentry-Token', token))

        try:
            response = safe_urlopen(
                url,
                allow_redirects=True,
                headers=headers,
                timeout=settings.SENTRY_SOURCE_FETCH_TIMEOUT,
            )
        except HTTPError:
            result = BAD_SOURCE
        except Exception:
            # it's likely we've failed due to a timeout, dns, etc so let's
            # ensure we can't cascade the failure by pinning this for 5 minutes
            cache.set(domain_key, 1, 300)
            logger.warning('Disabling sources to %s for %ss', domain, 300,
                           exc_info=True)
            return BAD_SOURCE
        else:
            try:
                body = safe_urlread(response)
            except Exception:
                result = BAD_SOURCE
            else:
                result = (dict(response.headers), body)

        cache.set(cache_key, result, 60)

    if result == BAD_SOURCE:
        return result

    return UrlResult(url, *result)
开发者ID:uber,项目名称:sentry,代码行数:54,代码来源:fetch_source.py


示例17: test_simple

    def test_simple(self, mock_getaddrinfo):
        """A mocked GET returns bytes that decode to the registered body."""
        mock_getaddrinfo.return_value = [(2, 1, 6, '', ('81.0.0.1', 0))]
        responses.add(responses.GET, 'http://example.com', body='foo bar')

        payload = http.safe_urlread(http.safe_urlopen('http://example.com'))
        assert payload.decode('utf-8') == 'foo bar'

        sent_headers = responses.calls[0].request.headers
        assert 'User-Agent' in sent_headers
        assert 'gzip' in sent_headers.get('Accept-Encoding', '')
开发者ID:Kayle009,项目名称:sentry,代码行数:11,代码来源:test_http.py


示例18: test_simple

    def test_simple(self, mock_gethostbyname):
        """With host lookup mocked, a GET returns the registered body."""
        mock_gethostbyname.return_value = '81.0.0.1'
        responses.add(responses.GET, 'http://example.com', body='foo bar')

        response = http.safe_urlopen('http://example.com')
        assert http.safe_urlread(response) == 'foo bar'

        outgoing = responses.calls[0].request
        accept_encoding = outgoing.headers.get('Accept-Encoding', '')
        assert 'User-Agent' in outgoing.headers
        assert 'gzip' in accept_encoding
开发者ID:BrunoAsato,项目名称:sentry,代码行数:11,代码来源:test_http.py


示例19: send_beacon

def send_beacon():
    """
    Send a Beacon to a remote server operated by the Sentry team.

    See the documentation for more details.

    Nothing is sent when ``settings.SENTRY_BEACON`` is falsy. An install ID
    is generated and stored under the ``sentry:install-id`` option the first
    time one is needed. The payload carries the install ID, the running
    version, the admin email, and counts of users, projects, teams,
    organizations and events over the last 24 hours.

    A failure while sending or reading the response is logged as a warning
    and otherwise ignored. When the decoded response contains ``version``,
    its ``stable`` entry is stored as ``sentry:latest_version``.
    """
    from sentry import options
    from sentry.models import Organization, Project, Team, User

    if not settings.SENTRY_BEACON:
        logger.info('Not sending beacon (disabled)')
        return

    install_id = options.get('sentry:install-id')
    if not install_id:
        # Generate the ID before logging it; logging first would always
        # report the empty value that triggered generation.
        install_id = sha1(uuid4().hex).hexdigest()
        logger.info('Generated installation ID: %s', install_id)
        options.set('sentry:install-id', install_id)

    end = timezone.now()
    events_24h = tsdb.get_sums(
        model=tsdb.models.internal,
        keys=['events.total'],
        start=end - timedelta(hours=24),
        end=end,
    )['events.total']

    payload = {
        'install_id': install_id,
        'version': sentry.get_version(),
        'admin_email': settings.SENTRY_ADMIN_EMAIL,
        'data': {
            # TODO(dcramer): we'd also like to get an idea about the throughput
            # of the system (i.e. events in 24h)
            'users': User.objects.count(),
            'projects': Project.objects.count(),
            'teams': Team.objects.count(),
            'organizations': Organization.objects.count(),
            'events.24h': events_24h,
        }
    }

    # TODO(dcramer): relay the response 'notices' as admin broadcasts
    try:
        request = safe_urlopen(BEACON_URL, json=payload, timeout=5)
        response = safe_urlread(request)
    except Exception:
        logger.warning('Failed sending beacon', exc_info=True)
        return

    data = json.loads(response)
    if 'version' in data:
        options.set('sentry:latest_version', data['version']['stable'])
开发者ID:carriercomm,项目名称:sentry-1,代码行数:53,代码来源:beacon.py


示例20: exchange_token

 def exchange_token(self, request, pipeline, code):
     """Exchange ``code`` at the access token URL and return the parsed body.

     A response whose ``Content-Type`` starts with
     ``application/x-www-form-urlencoded`` is parsed as a query string and
     returned as a dict; anything else is decoded as JSON. SSL verification
     follows the pipeline's ``verify_ssl`` config value (default ``True``).
     """
     # TODO: this needs the auth yet
     data = self.get_token_params(
         code=code,
         redirect_uri=absolute_uri(pipeline.redirect_url()),
     )
     verify_ssl = pipeline.config.get('verify_ssl', True)
     req = safe_urlopen(self.access_token_url, data=data, verify_ssl=verify_ssl)
     body = safe_urlread(req)
     # A response without a Content-Type header must fall through to the
     # JSON branch rather than raise KeyError on the header lookup.
     if req.headers.get('Content-Type', '').startswith('application/x-www-form-urlencoded'):
         return dict(parse_qsl(body))
     return json.loads(body)
开发者ID:binlee1990,项目名称:sentry,代码行数:12,代码来源:oauth2.py



注:本文中的sentry.http.safe_urlread函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python interfaces.Stacktrace类代码示例发布时间:2022-05-27
下一篇:
Python http.safe_urlopen函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap