• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python ssl.match_hostname函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 ssl.match_hostname 函数的典型用法代码示例。如果您正苦于以下问题：Python match_hostname 函数的具体用法？Python match_hostname 怎么用？Python match_hostname 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 match_hostname 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: connect

    def connect(self):
        """Connect to Mongo and return a new (connected) socket.

        Note that the pool does not keep a reference to the socket -- you
        must call return_socket() when you're done with it.

        When ``self.use_ssl`` is set the socket is wrapped with SSL and, if
        ``self.ssl_cert_reqs`` is truthy, the peer certificate is checked
        against the host name taken from ``self.pair[0]``.

        Raises ConnectionFailure when an ssl.SSLError occurs while securing
        the socket. Any other error raised while securing the socket is
        re-raised unchanged after the socket has been closed.
        """
        sock = self.create_connection()
        hostname = self.pair[0]

        if self.use_ssl:
            try:
                sock = ssl.wrap_socket(sock,
                                       certfile=self.ssl_certfile,
                                       keyfile=self.ssl_keyfile,
                                       ca_certs=self.ssl_ca_certs,
                                       cert_reqs=self.ssl_cert_reqs)
                if self.ssl_cert_reqs:
                    match_hostname(sock.getpeercert(), hostname)

            except ssl.SSLError:
                sock.close()
                raise ConnectionFailure("SSL handshake failed. MongoDB may "
                                        "not be configured with SSL support.")
            except Exception:
                # A hostname mismatch is reported with CertificateError, which
                # is not an ssl.SSLError on every Python version. Close the
                # socket instead of leaking it, then let the error through.
                sock.close()
                raise

        sock.settimeout(self.net_timeout)
        return SocketInfo(sock, self.pool_id, hostname)
开发者ID:iamtonyarmstrong,项目名称:flasky,代码行数:25,代码来源:pool.py


示例2: _connect_socket

    def _connect_socket(self):
        """Connect ``self._socket`` to the first reachable resolved address.

        Every address returned by getaddrinfo() for the resolved endpoint is
        tried in order. The socket is optionally wrapped for SSL (via
        ``self.ssl_context`` or, failing that, ``self.ssl_options``), connected
        using ``self.connect_timeout`` and, when ``self._check_hostname`` is
        set, its peer certificate is matched against the endpoint address.

        Raises ConnectionException if name resolution yields nothing, and
        socket.error (carrying the last failure) if no address could be
        connected. Socket options from ``self.sockopts`` are applied last.
        """
        host, port = self.endpoint.resolve()
        candidates = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
        if not candidates:
            raise ConnectionException("getaddrinfo returned empty list for %s" % (self.endpoint,))

        last_error = None
        for family, kind, protocol, _canonname, target in candidates:
            try:
                self._socket = self._socket_impl.socket(family, kind, protocol)
                if self.ssl_context:
                    wrap_kwargs = self.ssl_options or {}
                    self._socket = self.ssl_context.wrap_socket(self._socket, **wrap_kwargs)
                elif self.ssl_options:
                    if not self._ssl_impl:
                        raise RuntimeError("This version of Python was not compiled with SSL support")
                    self._socket = self._ssl_impl.wrap_socket(self._socket, **self.ssl_options)
                self._socket.settimeout(self.connect_timeout)
                self._socket.connect(target)
                # Back to blocking mode once the connection is established.
                self._socket.settimeout(None)
                if self._check_hostname:
                    ssl.match_hostname(self._socket.getpeercert(), self.endpoint.address)
            except socket.error as exc:
                if self._socket:
                    self._socket.close()
                    self._socket = None
                # Remember the failure and move on to the next address.
                last_error = exc
            else:
                last_error = None
                break

        if last_error:
            attempted = [entry[4] for entry in candidates]
            detail = last_error.strerror or last_error
            raise socket.error(last_error.errno,
                               "Tried connecting to %s. Last error: %s" % (attempted, detail))

        for option in self.sockopts or ():
            self._socket.setsockopt(*option)
开发者ID:datastax,项目名称:python-driver,代码行数:35,代码来源:connection.py


