本文整理汇总了Python中ssl.match_hostname函数的典型用法代码示例。如果您正苦于以下问题:Python match_hostname函数的具体用法?Python match_hostname怎么用?Python match_hostname使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了match_hostname函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: connect
def connect(self):
    """Connect to Mongo and return a new (connected) socket.

    Note that the pool does not keep a reference to the socket -- you must
    call return_socket() when you're done with it.

    When ``self.use_ssl`` is set, the socket is wrapped with the configured
    certificate options and, if ``self.ssl_cert_reqs`` is truthy, the peer
    certificate is matched against the host part of ``self.pair``.

    Returns:
        ``SocketInfo(sock, self.pool_id, hostname)``.

    Raises:
        ConnectionFailure: if wrapping or verification raises
            ``ssl.SSLError``.
        Exception: any other error raised while wrapping or verifying is
            re-raised unchanged, after the socket has been closed.
    """
    sock = self.create_connection()
    hostname = self.pair[0]

    if self.use_ssl:
        try:
            sock = ssl.wrap_socket(sock,
                                   certfile=self.ssl_certfile,
                                   keyfile=self.ssl_keyfile,
                                   ca_certs=self.ssl_ca_certs,
                                   cert_reqs=self.ssl_cert_reqs)
            if self.ssl_cert_reqs:
                match_hostname(sock.getpeercert(), hostname)
        except ssl.SSLError:
            sock.close()
            raise ConnectionFailure("SSL handshake failed. MongoDB may "
                                    "not be configured with SSL support.")
        except Exception:
            # A hostname mismatch is not an ssl.SSLError on every Python
            # version (CertificateError was a plain ValueError subclass
            # before 3.7), so close the socket here too instead of leaking
            # it, then let the original exception propagate.
            sock.close()
            raise

    sock.settimeout(self.net_timeout)
    return SocketInfo(sock, self.pool_id, hostname)
开发者ID:iamtonyarmstrong,项目名称:flasky,代码行数:25,代码来源:pool.py
示例2: _connect_socket
def _connect_socket(self):
    """Open ``self._socket`` to the first resolved address that accepts.

    The endpoint is resolved and every ``getaddrinfo`` result is tried in
    order. For each candidate a socket is created, optionally wrapped for
    TLS (via ``self.ssl_context`` or, failing that, ``self.ssl_options``),
    connected under ``self.connect_timeout`` and then switched back to
    blocking mode. When ``self._check_hostname`` is set the peer
    certificate is matched against ``self.endpoint.address``.

    Raises:
        ConnectionException: if ``getaddrinfo`` returns nothing.
        RuntimeError: if ``ssl_options`` are given but no SSL
            implementation is available.
        socket.error: if every candidate address failed; carries the errno
            of the last failure.
    """
    host, port = self.endpoint.resolve()
    addresses = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
    if not addresses:
        raise ConnectionException("getaddrinfo returned empty list for %s" % (self.endpoint,))

    last_error = None
    for (family, kind, protocol, _canonname, target) in addresses:
        try:
            self._socket = self._socket_impl.socket(family, kind, protocol)
            if self.ssl_context:
                tls_kwargs = self.ssl_options or {}
                self._socket = self.ssl_context.wrap_socket(self._socket, **tls_kwargs)
            elif self.ssl_options:
                if not self._ssl_impl:
                    raise RuntimeError("This version of Python was not compiled with SSL support")
                self._socket = self._ssl_impl.wrap_socket(self._socket, **self.ssl_options)
            self._socket.settimeout(self.connect_timeout)
            self._socket.connect(target)
            self._socket.settimeout(None)
            if self._check_hostname:
                ssl.match_hostname(self._socket.getpeercert(), self.endpoint.address)
        except socket.error as err:
            # Discard the half-open socket and remember why this candidate
            # failed before moving on to the next address.
            if self._socket:
                self._socket.close()
                self._socket = None
            last_error = err
        else:
            # Connected: forget any failure from an earlier candidate.
            last_error = None
            break

    if last_error:
        tried = [entry[4] for entry in addresses]
        raise socket.error(last_error.errno, "Tried connecting to %s. Last error: %s" % (tried, last_error.strerror or last_error))

    if self.sockopts:
        for option in self.sockopts:
            self._socket.setsockopt(*option)
