Python http_client.HTTPConnection Class Code Examples


This article collects typical usage examples of six.moves.http_client.HTTPConnection in Python. If you are wondering how the HTTPConnection class is used in practice, or are looking for working examples of it, the selected code samples below may help.



The 20 code examples of the HTTPConnection class shown below are sorted by popularity by default.
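As orientation before the project snippets, here is a minimal sketch of the basic request/response cycle. It assumes Python 3, where `six.moves.http_client` resolves to the standard `http.client` module. The handler `_PingHandler` and the helpers `fetch_status_and_body` and `demo` are hypothetical names introduced only for this illustration; the throwaway local server exists so the sketch does not depend on any external host.

```python
import threading
from http.client import HTTPConnection  # what six.moves.http_client maps to on Python 3
from http.server import BaseHTTPRequestHandler, HTTPServer


class _PingHandler(BaseHTTPRequestHandler):
    """Hypothetical handler that answers every GET with a tiny body."""

    def do_GET(self):
        body = b"pong"
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the demo quiet


def fetch_status_and_body(host, port, path="/", timeout=5):
    """Open a connection, send one GET, and return (status, body)."""
    conn = HTTPConnection(host, port, timeout=timeout)
    try:
        conn.request("GET", path)
        resp = conn.getresponse()
        return resp.status, resp.read()
    finally:
        conn.close()  # always release the socket


def demo():
    """Serve on a free local port, issue one request, then shut down."""
    server = HTTPServer(("127.0.0.1", 0), _PingHandler)  # port 0 = any free port
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        return fetch_status_and_body("127.0.0.1", server.server_address[1])
    finally:
        server.shutdown()
        server.server_close()
```

Most of the examples that follow are variations on the three calls in `fetch_status_and_body`: construct the connection, call `request`, then call `getresponse`.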

Example 1: __init__

    def __init__(self, url, key = None, secret = None, timeout = 5, context = None):
        """
        Create a new Crossbar.io push client.
        The only mandatory argument is the Push service endpoint of the Crossbar.io
        instance to push to.
        For signed pushes, provide authentication key and secret. If those are not
        given, unsigned pushes are performed.
        :param url: URL of the HTTP bridge of Crossbar.io (e.g. http://example.com:8080/push).
        :type url: str
        :param key: Optional key to use for signing requests.
        :type key: str
        :param secret: When using signed request, the secret corresponding to key.
        :type secret: str
        :param timeout: Timeout for requests.
        :type timeout: int
        :param context: If the HTTP bridge is running on HTTPS (that is securely over TLS),
            then the context provides the SSL settings the client should use (e.g. the
            certificate chain against which to verify the server certificate). This parameter
            is only available on Python 2.7.9+ and Python 3 (otherwise the parameter is silently
            ignored!). See: https://docs.python.org/2/library/ssl.html#ssl.SSLContext
        :type context: obj or None
        """
        if six.PY2:
            if type(url) == str:
                url = six.u(url)
            if type(key) == str:
                key = six.u(key)
            if type(secret) == str:
                secret = six.u(secret)

        assert(type(url) == six.text_type)
        assert((key and secret) or (not key and not secret))
        assert(key is None or type(key) == six.text_type)
        assert(secret is None or type(secret) == six.text_type)
        assert(type(timeout) == int)
        if _HAS_SSL and _HAS_SSL_CLIENT_CONTEXT:
            assert(context is None or isinstance(context, ssl.SSLContext))

        self._seq = 1
        self._key = key
        self._secret = secret

        self._endpoint = _parse_url(url)
        self._endpoint['headers'] = {
            "Content-type": "application/json",
            "User-agent": "crossbarconnect-python"
        }

        if self._endpoint['secure']:
            if not _HAS_SSL:
                raise Exception("Bridge URL is using HTTPS, but Python SSL module is missing")
            if _HAS_SSL_CLIENT_CONTEXT:
                self._connection = HTTPSConnection(self._endpoint['host'],
                        self._endpoint['port'], timeout = timeout, context = context)
            else:
                self._connection = HTTPSConnection(self._endpoint['host'],
                        self._endpoint['port'], timeout = timeout)
        else:
            self._connection = HTTPConnection(self._endpoint['host'],
                    self._endpoint['port'], timeout = timeout)
Developer: Mixone-FinallyHere, Project: incubator, Lines of code: 60, Source file: crossbarconnect.py


Example 2: report_sauce_status

    def report_sauce_status(self, name, status, tags=None, remote_url=''):
        """Report test status and tags to SauceLabs
        """
        # Avoid a shared mutable default argument.
        tags = [] if tags is None else tags
        job_id = BuiltIn().get_library_instance(
            'Selenium2Library')._current_browser().session_id

        if USERNAME_ACCESS_KEY.match(remote_url):
            username, access_key =\
                USERNAME_ACCESS_KEY.findall(remote_url)[0][1:]
        else:
            username = os.environ.get('SAUCE_USERNAME')
            access_key = os.environ.get('SAUCE_ACCESS_KEY')

        if not job_id:
            return u"No Sauce job id found. Skipping..."
        elif not username or not access_key:
            return u"No Sauce environment variables found. Skipping..."

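        # base64.encodestring was removed in Python 3.9; b64encode works on
        # bytes and adds no trailing newline, so nothing needs to be stripped.
        credentials = ('%s:%s' % (username, access_key)).encode('utf-8')
        token = base64.b64encode(credentials).decode('ascii')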
        body = json.dumps({'name': name,
                           'passed': status == 'PASS',
                           'tags': tags})

        connection = HTTPConnection('saucelabs.com')
        connection.request('PUT', '/rest/v1/%s/jobs/%s' % (
            username, job_id), body,
            headers={'Authorization': 'Basic %s' % token}
        )
        return connection.getresponse().status
Developer: plone, Project: plone.app.robotframework, Lines of code: 29, Source file: saucelabs.py
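The `Authorization` header in Example 2 carries an HTTP Basic credential, which is the base64 encoding of `username:access_key`. The sketch below shows one way to build that header on Python 3, where `base64.b64encode` expects bytes (the older `base64.encodestring` was removed in Python 3.9). The helper name `basic_auth_header` is hypothetical and not part of the original project.

```python
import base64


def basic_auth_header(username, access_key):
    """Return a headers dict carrying an HTTP Basic credential."""
    # Encode "user:key" to bytes first; b64encode returns bytes with no
    # trailing newline, so decode to str for use as a header value.
    credentials = "{0}:{1}".format(username, access_key).encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": "Basic " + token}
```

The returned dict can be passed directly as the `headers` argument of `HTTPConnection.request`.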


Example 3: connect

    def connect(self):
        """
        Override the connect() function to intercept calls to certain
        host/ports.

        If no app at host/port has been registered for interception then
        a normal HTTPConnection is made.
        """
        if debuglevel:
            sys.stderr.write('connect: %s, %s\n' % (self.host, self.port,))

        try:
            (app, script_name) = self.get_app(self.host, self.port)
            if app:
                if debuglevel:
                    sys.stderr.write('INTERCEPTING call to %s:%s\n' %
                                     (self.host, self.port,))
                self.sock = wsgi_fake_socket(app, self.host, self.port,
                                             script_name)
            else:
                HTTPConnection.connect(self)

        except Exception:
            if debuglevel:              # intercept & print out tracebacks
                traceback.print_exc()
            raise
Developer: cdent, Project: wsgi-intercept, Lines of code: 26, Source file: __init__.py
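Example 3 works because `HTTPConnection` only needs `self.sock` to be set by the time `connect()` returns. The following is a simplified sketch of the same override pattern, not the wsgi-intercept implementation: instead of a fake WSGI socket, it connects registered host/port pairs to a different real address. The class name `InterceptingConnection` and its `overrides` registry are assumptions made for this illustration.

```python
import socket
from http.client import HTTPConnection


class InterceptingConnection(HTTPConnection):
    """Hypothetical subclass that reroutes selected (host, port) pairs."""

    # Registry mapping (host, port) -> (real_host, real_port).
    overrides = {}

    def connect(self):
        target = self.overrides.get((self.host, self.port))
        if target is None:
            # Nothing registered: fall back to the normal behaviour.
            HTTPConnection.connect(self)
        else:
            # Registered: open the socket to the substitute address instead.
            self.sock = socket.create_connection(target, self.timeout)
```

After registering, for example, `InterceptingConnection.overrides[("service.invalid", 80)] = ("127.0.0.1", 8080)`, requests made through `InterceptingConnection("service.invalid")` would be sent to the local address while still carrying the original `Host` header.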


Example 4: WebcamStreamBase

class WebcamStreamBase(VideoStream):
    request_headers = {
        'Accept': '*/*',
        }

    def __init__(self, url,
                 max_rate=3.0,
                 rate_bucket_size=None,
                 socket_timeout=10,
                 user_agent=DEFAULT_USER_AGENT):
        self.url = url
        netloc, self.path = _parse_url(url)
        self.conn = HTTPConnection(netloc, timeout=socket_timeout)
        self.request_headers = self.request_headers.copy()
        self.request_headers['User-Agent'] = user_agent

        self.stream = None
        self.rate_limiter = BucketRateLimiter(max_rate, rate_bucket_size)
        self.open_rate_limiter = BackoffRateLimiter(socket_timeout)

    def __repr__(self):
        return u"<%s at 0x%x: %s>" % (
            self.__class__.__name__, id(self), self.url)

    @classmethod
    def from_settings(cls, settings, prefix='webcam.', **defaults):
        config = config_from_settings(settings, prefix=prefix,
                                      subprefix=cls.settings_subprefix,
                                      **defaults)
        return cls(**config)


    def close(self):
        if self.stream:
            self.stream = None
        if self.conn:
            self.conn.close()
            self.conn = None

    @property
    def closed(self):
        return not self.conn

    def next(self):
        # FIXME: check closed more often?
        if self.closed:
            raise StopIteration()
        next(self.rate_limiter)
        try:
            if self.stream is None:
                next(self.open_rate_limiter)
                self.stream = self._open_stream()
            frame = next(self.stream)
            self.open_rate_limiter.reset()
            return frame
        except Exception as ex:
            self.stream = None
            log.warning("Streaming failed: %s", text_type(ex) or repr(ex))
            self.conn.close()
            return None
Developer: dairiki, Project: puppyserv, Lines of code: 60, Source file: webcam.py


Example 5: _check_storage

def _check_storage(ipport, path):
    conn = HTTPConnection(*ipport)
    conn.request('GET', path)
    resp = conn.getresponse()
    # 404 because it's a nonsense path (and mount_check is false)
    # 507 in case the test target is a VM using mount_check
    if resp.status not in (404, 507):
        raise Exception(
            'Unexpected status %s' % resp.status)
    return resp
Developer: matthewoliver, Project: swift, Lines of code: 10, Source file: common.py


Example 6: set_test_status

    def set_test_status(self, passed=True):
        connection = HTTPConnection("saucelabs.com")
        connection.request(
            'PUT',
            '/rest/v1/{0}/jobs/{1}'.format(
                self.username, self.driver.session_id
            ),
            json.dumps({"passed": passed}),
            headers={"Authorization": "Basic {0}".format(self.sauce_auth)}
        )
        result = connection.getresponse()
        return result.status == 200
Developer: daleathan, Project: weblate, Lines of code: 12, Source file: test_selenium.py


Example 7: wait_for_server_to_hangup

def wait_for_server_to_hangup(ipport):
    try_until = time() + 30
    while True:
        try:
            conn = HTTPConnection(*ipport)
            conn.request('GET', '/')
            conn.getresponse()
        except Exception:
            break
        if time() > try_until:
            raise Exception(
                'Still answering on %s:%s after 30 seconds' % ipport)
        sleep(0.1)
Developer: jgmerritt, Project: swift, Lines of code: 13, Source file: common.py
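Example 7 polls a server until a request fails, and gives up after a deadline. A self-contained sketch of that loop is shown below. It is an illustrative variant rather than the Swift code: the function name `wait_until_unreachable` and its parameters are hypothetical, it returns a boolean instead of raising, it catches only connection-level errors, and it closes each probe connection.

```python
import time
from http.client import HTTPConnection, HTTPException


def wait_until_unreachable(host, port, timeout=30.0, interval=0.1,
                           request_timeout=1.0):
    """Poll host:port until a request fails; return False if the deadline passes."""
    deadline = time.monotonic() + timeout
    while True:
        conn = HTTPConnection(host, port, timeout=request_timeout)
        try:
            conn.request("GET", "/")
            conn.getresponse()
        except (OSError, HTTPException):
            # Refused, reset, or timed out: treat the server as gone.
            return True
        finally:
            conn.close()
        if time.monotonic() > deadline:
            return False  # still answering after `timeout` seconds
        time.sleep(interval)
```

Using `time.monotonic()` for the deadline keeps the loop unaffected by wall-clock adjustments; that is a design choice of this sketch, while the original uses `time()`.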


Example 8: open_http_connection

    def open_http_connection(self, code, query=None, method='GET'):
        """
        Send a request to non-sandboxed Splash, return an HTTPConnection.
        create_file Lua function is pre-loaded.

        XXX: why can't we use requests or urllib, why
        don't they close a connection after a timeout error?
        """
        q = {"lua_source": self.CREATE_FILE + "\n" + code}
        q.update(query or {})
        conn = HTTPConnection('localhost', self.splash_unrestricted.portnum)
        conn.request(method, "/execute/?" + six.moves.urllib.parse.urlencode(q))
        return conn
Developer: Cloverseer, Project: splash, Lines of code: 13, Source file: test_client_disconnects.py


Example 9: __init__

    def __init__(self, host, port=None, key_file=None, cert_file=None,
                 ca_certs=None, strict=None, **kwargs):
        if six.PY2:
            HTTPConnection.__init__(self, host=host, port=port, strict=strict, **kwargs)
        else:
            # Python 3's HTTPConnection dropped the strict argument.
            HTTPConnection.__init__(self, host=host, port=port, **kwargs)
        self.key_file = key_file
        self.cert_file = cert_file
        self.ca_certs = ca_certs
        if self.ca_certs:
            self.cert_reqs = ssl.CERT_REQUIRED
        else:
            self.cert_reqs = ssl.CERT_NONE
Developer: DeepikaDhiman, Project: freenas-pkgtools, Lines of code: 14, Source file: Configuration.py


Example 10: status

    def status(self):
        succubus_status = super(AlppacaDaemon, self).status()
        if succubus_status != 0:
            print("succubus_status is {0}".format(str(succubus_status)))
            return succubus_status

        conn = HTTPConnection('169.254.169.254', timeout=0.1)
        try:
            conn.request("GET", "/")
            conn.getresponse()
        except Exception as e:
            print("Error: alppaca is not reachable via IP 169.254.169.254. {0}".format(e))
            return 3
        else:
            return 0
Developer: ImmobilienScout24, Project: afp-alppaca, Lines of code: 15, Source file: main.py


Example 11: is_alive

    def is_alive(self):
        """Test that the connection is still alive.

        Because the remote communication happens over HTTP we need to
        make an explicit request to the remote.  It is allowed for
        WebDriver spec tests to not have a WebDriver session, since this
        may be what is tested.

        An HTTP request to an invalid path that results in a 404 is
        proof enough to us that the server is alive and kicking.
        """
        conn = HTTPConnection(self.server.host, self.server.port)
        conn.request("HEAD", self.server.base_path + "invalid")
        res = conn.getresponse()
        return res.status == 404
Developer: simartin, Project: servo, Lines of code: 15, Source file: base.py
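The liveness check in Example 11 treats a 404 for a deliberately invalid path as proof that the server is up. A standalone sketch of that idea follows. The free function `is_alive` and its `base_path` and `timeout` parameters are hypothetical stand-ins for the attributes the original reads from `self.server`, and, unlike the original, this variant reports an unreachable server as `False` instead of letting the exception propagate.

```python
from http.client import HTTPConnection, HTTPException


def is_alive(host, port, base_path="/", timeout=2.0):
    """Return True if a HEAD request to an invalid path yields a 404."""
    conn = HTTPConnection(host, port, timeout=timeout)
    try:
        conn.request("HEAD", base_path + "invalid")
        return conn.getresponse().status == 404
    except (OSError, HTTPException):
        # No listener, reset, or timeout: the server is not alive.
        return False
    finally:
        conn.close()
```

`HEAD` keeps the probe cheap because the server sends headers only, with no response body to read.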


Example 12: kill_server

def kill_server(ipport, ipport2server, pids):
    server, number = get_server_number(ipport, ipport2server)
    err = Manager([server]).kill(number=number)
    if err:
        raise Exception("unable to kill %s" % (server if not number else "%s%s" % (server, number)))
    try_until = time() + 30
    while True:
        try:
            conn = HTTPConnection(*ipport)
            conn.request("GET", "/")
            conn.getresponse()
        except Exception:
            break
        if time() > try_until:
            raise Exception("Still answering on %s:%s after 30 seconds" % ipport)
        sleep(0.1)
Developer: heemanshu, Project: swift_liberty, Lines of code: 16, Source file: common.py


Example 13: _get_conn

    def _get_conn(self):
        if self._conn is not None:
            return self._conn

        host, port = self._daemon.split(':', 2)
        port = int(port)
        self._conn = HTTPConnection(host, port, timeout=self._timeout)
        return self._conn
Developer: douban, Project: pymesos, Lines of code: 8, Source file: operator_v1.py
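Example 13 combines two small ideas: parsing a `host:port` string and creating the connection lazily so it can be reused. The sketch below isolates both. `split_host_port` and `LazyClient` are hypothetical names, and the default port of 80 is an assumption of this sketch; the original requires the port to be present. Note that constructing an `HTTPConnection` does not open a socket; the connection is made on the first request.

```python
from http.client import HTTPConnection


def split_host_port(address, default_port=80):
    """Split 'host:port' on the last colon; fall back to default_port."""
    host, sep, port = address.rpartition(":")
    if not sep:
        return address, default_port
    return host, int(port)


class LazyClient(object):
    """Hypothetical client that builds its connection on first use."""

    def __init__(self, address, timeout=5):
        self._address = address
        self._timeout = timeout
        self._conn = None

    def _get_conn(self):
        # Reuse the cached connection object if one already exists.
        if self._conn is None:
            host, port = split_host_port(self._address)
            self._conn = HTTPConnection(host, port, timeout=self._timeout)
        return self._conn
```

Splitting on the last colon keeps the two-value unpacking safe even when the host part itself contains colons, although bare IPv6 literals without a port are not handled by this sketch.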


Example 14: check_server

def check_server(ipport, ipport2server, pids, timeout=CHECK_SERVER_TIMEOUT):
    server = ipport2server[ipport]
    if server[:-1] in ('account', 'container', 'object'):
        if int(server[-1]) > 4:
            return None
        path = '/connect/1/2'
        if server[:-1] == 'container':
            path += '/3'
        elif server[:-1] == 'object':
            path += '/3/4'
        try_until = time() + timeout
        while True:
            try:
                conn = HTTPConnection(*ipport)
                conn.request('GET', path)
                resp = conn.getresponse()
                # 404 because it's a nonsense path (and mount_check is false)
                # 507 in case the test target is a VM using mount_check
                if resp.status not in (404, 507):
                    raise Exception(
                        'Unexpected status %s' % resp.status)
                break
            except Exception as err:
                if time() > try_until:
                    print(err)
                    print('Giving up on %s:%s after %s seconds.' % (
                        server, ipport, timeout))
                    raise err
                sleep(0.1)
    else:
        try_until = time() + timeout
        while True:
            try:
                url, token = get_auth('http://%s:%d/auth/v1.0' % ipport,
                                      'test:tester', 'testing')
                account = url.split('/')[-1]
                head_account(url, token)
                return url, token, account
            except Exception as err:
                if time() > try_until:
                    print(err)
                    print('Giving up on proxy:8080 after 30 seconds.')
                    raise err
                sleep(0.1)
    return None
Developer: Ahiknsr, Project: swift, Lines of code: 45, Source file: common.py


Example 15: send_email

def send_email(request):
  try:
    recipients = request.GET['to'].split(',')
    url = request.GET['url']
    proto, server, path, query, frag = urlsplit(url)
    if query: path += '?' + query
    conn = HTTPConnection(server)
    conn.request('GET',path)
    try: # Python 2.7+, use buffering of HTTP responses
      resp = conn.getresponse(buffering=True)
    except TypeError:  # Python 2.6 and older
      resp = conn.getresponse()
    assert resp.status == 200, "Failed HTTP response %s %s" % (resp.status, resp.reason)
    rawData = resp.read()
    conn.close()
    message = MIMEMultipart()
    message['Subject'] = "Graphite Image"
    message['To'] = ', '.join(recipients)
    message['From'] = '[email protected]%s' % gethostname()
    text = MIMEText( "Image generated by the following graphite URL at %s\r\n\r\n%s" % (ctime(),url) )
    image = MIMEImage( rawData )
    image.add_header('Content-Disposition', 'attachment', filename="composer_" + strftime("%b%d_%I%M%p.png"))
    message.attach(text)
    message.attach(image)
    s = SMTP(settings.SMTP_SERVER)
    s.sendmail('[email protected]%s' % gethostname(),recipients,message.as_string())
    s.quit()
    return HttpResponse( "OK" )
  except Exception:
    return HttpResponse( format_exc() )
Developer: aihua, Project: graphite-web, Lines of code: 30, Source file: views.py


Example 16: predict

    def predict(self, examples, **kwargs):
        """Run prediction over HTTP/REST.

        :param examples: The input examples
        :return: The outcomes
        """

        verify_example(examples, self.input_keys)

        request = self.create_request(examples)
        conn = HTTPConnection(self.hostname, self.port)
        conn.request('POST', self.path, json.dumps(request), self.headers)
        response = conn.getresponse().read()
        outcomes_list = json.loads(response)
        if "error" in outcomes_list:
            raise ValueError("remote server returns error: {0}".format(outcomes_list["error"]))
        outcomes_list = outcomes_list["outputs"]
        outcomes_list = self.deserialize_response(examples, outcomes_list)
        return outcomes_list
Developer: dpressel, Project: baseline, Lines of code: 19, Source file: remote.py
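The REST call in Example 16 follows a common pattern: serialize a payload to JSON, POST it, then decode the JSON reply and check it for an error field. A minimal standalone sketch of that round trip is given below. The function name `post_json`, the explicit status check, and the `Content-Type` header are assumptions added for this illustration; the `"error"` and `"outputs"` keys mirror the ones used in the example.

```python
import json
from http.client import HTTPConnection


def post_json(host, port, path, payload, timeout=5):
    """POST payload as JSON and return the 'outputs' field of the reply."""
    body = json.dumps(payload)  # ASCII-safe by default, so a str body is fine
    headers = {"Content-Type": "application/json"}
    conn = HTTPConnection(host, port, timeout=timeout)
    try:
        conn.request("POST", path, body, headers)
        resp = conn.getresponse()
        raw = resp.read()
        if resp.status != 200:
            raise ValueError("unexpected HTTP status {0}".format(resp.status))
        result = json.loads(raw.decode("utf-8"))
    finally:
        conn.close()
    if "error" in result:
        raise ValueError("remote server returns error: {0}".format(result["error"]))
    return result["outputs"]
```

Reading the body before closing the connection matters here, because the response object reads from the same socket that `conn.close()` releases.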


Example 17: check_server

def check_server(ipport, ipport2server, pids, timeout=CHECK_SERVER_TIMEOUT):
    server = ipport2server[ipport]
    if server[:-1] in ("account", "container", "object"):
        if int(server[-1]) > 4:
            return None
        path = "/connect/1/2"
        if server[:-1] == "container":
            path += "/3"
        elif server[:-1] == "object":
            path += "/3/4"
        try_until = time() + timeout
        while True:
            try:
                conn = HTTPConnection(*ipport)
                conn.request("GET", path)
                resp = conn.getresponse()
                # 404 because it's a nonsense path (and mount_check is false)
                # 507 in case the test target is a VM using mount_check
                if resp.status not in (404, 507):
                    raise Exception("Unexpected status %s" % resp.status)
                break
            except Exception as err:
                if time() > try_until:
                    print(err)
                    print("Giving up on %s:%s after %s seconds." % (server, ipport, timeout))
                    raise err
                sleep(0.1)
    else:
        try_until = time() + timeout
        while True:
            try:
                url, token = get_auth("http://%s:%d/auth/v1.0" % ipport, "test:tester", "testing")
                account = url.split("/")[-1]
                head_account(url, token)
                return url, token, account
            except Exception as err:
                if time() > try_until:
                    print(err)
                    print("Giving up on proxy:8080 after 30 seconds.")
                    raise err
                sleep(0.1)
    return None
Developer: heemanshu, Project: swift_liberty, Lines of code: 42, Source file: common.py


Example 18: _get_conn

    def _get_conn(self):
        if not self.connected:
            return None

        if self._conn is not None:
            return self._conn

        host, port = self.master.split(':', 2)
        port = int(port)
        self._conn = HTTPConnection(host, port, timeout=self._timeout)
        return self._conn
Developer: windreamer, Project: pymesos, Lines of code: 11, Source file: executor.py


Example 19: compute_engine_id

def compute_engine_id():
    """Gets the Compute Engine project ID if it can be inferred.

    Uses 169.254.169.254 for the metadata server to avoid request
    latency from DNS lookup.

    See https://cloud.google.com/compute/docs/metadata#metadataserver
    for information about this IP address. (This IP is also used for
    Amazon EC2 instances, so the metadata flavor is crucial.)

    See https://github.com/google/oauth2client/issues/93 for context about
    DNS latency.

    :rtype: string or ``NoneType``
    :returns: Compute Engine project ID if the metadata service is available,
              else ``None``.
    """
    host = '169.254.169.254'
    uri_path = '/computeMetadata/v1/project/project-id'
    headers = {'Metadata-Flavor': 'Google'}
    connection = HTTPConnection(host, timeout=0.1)

    try:
        connection.request('GET', uri_path, headers=headers)
        response = connection.getresponse()
        if response.status == 200:
            return response.read()
    except socket.error:  # socket.timeout or socket.error(64, 'Host is down')
        pass
    finally:
        connection.close()
Developer: Loooffy, Project: gcloud-python, Lines of code: 31, Source file: _implicit_environ.py


Example 20: __init__

    def __init__(self, url,
                 max_rate=3.0,
                 rate_bucket_size=None,
                 socket_timeout=10,
                 user_agent=DEFAULT_USER_AGENT):
        self.url = url
        netloc, self.path = _parse_url(url)
        self.conn = HTTPConnection(netloc, timeout=socket_timeout)
        self.request_headers = self.request_headers.copy()
        self.request_headers['User-Agent'] = user_agent

        self.stream = None
        self.rate_limiter = BucketRateLimiter(max_rate, rate_bucket_size)
        self.open_rate_limiter = BackoffRateLimiter(socket_timeout)
Developer: dairiki, Project: puppyserv, Lines of code: 14, Source file: webcam.py



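Note: The six.moves.http_client.HTTPConnection class examples in this article were compiled by 纯净天空 from source-code and documentation platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects contributed by their respective developers, and copyright of the source code belongs to the original authors. For distribution and use, refer to each project's License. Do not reproduce without permission.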