示例3: __init__

	def __init__(self, host='127.0.0.1', port=6667, usessl=False, nick="Robot", user="Robot", longuser="I am a robot!", chans={"#yolo"}):
		"""Connect to an IRC server, register, and join the given channels.

		host, port -- address of the server to connect to.
		usessl     -- when true, connect over TLS, require a valid certificate
		              and check that it matches ``host``.
		nick       -- nickname sent with NICK.
		user       -- user name sent with USER.
		longuser   -- trailing text sent with USER.
		chans      -- iterable of channels to join; a falsy value (including
		              None) joins nothing. The default set is never mutated.
		"""
		if usessl: # Set up an SSL socket
			try:
				context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
			except Exception:
				print("ERROR: Can't create an SSL context. You're probably trying to connect using SSL under Python 2. You need to use Python 3.")
				raise
			context.verify_mode = ssl.CERT_REQUIRED
			context.load_verify_locations("/etc/ssl/cert.pem") # May be different for different systems
			context.set_ciphers("DHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-SHA256:DHE-RSA-AES128-SHA256") # Only the good ones
			self.s = context.wrap_socket(socket.socket(socket.AF_INET))
			self.s.connect((host,port))
			cert = self.s.getpeercert()
			ssl.match_hostname(cert,host)
			print("Successfully connected using cipher %s. Huzzah!"%self.s.cipher()[0])
		else: # Set up a normal socket
			self.s = socket.socket()
			self.s.connect((host,port))

		self.f = self.s.makefile()

		# write() encodes a line as UTF-8 and appends the line terminator;
		# read() returns the next line from the buffered file wrapper.
		self.write = lambda x : self.s.send(x.encode('UTF-8') + crlf)
		self.read = self.f.readline

		self.nick = nick
		self.write('NICK ' + nick)
		self.write('USER ' + user + ' * * :' + longuser)
		if chans:
			for chan in chans:
				self.write('JOIN ' + chan)
		# chans may be None/empty (the guard above allows it); set(None)
		# would raise TypeError, so fall back to an empty set.
		self.chans = set(chans) if chans else set()
开发者ID:asv-github,项目名称:bp4u-irc-bots,代码行数:31,代码来源:bot.py


示例4: _configured_socket

def _configured_socket(address, options):
    """Return a connected socket for (host, port) set up from PoolOptions.

    The socket is wrapped with ``options.ssl_context`` when one is given, the
    peer certificate is matched against the host when verification and
    ``options.ssl_match_hostname`` are both enabled, and the socket timeout is
    applied last.

    Can raise socket.error, ConnectionFailure, or CertificateError.
    """
    sock = _create_connection(address, options)
    context = options.ssl_context

    if context is not None:
        try:
            secured = context.wrap_socket(sock)
        except IOError as err:
            sock.close()
            raise ConnectionFailure("SSL handshake failed: %s" % (str(err),))
        sock = secured

        verify_host = context.verify_mode and options.ssl_match_hostname
        if verify_host:
            try:
                match_hostname(sock.getpeercert(), hostname=address[0])
            except CertificateError:
                # Never hand out a socket whose peer failed verification.
                sock.close()
                raise

    sock.settimeout(options.socket_timeout)
    return sock
开发者ID:Alpus,项目名称:Eth,代码行数:25,代码来源:pool.py


示例5: connect

 def connect(self, host=None, port=None, address_family=None):
     """
     Initiate a connection to an EPP host and read its greeting.

     host and port default to ``self.host`` / ``self.port``; address_family
     defaults to ``socket.AF_INET``. When ``self.ssl_enable`` is set the
     socket is wrapped with SSL and, if ``self.validate_hostname`` is set,
     the peer certificate is matched against the host name.

     Raises EppConnectionError when the certificate does not match the host.
     """
     host = host or self.host
     # Resolve the effective port once so that the log lines below report the
     # port actually used rather than a possibly-None argument.
     port = port or self.port
     self.sock = socket.socket(address_family or socket.AF_INET, socket.SOCK_STREAM)
     self.sock.settimeout(self.socket_connect_timeout)  # connect timeout
     self.sock.connect((host, port))
     local_sock_addr = self.sock.getsockname()
     local_addr, local_port = local_sock_addr[:2]
     self.log.debug('connected local=%s:%s remote=%s:%s',
                    local_addr, local_port, self.sock.getpeername()[0], port)
     self.sock.settimeout(self.socket_timeout)  # regular timeout
     if self.ssl_enable:
         self.sock = ssl.wrap_socket(self.sock, self.keyfile, self.certfile,
                                     ssl_version=self.ssl_version,
                                     ciphers=self.ssl_ciphers,
                                     server_side=False,
                                     cert_reqs=self.cert_required,
                                     ca_certs=self.cacerts)
         self.log.debug('%s negotiated with local=%s:%s remote=%s:%s', self.sock.version(),
                        local_addr, local_port, self.sock.getpeername()[0], port)
         if self.validate_hostname:
             try:
                 match_hostname(self.sock.getpeercert(), host)
             except CertificateError as exp:
                 self.log.exception("SSL hostname mismatch")
                 # The peer is not who we asked for: drop the connection
                 # rather than leaving an unverified socket open.
                 self.sock.close()
                 raise EppConnectionError(str(exp))
     self.greeting = EppResponse.from_xml(self.read().decode('utf-8'))