开发者ID:datastax,项目名称:python-driver,代码行数:35,代码来源:connection.py
示例3: __init__
def __init__(self, host='127.0.0.1', port=6667, usessl=False, nick="Robot", user="Robot", longuser="I am a robot!", chans={"#yolo"}):
    """Connect to an IRC server, register the nick/user and join channels.

    Args:
        host: server host name or address.
        port: server port.
        usessl: when true, connect over TLS with certificate and hostname
            verification; otherwise use a plain socket.
        nick: nickname sent with ``NICK``.
        user: user name sent with ``USER``.
        longuser: real-name text sent with ``USER``.
        chans: iterable of channel names to ``JOIN``. The default set is
            shared between calls but is only read here, never mutated.
            A falsy value joins nothing.

    Raises:
        Whatever the ssl/socket layer raises; on a failed TLS connect or
        hostname check the socket is closed before the error propagates.
    """
    if usessl:  # Set up an SSL socket
        try:
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        except Exception:
            print("ERROR: Can't create an SSL context. You're probably trying to connect using SSL under Python 2. You need to use Python 3.")
            raise
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations("/etc/ssl/cert.pem")  # May be different for different systems
        context.set_ciphers("DHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-SHA256:DHE-RSA-AES128-SHA256")  # Only the good ones
        self.s = context.wrap_socket(socket.socket(socket.AF_INET))
        try:
            self.s.connect((host, port))
            cert = self.s.getpeercert()
            ssl.match_hostname(cert, host)
        except Exception:
            # Do not keep a connection to a peer we failed to reach or verify.
            self.s.close()
            raise
        print("Successfully connected using cipher %s. Huzzah!" % self.s.cipher()[0])
    else:  # Set up a normal socket
        self.s = socket.socket()
        self.s.connect((host, port))

    self.f = self.s.makefile()
    # `crlf` is defined elsewhere in the module (not visible in this snippet).
    self.write = lambda x: self.s.send(x.encode('UTF-8') + crlf)
    self.read = self.f.readline
    self.nick = nick
    self.write('NICK ' + nick)
    self.write('USER ' + user + ' * * :' + longuser)
    if chans:
        for chan in chans:
            self.write('JOIN ' + chan)
    # Always leave self.chans as a set, even when no channels were given.
    self.chans = set(chans) if chans else set()
开发者ID:asv-github,项目名称:bp4u-irc-bots,代码行数:31,代码来源:bot.py
示例4: _configured_socket
def _configured_socket(address, options):
    """Return a connected socket for ``address`` configured per ``options``.

    ``address`` is a ``(host, port)`` pair. If ``options.ssl_context`` is
    set the socket is wrapped with it, and the peer certificate is matched
    against the host when both the context's ``verify_mode`` and
    ``options.ssl_match_hostname`` are truthy. The socket timeout is set
    to ``options.socket_timeout`` before returning.

    Can raise socket.error, ConnectionFailure, or CertificateError.
    """
    sock = _create_connection(address, options)
    context = options.ssl_context

    if context is None:
        sock.settimeout(options.socket_timeout)
        return sock

    try:
        sock = context.wrap_socket(sock)
    except IOError as handshake_error:
        sock.close()
        raise ConnectionFailure("SSL handshake failed: %s" % (str(handshake_error),))

    must_verify = context.verify_mode and options.ssl_match_hostname
    if must_verify:
        try:
            match_hostname(sock.getpeercert(), hostname=address[0])
        except CertificateError:
            # Never hand back a socket whose peer failed verification.
            sock.close()
            raise

    sock.settimeout(options.socket_timeout)
    return sock
开发者ID:Alpus,项目名称:Eth,代码行数:25,代码来源:pool.py
示例5: connect
def connect(self, host=None, port=None, address_family=None):
    """
    Initiate a connection to an EPP host and read its greeting.

    :param host: host to connect to; ``self.host`` is used when falsy.
    :param port: port to connect to; ``self.port`` is used when falsy.
    :param address_family: socket address family; ``socket.AF_INET`` is
        used when falsy.
    :raises EppConnectionError: if hostname validation is enabled and the
        peer certificate does not match ``host``.
    """
    host = host or self.host
    # Resolve the effective port once so the debug logs below report the
    # port actually used instead of the raw (possibly None) argument.
    port = port or self.port
    self.sock = socket.socket(address_family or socket.AF_INET, socket.SOCK_STREAM)
    self.sock.settimeout(self.socket_connect_timeout)  # connect timeout
    self.sock.connect((host, port))
    local_sock_addr = self.sock.getsockname()
    local_addr, local_port = local_sock_addr[:2]
    self.log.debug('connected local=%s:%s remote=%s:%s',
                   local_addr, local_port, self.sock.getpeername()[0], port)
    self.sock.settimeout(self.socket_timeout)  # regular timeout
    if self.ssl_enable:
        self.sock = ssl.wrap_socket(self.sock, self.keyfile, self.certfile,
                                    ssl_version=self.ssl_version,
                                    ciphers=self.ssl_ciphers,
                                    server_side=False,
                                    cert_reqs=self.cert_required,
                                    ca_certs=self.cacerts)
        self.log.debug('%s negotiated with local=%s:%s remote=%s:%s', self.sock.version(),
                       local_addr, local_port, self.sock.getpeername()[0], port)
        if self.validate_hostname:
            try:
                match_hostname(self.sock.getpeercert(), host)
            except CertificateError as exp:
                self.log.exception("SSL hostname mismatch")
                raise EppConnectionError(str(exp))
    self.greeting = EppResponse.from_xml(self.read().decode('utf-8'))
开发者ID:cloudregistry,项目名称:eppy,代码行数:29,代码来源:client.py
示例6: _verify_cert
def _verify_cert(self, sock: ssl.SSLSocket):
    '''Check that the peer certificate of ``sock`` matches the hostname.

    Nothing is checked when the context's verify mode is ``CERT_NONE``,
    and a missing certificate is tolerated under ``CERT_OPTIONAL``.

    Raises:
        SSLVerificationError: if no certificate was presented when one is
            required, or the certificate does not match ``self._hostname``.
    '''
    # Based on tornado.iostream.SSLIOStream
    # Needed for older OpenSSL (<0.9.8f) versions
    mode = self._ssl_context.verify_mode
    known_modes = (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
    assert mode in known_modes, 'Unknown verify mode {}'.format(mode)

    if mode == ssl.CERT_NONE:
        return

    cert = sock.getpeercert()
    if not cert:
        if mode == ssl.CERT_OPTIONAL:
            return
        raise SSLVerificationError('No SSL certificate given')

    try:
        ssl.match_hostname(cert, self._hostname)
    except ssl.CertificateError as error:
        raise SSLVerificationError('Invalid SSL certificate') from error
开发者ID:chfoo,项目名称:wpull,代码行数:25,代码来源:connection.py
示例7: check_hostname
def check_hostname(sock, server_name, additional_names):
    """Verify that the peer certificate of ``sock`` matches an accepted name.

    The candidate names are ``server_name`` followed by
    ``additional_names`` (a single name or a sequence of names). Empty
    names are skipped, the literal ``'*'`` accepts any certificate, and
    the first name the certificate matches makes the check succeed.

    Args:
        sock: object exposing ``getpeercert()``.
        server_name: primary host name to match.
        additional_names: extra acceptable name(s), or a falsy value.

    Raises:
        LDAPCertificateError: if the certificate matches none of the names.
    """
    server_certificate = sock.getpeercert()
    if log_enabled(NETWORK):
        log(NETWORK, 'certificate found for %s: %s', sock, server_certificate)

    host_names = [server_name]
    if additional_names:
        if isinstance(additional_names, SEQUENCE_TYPES):
            # extend() accepts any iterable. The previous `list + value`
            # concatenation raises TypeError for every sequence type other
            # than list (SEQUENCE_TYPES is defined elsewhere; presumably it
            # also admits tuples/sets -- verify against its definition).
            host_names.extend(additional_names)
        else:
            host_names.append(additional_names)

    for host_name in host_names:
        if not host_name:
            continue
        elif host_name == '*':
            if log_enabled(NETWORK):
                log(NETWORK, 'certificate matches * wildcard')
            return  # valid

        try:
            match_hostname(server_certificate, host_name)  # raise CertificateError if certificate doesn't match server name
            if log_enabled(NETWORK):
                log(NETWORK, 'certificate matches host name <%s>', host_name)
            return  # valid
        except CertificateError as e:
            # Not fatal yet: a later candidate name may still match.
            if log_enabled(NETWORK):
                log(NETWORK, str(e))

    if log_enabled(ERROR):
        log(ERROR, "hostname doesn't match certificate")
    raise LDAPCertificateError("certificate %s doesn't match any name in %s " % (server_certificate, str(host_names)))
开发者ID:cannatag,项目名称:ldap3,代码行数:29,代码来源:tls.py
示例8: connect
def connect(self):
    """Open a certificate-verified TLS connection and store it in ``self.sock``.

    A TCP connection is made to ``(self.host, self.port)``, tunnelled
    through a proxy when one is configured, wrapped with
    ``cert_reqs=ssl.CERT_REQUIRED`` against ``self.ca_bundle``, and the
    peer certificate is matched against the destination host.

    Raises:
        CertificateError: on hostname mismatch, after the socket has been
            shut down and closed.
    """
    endpoint = (self.host, self.port)
    sock = socket.create_connection(endpoint, getattr(self, 'source_address', None))

    # Handle the socket if a (proxy) tunnel is present
    tunnelled = hasattr(self, '_tunnel') and getattr(self, '_tunnel_host', None)
    if tunnelled:
        self.sock = sock
        self._tunnel()

    # http://bugs.python.org/issue7776: Python>=3.4.1 and >=2.7.7
    # change self.host to mean the proxy server host when tunneling is
    # being used. Adapt, since we are interested in the destination
    # host for the match_hostname() comparison.
    actual_host = self._tunnel_host if tunnelled else self.host

    self.sock = ssl.wrap_socket(
        sock, cert_reqs=ssl.CERT_REQUIRED, ca_certs=self.ca_bundle
    )
    try:
        match_hostname(self.sock.getpeercert(), actual_host)
    except CertificateError:
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
开发者ID:HiroIshikawa,项目名称:21playground,代码行数:26,代码来源:ssl_support.py
示例9: connect
def connect(self):
    """Open a certificate-verified TLS connection and store it in ``self.sock``.

    Connection keyword arguments (``timeout``, ``source_address``) are
    collected in ``self.connection_kwargs`` from whichever of those
    attributes exist on this object. The connection is tunnelled when
    ``_tunnel_host`` is set, wrapped with ``cert_reqs=ssl.CERT_REQUIRED``
    against the CA file named by ``tuf.conf.ssl_certificates``, and the
    peer certificate is matched against ``self.host``.

    Raises:
        AssertionError: if the configured CA file does not exist.
        Exception: whatever ``match_hostname`` raises on a mismatch; the
            TLS socket is closed before the error propagates.
    """
    self.connection_kwargs = {}

    # for > py2.5
    if hasattr(self, 'timeout'):
        self.connection_kwargs.update(timeout=self.timeout)

    # for >= py2.7
    if hasattr(self, 'source_address'):
        self.connection_kwargs.update(source_address=self.source_address)

    sock = socket.create_connection((self.host, self.port), **self.connection_kwargs)

    # for >= py2.7
    if getattr(self, '_tunnel_host', None):
        self.sock = sock
        self._tunnel()

    # set location of certificate authorities
    # NOTE(review): this assert is skipped under `python -O`.
    assert os.path.isfile(tuf.conf.ssl_certificates)
    cert_path = tuf.conf.ssl_certificates

    # TODO: Disallow SSLv2.
    # http://docs.python.org/dev/library/ssl.html#protocol-versions
    # TODO: Select the right ciphers.
    # http://docs.python.org/dev/library/ssl.html#cipher-selection
    self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=cert_path)

    try:
        match_hostname(self.sock.getpeercert(), self.host)
    except Exception:
        # Do not keep an open connection to a peer that failed the
        # hostname check.
        self.sock.close()
        raise
开发者ID:1-Hash,项目名称:tuf,代码行数:32,代码来源:download.py
示例10: create_connection
def create_connection(server=None, port=None, timeout=None, use_ssl=False):
    """Create a socket or a secure ssl socket.

    Every address returned by ``getaddrinfo`` for ``server``/``port`` is
    tried in order and the first socket that connects is returned. With
    ``use_ssl`` the socket is wrapped by a default SSL context before
    connecting, and the peer certificate is matched against ``server``.

    Hopefully some day such functionality will be included in python3.

    :param server: host name or address to connect to
    :param port: port to connect to
    :param timeout: socket timeout; left untouched when ``None``
    :param use_ssl: wrap the connection in TLS when true
    :returns socket
    :rtype socket.SocketType
    :raises socket.error: the last connection error, or a generic one when
        ``getaddrinfo`` returned no addresses
    """
    # based on socket.create_connection()
    if use_ssl:
        context = ssl_create_default_context()
    err = None
    for af, socktype, proto, canonname, sa in \
            socket.getaddrinfo(server, port, 0, socket.SOCK_STREAM):
        sock = None
        try:
            sock = socket.socket(af, socktype, proto)
            if timeout is not None:
                sock.settimeout(timeout)
            if use_ssl:
                sock = context.wrap_socket(sock, server_hostname=server)
            sock.connect(sa)
            if use_ssl:
                match_hostname(sock.getpeercert(), server)
            return sock
        except socket.error as exc:
            # Remember the failure and try the next address.
            err = exc
            if sock is not None:
                sock.close()
        except Exception:
            # Anything else (e.g. a certificate error that is not a
            # socket.error on this Python version) still propagates, but
            # must not leak the socket.
            if sock is not None:
                sock.close()
            raise
    if err is not None:
        raise err
    else:
        raise socket.error("getaddrinfo returns an empty list")
开发者ID:tochev,项目名称:python3-pcloudapi,代码行数:35,代码来源:utils.py
示例11: connect
def connect(self, url, **options):
    """
    Connect to url. url is websocket url scheme. ie. ws://host:port/resource

    For a secure url the socket is wrapped with TLS (certificate required,
    verified against the ``cacert.pem`` shipped next to this module, with
    ``self.sslopt`` overriding those defaults) and the peer certificate is
    matched against the host name. All keyword ``options`` are forwarded
    unchanged to the handshake.

    You can customize using 'options'.
    If you set "header" dict object, you can set your own custom header.

    >>> ws = WebSocket()
    >>> ws.connect("ws://echo.websocket.org/",
    ...            header={"User-Agent: MyProgram",
    ...                    "x-custom: header"})

    timeout: socket timeout time. This value is integer.
             if you set None for this value,
             it means "use default_timeout value"

    options: current support option is only "header".
             if you set header as dict value,
             the custom HTTP headers are added.
    """
    hostname, port, resource, is_secure = _parse_url(url)
    # TODO: we need to support proxy
    self.sock.connect((hostname, port))

    if is_secure:
        if not HAVE_SSL:
            raise WebSocketException("SSL not available.")
        bundled_ca = os.path.join(os.path.dirname(__file__), "cacert.pem")
        sslopt = dict(cert_reqs=ssl.CERT_REQUIRED, ca_certs=bundled_ca)
        sslopt.update(self.sslopt)
        self.sock = ssl.wrap_socket(self.sock, **sslopt)
        match_hostname(self.sock.getpeercert(), hostname)

    self._handshake(hostname, port, resource, **options)
开发者ID:fvieira,项目名称:SeeMeCode-sublimetext3,代码行数:34,代码来源:__init__.py
示例12: _create_ssl_connection
def _create_ssl_connection(self, sock):
    """Wrap ``sock`` in TLS, verify the peer host name and return the result.

    On port 443 the context is assembled with ``SSLContextBuilder``
    (CA bundle, client cert/key, required certificates, hostname checking
    and the ``x-amzn-http-ca`` ALPN protocol) and the handshake is run
    explicitly. On any other port ``ssl.wrap_socket`` is used with the
    same credentials. Afterwards the host name is matched, using
    ``self._tls_match_hostname`` on interpreters older than Python 3.2.
    """
    self._logger.debug("Creating ssl connection...")

    ssl_protocol_version = ssl.PROTOCOL_SSLv23

    if self._port == 443:
        builder = SSLContextBuilder()
        builder = builder.with_ca_certs(self._ca_path)
        builder = builder.with_cert_key_pair(self._cert_path, self._key_path)
        builder = builder.with_cert_reqs(ssl.CERT_REQUIRED)
        builder = builder.with_check_hostname(True)
        builder = builder.with_ciphers(None)
        builder = builder.with_alpn_protocols(['x-amzn-http-ca'])
        ssl_context = builder.build()
        ssl_sock = ssl_context.wrap_socket(sock, server_hostname=self._host, do_handshake_on_connect=False)
        ssl_sock.do_handshake()
    else:
        ssl_sock = ssl.wrap_socket(sock,
                                   certfile=self._cert_path,
                                   keyfile=self._key_path,
                                   ca_certs=self._ca_path,
                                   cert_reqs=ssl.CERT_REQUIRED,
                                   ssl_version=ssl_protocol_version)

    self._logger.debug("Matching host name...")
    interpreter_predates_3_2 = sys.version_info[:2] < (3, 2)
    if interpreter_predates_3_2:
        self._tls_match_hostname(ssl_sock)
    else:
        ssl.match_hostname(ssl_sock.getpeercert(), self._host)

    return ssl_sock
开发者ID:aws,项目名称:aws-iot-device-sdk-python,代码行数:31,代码来源:providers.py
示例13: connect
def connect(self):
    """Coroutine: open the connection and optionally verify the host name.

    The connection is created through ``self._create_connection`` with the
    SSL context and stored in ``self.notSock``. When tunnelling, the
    tunnel is established and ``auto_open`` is cleared. If the context
    does not check host names itself but ``self._check_hostname`` is set,
    the peer certificate is matched against the target host name; on any
    failure the connection is closed and the error re-raised.
    """
    server_hostname = self._tunnel_host if self._tunnel_host else self.host

    # will be useful eventually
    if ssl.HAS_SNI:
        sni_hostname = server_hostname
    else:
        sni_hostname = None

    self.notSock = yield from self._create_connection((self.host, self.port), self.timeout,
                                                      self.source_address, ssl=self._context,
                                                      server_hostname=server_hostname)

    if self._tunnel_host:
        yield from self._tunnel()
        self.auto_open = 0

    if self._context.check_hostname or not self._check_hostname:
        # Either the context already verifies the name or no check is wanted.
        return

    try:
        ssl.match_hostname(self.notSock.peercert(), server_hostname)
    except Exception:
        self.close()
        raise
开发者ID:rdbhost,项目名称:yieldfromUrllib3,代码行数:27,代码来源:connection.py
示例14: http_request
def http_request(self, req):
    """Validate the server certificate for ``self.hostname`` before a request.

    A throw-away TCP connection is opened to ``self.hostname:self.port``
    (through an HTTP ``CONNECT`` proxy when the ``https_proxy`` environment
    variable is set), wrapped in TLS and verified, then closed. With an
    ``SSLContext`` the context performs verification; otherwise
    ``ssl.wrap_socket`` is used and the host name is matched explicitly.

    Args:
        req: the request being validated; returned unchanged when
            ``self.detect_no_proxy`` reports a falsy value for its URL.

    Raises:
        ProxyError: if the proxy scheme is not ``http``.
        ConnectionError: if the connection was refused.
        SSLValidationError: for any other SSL or socket failure.
    """
    tmp_ca_cert_path, paths_checked = self.get_ca_certs()
    https_proxy = os.environ.get('https_proxy')
    context = None
    if HAS_SSLCONTEXT:
        context = self._make_context(tmp_ca_cert_path)

    # Detect if 'no_proxy' environment variable is set and if our URL is included
    use_proxy = self.detect_no_proxy(req.get_full_url())

    if not use_proxy:
        # ignore proxy settings for this host request
        return req

    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if https_proxy:
            proxy_parts = generic_urlparse(urlparse.urlparse(https_proxy))
            port = proxy_parts.get('port') or 443
            s.connect((proxy_parts.get('hostname'), port))
            if proxy_parts.get('scheme') == 'http':
                s.sendall(self.CONNECT_COMMAND % (self.hostname, self.port))
                if proxy_parts.get('username'):
                    credentials = "%s:%s" % (proxy_parts.get('username', ''), proxy_parts.get('password', ''))
                    # NOTE(review): the 'base64' str codec only exists on Python 2.
                    s.sendall('Proxy-Authorization: Basic %s\r\n' % credentials.encode('base64').strip())
                s.sendall('\r\n')
                connect_result = s.recv(4096)
                self.validate_proxy_response(connect_result)
                if context:
                    ssl_s = context.wrap_socket(s, server_hostname=self.hostname)
                else:
                    ssl_s = ssl.wrap_socket(s, ca_certs=tmp_ca_cert_path, cert_reqs=ssl.CERT_REQUIRED, ssl_version=PROTOCOL)
                    match_hostname(ssl_s.getpeercert(), self.hostname)
            else:
                raise ProxyError('Unsupported proxy scheme: %s. Currently ansible only supports HTTP proxies.' % proxy_parts.get('scheme'))
        else:
            s.connect((self.hostname, self.port))
            if context:
                ssl_s = context.wrap_socket(s, server_hostname=self.hostname)
            else:
                ssl_s = ssl.wrap_socket(s, ca_certs=tmp_ca_cert_path, cert_reqs=ssl.CERT_REQUIRED, ssl_version=PROTOCOL)
                match_hostname(ssl_s.getpeercert(), self.hostname)
        # close the ssl connection
        # ssl_s.unwrap()
        s.close()
    except (ssl.SSLError, socket.error) as e:
        # `as` form is valid on Python 2.6+ and 3; the old comma form is a
        # SyntaxError on Python 3.
        # fail if we tried all of the certs but none worked
        if 'connection refused' in str(e).lower():
            raise ConnectionError('Failed to connect to %s:%s.' % (self.hostname, self.port))
        else:
            raise SSLValidationError('Failed to validate the SSL certificate for %s:%s.'
                                     ' Make sure your managed systems have a valid CA'
                                     ' certificate installed. If the website serving the url'
                                     ' uses SNI you need python >= 2.7.9 on your managed'
                                     ' machine. You can use validate_certs=False if you do'
                                     ' not need to confirm the server\'s identity but this is'
                                     ' unsafe and not recommended.'
                                     ' Paths checked for this platform: %s' % (self.hostname, self.port, ", ".join(paths_checked))
                                     )
开发者ID:RajeevNambiar,项目名称:temp,代码行数:59,代码来源:urls.py
示例15: connect
def connect(self, url, **options):
    """
    Connect to url. url is websocket url scheme. ie. ws://host:port/resource

    The target (or, when "http_proxy_host" is given, the proxy) is resolved
    and a socket is opened to the first address returned. Default and
    per-instance socket options are applied, a proxy tunnel is set up if
    needed, and for a secure url the socket is wrapped with TLS and the
    peer certificate is matched against the host name before the
    websocket handshake runs.

    You can customize using 'options'.
    If you set "header" list object, you can set your own custom header.

    >>> ws = WebSocket()
    >>> ws.connect("ws://echo.websocket.org/",
    ...            header=["User-Agent: MyProgram",
    ...                    "x-custom: header"])

    timeout: socket timeout time. This value is integer.
             if you set None for this value,
             it means "use default_timeout value"

    options: "header" -> custom http header list.
             "cookie" -> cookie value.
             "http_proxy_host" - http proxy host name.
             "http_proxy_port" - http proxy port. If not set, set to 80.
    """
    hostname, port, resource, is_secure = _parse_url(url)
    proxy_host = options.get("http_proxy_host", None)
    proxy_port = options.get("http_proxy_port", 0)

    if proxy_host:
        proxy_port = proxy_port or 80
        lookup_host, lookup_port = proxy_host, proxy_port
    else:
        lookup_host, lookup_port = hostname, port
    addrinfo_list = socket.getaddrinfo(lookup_host, lookup_port, 0, 0, socket.SOL_TCP)

    if not addrinfo_list:
        raise WebSocketException("Host not found.: " + hostname + ":" + str(port))

    first_candidate = addrinfo_list[0]
    self.sock = socket.socket(first_candidate[0])
    self.sock.settimeout(self.timeout)
    for default_option in DEFAULT_SOCKET_OPTION:
        self.sock.setsockopt(*default_option)
    for custom_option in self.sockopt:
        self.sock.setsockopt(*custom_option)

    # TODO: we need to support proxy
    self.sock.connect(first_candidate[4])

    if proxy_host:
        self._tunnel(hostname, port)

    if is_secure:
        if not HAVE_SSL:
            raise WebSocketException("SSL not available.")
        bundled_ca = os.path.join(os.path.dirname(__file__), "cacert.pem")
        sslopt = dict(cert_reqs=ssl.CERT_REQUIRED, ca_certs=bundled_ca)
        sslopt.update(self.sslopt)
        self.sock = ssl.wrap_socket(self.sock, **sslopt)
        match_hostname(self.sock.getpeercert(), hostname)

    self._handshake(hostname, port, resource, **options)
开发者ID:elbowz,项目名称:xbmc-pushbullet,代码行数:59,代码来源:__init__.py
示例16: connect
def connect(self, address, timeout=5):
    """Open a verified TLS connection to ``address`` and store it.

    Args:
        address: ``(host, port)`` pair; the host part is also the name the
            peer certificate is matched against.
        timeout: socket timeout applied before connecting.

    On success the socket is stored in ``self.socket`` and both
    ``time_sent`` and ``time_received`` are set to the current time. On
    any failure the socket is closed and the error is re-raised.
    """
    sock = self.ssl_context().wrap_socket(
        socket.socket(socket.AF_INET, socket.SOCK_STREAM))
    try:
        sock.settimeout(timeout)
        sock.connect(address)
        ssl.match_hostname(sock.getpeercert(), address[0])
    except Exception:
        # The socket is not yet referenced by self, so nobody else could
        # close it if we let the error escape here.
        sock.close()
        raise
    self.socket = sock
    self.time_sent = time.time()
    self.time_received = self.time_sent
开发者ID:whyz,项目名称:tellive-py,代码行数:9,代码来源:tellstick.py
示例17: match_hostname
def match_hostname(self):
    """Match the peer certificate of ``self.c.sock`` against ``self.c.host``.

    The check is skipped when the ssl module has no ``match_hostname`` or
    when the certificate is empty. A certificate mismatch is handed to
    ``self.match_hostname_aws`` together with the certificate.
    """
    cert = self.c.sock.getpeercert()
    try:
        ssl.match_hostname(cert, self.c.host)
    except AttributeError:  # old ssl module doesn't have this function
        return
    except ssl.CertificateError as e:
        # This handler must come before ValueError: CertificateError is a
        # ValueError subclass, so with the handlers the other way round a
        # mismatch was silently accepted and this branch never ran.
        self.match_hostname_aws(cert, e)
    except ValueError:  # empty SSL cert means underlying SSL library didn't validate it, we don't either.
        return
开发者ID:carriercomm,项目名称:server-7,代码行数:10,代码来源:ConnMan.py
示例18: connect
def connect(self):
    """Connect to the host and port specified in __init__.

    The TCP connection is tunnelled when ``_tunnel_host`` is set, wrapped
    with ``cert_reqs=ssl.CERT_REQUIRED`` against the module-level
    ``ca_certs`` bundle, and the peer certificate is matched against
    ``self.host``.

    Raises:
        Exception: if the CA bundle path does not exist, or whatever
            ``ssl.match_hostname`` raises on a mismatch. In both cases the
            socket is closed first.
    """
    sock = socket.create_connection((self.host, self.port), self.timeout, self.source_address)
    if self._tunnel_host:
        self.sock = sock
        self._tunnel()
    if not os.path.exists(ca_certs):
        # Do not leave the freshly opened TCP connection dangling.
        sock.close()
        raise Exception("CA Certifcate bundle %s is not readable" % ca_certs)
    self.sock = ssl.wrap_socket(sock, ca_certs=ca_certs, cert_reqs=ssl.CERT_REQUIRED)
    try:
        ssl.match_hostname(self.sock.getpeercert(), self.host)
    except Exception:
        # A peer that failed the hostname check must not stay connected.
        self.sock.close()
        raise
开发者ID:rameshdharan,项目名称:willie,代码行数:10,代码来源:web.py
示例19: set_ssl
def set_ssl(self, ssl_mode, ssl_ca, ssl_crl, ssl_cert, ssl_key):
    """Upgrade ``self._socket`` to TLS using the given SSL parameters.

    A context is created with the default certificates loaded, then the
    CA, CRL and client certificate are applied when provided. After the
    socket is wrapped, ``SSLMode.VERIFY_IDENTITY`` additionally matches
    the peer certificate against the name returned by a reverse lookup of
    ``self._host``. The connection is closed before any error is raised
    from this method's own handlers.

    Args:
        ssl_mode (str): SSL mode.
        ssl_ca (str): The certification authority certificate.
        ssl_crl (str): The certification revocation lists.
        ssl_cert (str): The certificate.
        ssl_key (str): The certificate key.

    Raises:
        :class:`mysqlx.RuntimeError`: If Python installation has no SSL
                                      support.
        :class:`mysqlx.InterfaceError`: If the parameters are invalid.
    """
    if not SSL_AVAILABLE:
        self.close()
        raise RuntimeError("Python installation has no SSL support")

    tls = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    tls.load_default_certs()

    if ssl_ca:
        try:
            tls.load_verify_locations(ssl_ca)
            tls.verify_mode = ssl.CERT_REQUIRED
        except (IOError, ssl.SSLError) as ca_error:
            self.close()
            raise InterfaceError("Invalid CA Certificate: {}".format(ca_error))

    if ssl_crl:
        try:
            tls.load_verify_locations(ssl_crl)
            tls.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
        except (IOError, ssl.SSLError) as crl_error:
            self.close()
            raise InterfaceError("Invalid CRL: {}".format(crl_error))

    if ssl_cert:
        try:
            tls.load_cert_chain(ssl_cert, ssl_key)
        except (IOError, ssl.SSLError) as cert_error:
            self.close()
            raise InterfaceError("Invalid Certificate/Key: {}".format(cert_error))

    self._socket = tls.wrap_socket(self._socket)

    if ssl_mode == SSLMode.VERIFY_IDENTITY:
        try:
            resolved = socket.gethostbyaddr(self._host)
            ssl.match_hostname(self._socket.getpeercert(), resolved[0])
        except ssl.CertificateError as mismatch:
            self.close()
            raise InterfaceError("Unable to verify server identity: {}".format(mismatch))

    self._is_ssl = True
开发者ID:vtermanis,项目名称:mysql-connector-python,代码行数:55,代码来源:connection.py
示例20: _on_handshake
def _on_handshake(self):
    """Drive the TLS handshake and finish transport setup once it succeeds.

    If the handshake needs more I/O, this method re-registers itself as
    reader or writer callback and returns. On failure the socket is
    unregistered and closed and the waiter (if any) receives the
    exception; non-``Exception`` errors are re-raised as well. On success
    the host name is verified when the context cannot do so itself, the
    extra info is recorded, reading starts and ``connection_made`` is
    scheduled.
    """
    try:
        self._sock.do_handshake()
    except ssl.SSLWantReadError:
        self._loop.add_reader(self._sock_fd, self._on_handshake)
        return
    except ssl.SSLWantWriteError:
        self._loop.add_writer(self._sock_fd, self._on_handshake)
        return
    except BaseException as failure:
        self._loop.remove_reader(self._sock_fd)
        self._loop.remove_writer(self._sock_fd)
        self._sock.close()
        if self._waiter is not None:
            self._waiter.set_exception(failure)
        if isinstance(failure, Exception):
            # Ordinary errors are reported through the waiter only.
            return
        raise

    self._loop.remove_reader(self._sock_fd)
    self._loop.remove_writer(self._sock_fd)

    peercert = self._sock.getpeercert()
    # Verify hostname if requested, Python 3.4+ uses check_hostname
    # and checks the hostname in do_handshake()
    needs_manual_check = (
        not hasattr(self._sslcontext, 'check_hostname')
        and self._server_hostname
        and self._sslcontext.verify_mode != ssl.CERT_NONE
    )
    if needs_manual_check:
        try:
            ssl.match_hostname(peercert, self._server_hostname)
        except Exception as mismatch:
            self._sock.close()
            if self._waiter is not None:
                self._waiter.set_exception(mismatch)
            return

    # Add extra info that becomes available after handshake.
    self._extra.update(peercert=peercert,
                       cipher=self._sock.cipher(),
                       compression=self._sock.compression(),
                       )

    self._read_wants_write = False
    self._write_wants_read = False
    self._loop.add_reader(self._sock_fd, self._read_ready)
    self._loop.call_soon(self._protocol.connection_made, self)
    if self._waiter is not None:
        # wait until protocol.connection_made() has been called
        self._loop.call_soon(self._waiter._set_result_unless_cancelled,
                             None)
开发者ID:google,项目名称:cpython-pt,代码行数:55,代码来源:selector_events.py
注:本文中的ssl.match_hostname函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论