开发者ID:cloudregistry,项目名称:eppy,代码行数:29,代码来源:client.py


示例6: _verify_cert

    def _verify_cert(self, sock: ssl.SSLSocket):
        '''Check that the peer certificate of `sock` matches the hostname.

        Nothing is checked when the context's verify mode is CERT_NONE, or
        when it is CERT_OPTIONAL and the peer sent no certificate.

        Raises SSLVerificationError if a required certificate is missing or
        does not match ``self._hostname``.
        '''
        # Based on tornado.iostream.SSLIOStream
        # Needed for older OpenSSL (<0.9.8f) versions
        mode = self._ssl_context.verify_mode
        known_modes = (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        assert mode in known_modes, 'Unknown verify mode {}'.format(mode)

        if mode == ssl.CERT_NONE:
            return

        peer_cert = sock.getpeercert()
        if not peer_cert:
            if mode == ssl.CERT_OPTIONAL:
                return
            raise SSLVerificationError('No SSL certificate given')

        try:
            ssl.match_hostname(peer_cert, self._hostname)
        except ssl.CertificateError as mismatch:
            raise SSLVerificationError('Invalid SSL certificate') from mismatch
开发者ID:chfoo,项目名称:wpull,代码行数:25,代码来源:connection.py


示例7: check_hostname

def check_hostname(sock, server_name, additional_names):
    """Check the peer certificate of ``sock`` against the allowed host names.

    The candidates are ``server_name`` followed by ``additional_names``, which
    may be a single name or any type listed in SEQUENCE_TYPES. Empty names are
    skipped, and the literal name ``'*'`` accepts any certificate. The function
    returns as soon as one candidate matches.

    Raises LDAPCertificateError when no candidate matches the certificate.
    """
    server_certificate = sock.getpeercert()
    if log_enabled(NETWORK):
        log(NETWORK, 'certificate found for %s: %s', sock, server_certificate)

    host_names = [server_name]
    if additional_names:
        if isinstance(additional_names, SEQUENCE_TYPES):
            # extend() accepts any iterable; ``list + tuple`` (or any other
            # non-list sequence) would raise TypeError.
            host_names.extend(additional_names)
        else:
            host_names.append(additional_names)

    for host_name in host_names:
        if not host_name:
            continue
        elif host_name == '*':
            if log_enabled(NETWORK):
                log(NETWORK, 'certificate matches * wildcard')
            return  # valid

        try:
            match_hostname(server_certificate, host_name)  # raise CertificateError if certificate doesn't match server name
            if log_enabled(NETWORK):
                log(NETWORK, 'certificate matches host name <%s>', host_name)
            return  # valid
        except CertificateError as e:
            # A mismatch on one candidate is not fatal; try the next name.
            if log_enabled(NETWORK):
                log(NETWORK, str(e))

    if log_enabled(ERROR):
        log(ERROR, "hostname doesn't match certificate")
    raise LDAPCertificateError("certificate %s doesn't match any name in %s " % (server_certificate, str(host_names)))
开发者ID:cannatag,项目名称:ldap3,代码行数:29,代码来源:tls.py


示例8: connect

    def connect(self):
        """Open a certificate-verified SSL connection, honouring a proxy tunnel.

        The peer certificate is validated against ``self.ca_bundle`` and then
        matched against the destination host. On a mismatch the socket is shut
        down and closed before CertificateError is re-raised.
        """
        plain_sock = socket.create_connection(
            (self.host, self.port), getattr(self, 'source_address', None)
        )

        expected_host = self.host
        tunnelled = hasattr(self, '_tunnel') and getattr(self, '_tunnel_host', None)
        if tunnelled:
            # Handle the socket if a (proxy) tunnel is present
            self.sock = plain_sock
            self._tunnel()
            # http://bugs.python.org/issue7776: Python>=3.4.1 and >=2.7.7
            # change self.host to mean the proxy server host when tunneling is
            # being used. Adapt, since we are interested in the destination
            # host for the match_hostname() comparison.
            expected_host = self._tunnel_host

        self.sock = ssl.wrap_socket(
            plain_sock, cert_reqs=ssl.CERT_REQUIRED, ca_certs=self.ca_bundle
        )
        try:
            peer_cert = self.sock.getpeercert()
            match_hostname(peer_cert, expected_host)
        except CertificateError:
            self.sock.shutdown(socket.SHUT_RDWR)
            self.sock.close()
            raise
开发者ID:HiroIshikawa,项目名称:21playground,代码行数:26,代码来源:ssl_support.py


示例9: connect

  def connect(self):
    """Open a certificate-verified SSL connection to (self.host, self.port).

    Optional connection settings ('timeout', 'source_address') are forwarded
    only when this object defines them. The CA bundle is taken from
    tuf.conf.ssl_certificates, and the peer certificate is matched against
    self.host after the handshake.
    """
    self.connection_kwargs = {}

    # 'timeout' exists for > py2.5, 'source_address' for >= py2.7.
    for option in ('timeout', 'source_address'):
      if hasattr(self, option):
        self.connection_kwargs[option] = getattr(self, option)

    endpoint = (self.host, self.port)
    plain_sock = socket.create_connection(endpoint, **self.connection_kwargs)

    # for >= py2.7
    if getattr(self, '_tunnel_host', None):
      self.sock = plain_sock
      self._tunnel()

    # set location of certificate authorities
    ca_bundle = tuf.conf.ssl_certificates
    assert os.path.isfile(ca_bundle)

    # TODO: Disallow SSLv2.
    # http://docs.python.org/dev/library/ssl.html#protocol-versions
    # TODO: Select the right ciphers.
    # http://docs.python.org/dev/library/ssl.html#cipher-selection
    self.sock = ssl.wrap_socket(plain_sock, self.key_file, self.cert_file,
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=ca_bundle)

    peer_cert = self.sock.getpeercert()
    match_hostname(peer_cert, self.host)
开发者ID:1-Hash,项目名称:tuf,代码行数:32,代码来源:download.py


示例10: create_connection

def create_connection(server=None, port=None, timeout=None, use_ssl=False):
    """Create a connected socket, optionally secured with SSL.

    Each address that getaddrinfo() reports for (server, port) is tried in
    turn and the first socket that connects is returned. With ``use_ssl`` the
    socket is wrapped by a default SSL context and the peer certificate is
    matched against ``server``.

    Raises the last socket.error seen when every address fails, or a
    socket.error of its own when getaddrinfo() yields no address.

    :returns socket
    :rtype socket.SocketType
    """
    # Modelled on socket.create_connection().
    context = ssl_create_default_context() if use_ssl else None

    last_error = None
    candidates = socket.getaddrinfo(server, port, 0, socket.SOCK_STREAM)
    for family, kind, protocol, _canonname, address in candidates:
        sock = None
        try:
            sock = socket.socket(family, kind, protocol)
            if timeout is not None:
                sock.settimeout(timeout)
            if use_ssl:
                sock = context.wrap_socket(sock, server_hostname=server)
            sock.connect(address)
            if use_ssl:
                match_hostname(sock.getpeercert(), server)
            return sock
        except socket.error as exc:
            # Keep the failure and fall through to the next address.
            last_error = exc
            if sock is not None:
                sock.close()

    if last_error is None:
        raise socket.error("getaddrinfo returns an empty list")
    raise last_error
开发者ID:tochev,项目名称:python3-pcloudapi,代码行数:35,代码来源:utils.py


示例11: connect

    def connect(self, url, **options):
        """
        Connect to a websocket url (ws://host:port/resource) and perform the
        opening handshake.

        For secure urls the socket is wrapped with SSL: certificates are
        required and checked against the "cacert.pem" file next to this
        module unless self.sslopt overrides those settings, and the peer
        certificate is matched against the url's host name.

        options: forwarded unchanged to the handshake. The original
                 documentation names "header" (custom HTTP headers) as the
                 only supported option.

        Raises WebSocketException for a secure url when SSL is not available.
        """
        hostname, port, resource, is_secure = _parse_url(url)
        # TODO: we need to support proxy
        self.sock.connect((hostname, port))

        if is_secure:
            if not HAVE_SSL:
                raise WebSocketException("SSL not available.")
            bundled_ca = os.path.join(os.path.dirname(__file__), "cacert.pem")
            tls_options = {"cert_reqs": ssl.CERT_REQUIRED, "ca_certs": bundled_ca}
            # Caller-supplied settings take precedence over the defaults.
            tls_options.update(self.sslopt)
            self.sock = ssl.wrap_socket(self.sock, **tls_options)
            match_hostname(self.sock.getpeercert(), hostname)

        self._handshake(hostname, port, resource, **options)
开发者ID:fvieira,项目名称:SeeMeCode-sublimetext3,代码行数:34,代码来源:__init__.py


示例12: _create_ssl_connection

    def _create_ssl_connection(self, sock):
        """Wrap ``sock`` in SSL, verify the peer host name and return it.

        On port 443 an SSLContext is built that advertises the ALPN protocol
        'x-amzn-http-ca' and the handshake is run explicitly; on any other
        port ssl.wrap_socket() is used with the configured certificates.
        """
        self._logger.debug("Creating ssl connection...")

        protocol = ssl.PROTOCOL_SSLv23

        if self._port == 443:
            context = (
                SSLContextBuilder()
                .with_ca_certs(self._ca_path)
                .with_cert_key_pair(self._cert_path, self._key_path)
                .with_cert_reqs(ssl.CERT_REQUIRED)
                .with_check_hostname(True)
                .with_ciphers(None)
                .with_alpn_protocols(['x-amzn-http-ca'])
                .build()
            )
            secured = context.wrap_socket(sock, server_hostname=self._host, do_handshake_on_connect=False)
            secured.do_handshake()
        else:
            secured = ssl.wrap_socket(sock,
                                      certfile=self._cert_path,
                                      keyfile=self._key_path,
                                      ca_certs=self._ca_path,
                                      cert_reqs=ssl.CERT_REQUIRED,
                                      ssl_version=protocol)

        self._logger.debug("Matching host name...")
        # Interpreters older than 3.2 use the class's own matching routine.
        if sys.version_info[:2] >= (3, 2):
            ssl.match_hostname(secured.getpeercert(), self._host)
        else:
            self._tls_match_hostname(secured)

        return secured
开发者ID:aws,项目名称:aws-iot-device-sdk-python,代码行数:31,代码来源:providers.py


示例13: connect

    def connect(self):
        """Coroutine: open the connection and verify the peer host name.

        The connection object produced by self._create_connection() is stored
        in self.notSock. When a tunnel host is configured the tunnel is set up
        and auto_open is cleared. If the SSL context does not check host names
        itself but self._check_hostname is set, the peer certificate is matched
        here; on failure the connection is closed and the error re-raised.
        """
        server_hostname = self._tunnel_host or self.host
        sni_hostname = server_hostname if ssl.HAS_SNI else None  # will be useful eventually

        self.notSock = yield from self._create_connection(
            (self.host, self.port), self.timeout, self.source_address,
            ssl=self._context, server_hostname=server_hostname)

        if self._tunnel_host:
            yield from self._tunnel()
            self.auto_open = 0

        # Nothing left to do when the context already verifies host names or
        # verification was not requested.
        if self._context.check_hostname or not self._check_hostname:
            return

        try:
            ssl.match_hostname(self.notSock.peercert(), server_hostname)
        except Exception:
            self.close()
            raise
开发者ID:rdbhost,项目名称:yieldfromUrllib3,代码行数:27,代码来源:connection.py


示例14: http_request

    def http_request(self, req):
        """Validate the server's SSL certificate over a throwaway connection.

        A socket is opened to ``self.hostname:self.port`` -- through an HTTP
        CONNECT proxy when the ``https_proxy`` environment variable is set --
        wrapped with SSL against the CA bundle from get_ca_certs(), and closed
        again. ``req`` is returned unchanged, without any validation, when
        detect_no_proxy() returns a false value for the request URL.

        Raises ProxyError for a non-HTTP proxy scheme, ConnectionError when
        the connection is refused, and SSLValidationError for any other SSL or
        socket failure.

        NOTE(review): no value is returned on the validation path in the code
        visible here -- confirm against the full source.
        """
        tmp_ca_cert_path, paths_checked = self.get_ca_certs()
        https_proxy = os.environ.get('https_proxy')
        context = None
        if HAS_SSLCONTEXT:
            context = self._make_context(tmp_ca_cert_path)

        # Detect if 'no_proxy' environment variable is set and if our URL is included
        use_proxy = self.detect_no_proxy(req.get_full_url())

        if not use_proxy:
            # ignore proxy settings for this host request
            return req

        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            if https_proxy:
                proxy_parts = generic_urlparse(urlparse.urlparse(https_proxy))
                port = proxy_parts.get('port') or 443
                s.connect((proxy_parts.get('hostname'), port))
                if proxy_parts.get('scheme') == 'http':
                    s.sendall(self.CONNECT_COMMAND % (self.hostname, self.port))
                    if proxy_parts.get('username'):
                        credentials = "%s:%s" % (proxy_parts.get('username',''), proxy_parts.get('password',''))
                        s.sendall('Proxy-Authorization: Basic %s\r\n' % credentials.encode('base64').strip())
                    s.sendall('\r\n')
                    connect_result = s.recv(4096)
                    self.validate_proxy_response(connect_result)
                    if context:
                        ssl_s = context.wrap_socket(s, server_hostname=self.hostname)
                    else:
                        ssl_s = ssl.wrap_socket(s, ca_certs=tmp_ca_cert_path, cert_reqs=ssl.CERT_REQUIRED, ssl_version=PROTOCOL)
                        match_hostname(ssl_s.getpeercert(), self.hostname)
                else:
                    raise ProxyError('Unsupported proxy scheme: %s. Currently ansible only supports HTTP proxies.' % proxy_parts.get('scheme'))
            else:
                s.connect((self.hostname, self.port))
                if context:
                    ssl_s = context.wrap_socket(s, server_hostname=self.hostname)
                else:
                    ssl_s = ssl.wrap_socket(s, ca_certs=tmp_ca_cert_path, cert_reqs=ssl.CERT_REQUIRED, ssl_version=PROTOCOL)
                    match_hostname(ssl_s.getpeercert(), self.hostname)
        except (ssl.SSLError, socket.error) as e:
            # fail if we tried all of the certs but none worked
            if 'connection refused' in str(e).lower():
                raise ConnectionError('Failed to connect to %s:%s.' % (self.hostname, self.port))
            else:
                raise SSLValidationError('Failed to validate the SSL certificate for %s:%s.'
                    ' Make sure your managed systems have a valid CA'
                    ' certificate installed.  If the website serving the url'
                    ' uses SNI you need python >= 2.7.9 on your managed'
                    ' machine.  You can use validate_certs=False if you do'
                    " not need to confirm the server's identity but this is"
                    ' unsafe and not recommended.'
                    ' Paths checked for this platform: %s' % (self.hostname, self.port, ", ".join(paths_checked))
                )
        finally:
            # The connection only exists to validate the certificate: always
            # release it, on success and on every failure path alike.
            if s is not None:
                s.close()
开发者ID:RajeevNambiar,项目名称:temp,代码行数:59,代码来源:urls.py


示例15: connect

    def connect(self, url, **options):
        """
        Connect to a websocket url (ws://host:port/resource), optionally
        through an HTTP proxy, and perform the opening handshake.

        For secure urls the socket is wrapped with SSL: certificates are
        required and checked against the "cacert.pem" file next to this
        module unless self.sslopt overrides those settings, and the peer
        certificate is matched against the url's host name.

        options: "http_proxy_host" - http proxy host name.
                 "http_proxy_port" - http proxy port. If not set, set to 80.
                 All options are also forwarded to the handshake; the
                 original documentation names "header" (custom http header
                 list) and "cookie" (cookie value).

        Raises WebSocketException when no address is found or when SSL is
        required but not available.
        """
        hostname, port, resource, is_secure = _parse_url(url)
        proxy_host = options.get("http_proxy_host", None)
        proxy_port = options.get("http_proxy_port", 0)

        # Resolve the proxy when one is configured, otherwise the target host.
        if proxy_host:
            proxy_port = proxy_port or 80
            lookup_host, lookup_port = proxy_host, proxy_port
        else:
            lookup_host, lookup_port = hostname, port
        addrinfo_list = socket.getaddrinfo(lookup_host, lookup_port, 0, 0, socket.SOL_TCP)

        if not addrinfo_list:
            raise WebSocketException("Host not found.: " + hostname + ":" + str(port))

        # Only the first resolved address is used.
        first = addrinfo_list[0]
        self.sock = socket.socket(first[0])
        self.sock.settimeout(self.timeout)
        for option_group in (DEFAULT_SOCKET_OPTION, self.sockopt):
            for opts in option_group:
                self.sock.setsockopt(*opts)
        # TODO: we need to support proxy
        self.sock.connect(first[4])

        if proxy_host:
            self._tunnel(hostname, port)

        if is_secure:
            if not HAVE_SSL:
                raise WebSocketException("SSL not available.")
            bundled_ca = os.path.join(os.path.dirname(__file__), "cacert.pem")
            tls_options = {"cert_reqs": ssl.CERT_REQUIRED, "ca_certs": bundled_ca}
            # Caller-supplied settings take precedence over the defaults.
            tls_options.update(self.sslopt)
            self.sock = ssl.wrap_socket(self.sock, **tls_options)
            match_hostname(self.sock.getpeercert(), hostname)

        self._handshake(hostname, port, resource, **options)
开发者ID:elbowz,项目名称:xbmc-pushbullet,代码行数:59,代码来源:__init__.py


示例16: connect

 def connect(self, address, timeout=5):
     """Open an SSL connection to ``address`` and verify the peer host name.

     address -- (host, port) pair; the host is also the name the peer
                certificate must match.
     timeout -- socket timeout applied before connecting.

     On success the socket is stored in ``self.socket`` and the sent/received
     timestamps are reset to the current time. If connecting or verification
     fails the socket is closed and the error is re-raised.
     """
     sock = self.ssl_context().wrap_socket(
         socket.socket(socket.AF_INET, socket.SOCK_STREAM))
     try:
         sock.settimeout(timeout)
         sock.connect(address)
         ssl.match_hostname(sock.getpeercert(), address[0])
     except Exception:
         # Do not leak the descriptor when the connection cannot be
         # established or the certificate does not match.
         sock.close()
         raise
     self.socket = sock
     self.time_sent = time.time()
     self.time_received = self.time_sent
开发者ID:whyz,项目名称:tellive-py,代码行数:9,代码来源:tellstick.py


示例17: match_hostname

 def match_hostname(self):
     """Check the peer certificate against ``self.c.host``.

     A hostname mismatch is handed to ``self.match_hostname_aws`` together
     with the certificate. The check is skipped silently when the ssl module
     has no match_hostname() or when the certificate is empty.
     """
     cert = self.c.sock.getpeercert()
     try:
         ssl.match_hostname(cert, self.c.host)
     except AttributeError: # old ssl module doesn't have this function
         return
     except ssl.CertificateError as e:
         # Must come before ValueError: CertificateError is a ValueError
         # subclass, so the other order would swallow every mismatch.
         self.match_hostname_aws(cert, e)
     except ValueError: # empty SSL cert means underlying SSL library didn't validate it, we don't either.
         return
开发者ID:carriercomm,项目名称:server-7,代码行数:10,代码来源:ConnMan.py


示例18: connect

 def connect(self):
     """Connect to the host and port specified in __init__.

     The connection is wrapped with SSL, the peer certificate is validated
     against the ``ca_certs`` bundle and matched against ``self.host``.

     Raises Exception when the CA bundle path does not exist.
     """
     # Check the CA bundle before opening any socket, so a missing bundle
     # cannot leave an open connection (or proxy tunnel) behind.
     if not os.path.exists(ca_certs):
         raise Exception("CA Certifcate bundle %s is not readable" % ca_certs)
     sock = socket.create_connection((self.host, self.port), self.timeout, self.source_address)
     if self._tunnel_host:
         self.sock = sock
         self._tunnel()
     self.sock = ssl.wrap_socket(sock, ca_certs=ca_certs, cert_reqs=ssl.CERT_REQUIRED)
     ssl.match_hostname(self.sock.getpeercert(), self.host)
开发者ID:rameshdharan,项目名称:willie,代码行数:10,代码来源:web.py


示例19: set_ssl

    def set_ssl(self, ssl_mode, ssl_ca, ssl_crl, ssl_cert, ssl_key):
        """Wrap the connection's socket with SSL using the given parameters.

        Args:
            ssl_mode (str): SSL mode.
            ssl_ca (str): The certification authority certificate.
            ssl_crl (str): The certification revocation lists.
            ssl_cert (str): The certificate.
            ssl_key (str): The certificate key.

        Raises:
            :class:`mysqlx.RuntimeError`: If Python installation has no SSL
                                          support.
            :class:`mysqlx.InterfaceError`: If the parameters are invalid.
        """
        if not SSL_AVAILABLE:
            self.close()
            raise RuntimeError("Python installation has no SSL support")

        def reject(template, cause):
            # Close the connection, then report `cause` as an InterfaceError.
            self.close()
            raise InterfaceError(template.format(cause))

        tls_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        tls_context.load_default_certs()

        if ssl_ca:
            try:
                tls_context.load_verify_locations(ssl_ca)
                tls_context.verify_mode = ssl.CERT_REQUIRED
            except (IOError, ssl.SSLError) as exc:
                reject("Invalid CA Certificate: {}", exc)

        if ssl_crl:
            try:
                tls_context.load_verify_locations(ssl_crl)
                tls_context.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
            except (IOError, ssl.SSLError) as exc:
                reject("Invalid CRL: {}", exc)

        if ssl_cert:
            try:
                tls_context.load_cert_chain(ssl_cert, ssl_key)
            except (IOError, ssl.SSLError) as exc:
                reject("Invalid Certificate/Key: {}", exc)

        self._socket = tls_context.wrap_socket(self._socket)
        if ssl_mode == SSLMode.VERIFY_IDENTITY:
            try:
                # The name to verify comes from a reverse lookup of the host.
                resolved = socket.gethostbyaddr(self._host)
                ssl.match_hostname(self._socket.getpeercert(), resolved[0])
            except ssl.CertificateError as exc:
                reject("Unable to verify server identity: {}", exc)
        self._is_ssl = True
开发者ID:vtermanis,项目名称:mysql-connector-python,代码行数:55,代码来源:connection.py


示例20: _on_handshake

    def _on_handshake(self):
        """Advance the non-blocking SSL handshake on ``self._sock``.

        While the handshake needs more I/O, this method re-registers itself
        with the loop as a reader or writer callback and returns. When the
        handshake fails the socket is closed and the exception is passed to
        the waiter, if there is one. When it succeeds the host name is
        verified (only on Pythons whose SSLContext has no ``check_hostname``),
        peer details are stored in ``self._extra``, reading is started and
        the protocol's connection_made() is scheduled.
        """
        try:
            self._sock.do_handshake()
        except ssl.SSLWantReadError:
            # Handshake needs incoming data: call again when readable.
            self._loop.add_reader(self._sock_fd, self._on_handshake)
            return
        except ssl.SSLWantWriteError:
            # Handshake needs to send data: call again when writable.
            self._loop.add_writer(self._sock_fd, self._on_handshake)
            return
        except Exception as exc:
            # Ordinary failure: unregister, close, and report to the waiter
            # instead of raising into the event loop.
            self._loop.remove_reader(self._sock_fd)
            self._loop.remove_writer(self._sock_fd)
            self._sock.close()
            if self._waiter is not None:
                self._waiter.set_exception(exc)
            return
        except BaseException as exc:
            # Same cleanup as above, but non-Exception errors are re-raised.
            self._loop.remove_reader(self._sock_fd)
            self._loop.remove_writer(self._sock_fd)
            self._sock.close()
            if self._waiter is not None:
                self._waiter.set_exception(exc)
            raise

        # Handshake finished: drop whichever handshake callback was registered.
        self._loop.remove_reader(self._sock_fd)
        self._loop.remove_writer(self._sock_fd)

        peercert = self._sock.getpeercert()
        if not hasattr(self._sslcontext, 'check_hostname'):
            # Verify hostname if requested, Python 3.4+ uses check_hostname
            # and checks the hostname in do_handshake()
            if (self._server_hostname and
                self._sslcontext.verify_mode != ssl.CERT_NONE):
                try:
                    ssl.match_hostname(peercert, self._server_hostname)
                except Exception as exc:
                    # Verification failed: close and report to the waiter.
                    self._sock.close()
                    if self._waiter is not None:
                        self._waiter.set_exception(exc)
                    return

        # Add extra info that becomes available after handshake.
        self._extra.update(peercert=peercert,
                           cipher=self._sock.cipher(),
                           compression=self._sock.compression(),
                           )

        self._read_wants_write = False
        self._write_wants_read = False
        self._loop.add_reader(self._sock_fd, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if self._waiter is not None:
            # wait until protocol.connection_made() has been called
            self._loop.call_soon(self._waiter._set_result_unless_cancelled,
                                 None)
开发者ID:google,项目名称:cpython-pt,代码行数:55,代码来源:selector_events.py



注:本文中的ssl.match_hostname函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python ssl.wrap_socket函数代码示例发布时间:2022-05-27
下一篇:
Python ssl.get_server_certificate函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap