本文整理汇总了Python中ssl.wrap_socket函数的典型用法代码示例。如果您正苦于以下问题:Python wrap_socket函数的具体用法?Python wrap_socket怎么用?Python wrap_socket使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了wrap_socket函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _get_ssl_socket
def _get_ssl_socket(self, stream, **kwargs):
# This makes it simpler for SMTP_SSL to use the SMTP connect code
# and just alter the socket connection bit.
callback = kwargs.pop('callback')
if hasattr(self, '__get_ssl_socket'):
callback(self.__get_ssl_socket)
return
if PY3:
# due to ssl wrap_socket will detach argument sock in Python 3 and
# can't be used no longer.Also the behaviour of socket.socket.close
# is different between 2 and 3.
sock = stream.socket
iden_sock = socket.fromfd(sock.fileno(), sock.family, sock.type)
ssl_sock = ssl.wrap_socket(
iden_sock, do_handshake_on_connect=False, **kwargs)
stream.close()
else:
ssl_sock = ssl.wrap_socket(
stream.socket, do_handshake_on_connect=False, **kwargs)
stream.close()
stream = iostream.SSLIOStream(ssl_sock)
self.__get_ssl_socket = stream
callback(stream)
开发者ID:equeny,项目名称:tornadomail,代码行数:25,代码来源:smtplib.py
示例2: _openConnection
def _openConnection(self):
"""
Open the (SSL) socket, store it in self._socket and return True.
# TODO: support server certificate validation, provide client cert
"""
# First attempt: explicit TLSv1 with the AES256-SHA cipher, using the
# configured key file and certificate file.
# NOTE(review): the log text says the default attempt already failed, yet
# this block comes first in the snippet -- confirm the intended order.
try:
self._log("python default ssl connection failed, trying TLSv1", 2)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((self._hostname, self._port))
self._socket = ssl.wrap_socket(s, self._keyfile, self._certfile, ssl_version=ssl.PROTOCOL_TLSv1, ciphers="AES256-SHA")
return True
except ssl.SSLError:
# Best-effort close of the raw socket; any error while closing is ignored.
try:
s.close()
except:
pass
# NOTE(review): the nesting level of this raise cannot be recovered from the
# flattened snippet; it decides whether the second attempt below is reachable.
raise Exception("Error setting up the SSL/TLS socket to murmur.")
# Second attempt: ssl.wrap_socket with its default arguments.
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((self._hostname, self._port))
self._log("trying python default ssl socket", 3)
self._socket = ssl.wrap_socket(s)
return True
except ssl.SSLError:
# Best-effort close of the raw socket; any error while closing is ignored.
try:
s.close()
except:
pass
开发者ID:JmactheAttack,项目名称:sftmumblebot,代码行数:28,代码来源:MumbleConnection.py
示例3: get_socket
def get_socket(self):
"""Return a CA-verified SSL socket, or fetch the server certificate.

NOTE(review): the snippet is truncated -- the function continues past the
last visible line, and ``ca_path`` is not defined in the visible part.
"""
if self.use_ssl:
# Per-host location of the stored certificate.
cert_path = os.path.join(self.config_path, 'certs', self.host)
if not os.path.exists(cert_path):
is_new = True
s = self.get_simple_socket()
if s is None:
return
# try with CA first
try:
s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_SSLv23, cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path, do_handshake_on_connect=True)
except ssl.SSLError, e:
s = None
# A socket is returned here only if the handshake succeeded and the host name check passes.
if s and self.check_host_name(s.getpeercert(), self.host):
self.print_error("SSL certificate signed by CA")
return s
# get server certificate.
# Do not use ssl.get_server_certificate because it does not work with proxy
s = self.get_simple_socket()
if s is None:
return
try:
s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_SSLv23, cert_reqs=ssl.CERT_NONE, ca_certs=None)
except ssl.SSLError, e:
self.print_error("SSL error retrieving SSL certificate:", e)
return
# Fetch the peer certificate in binary (DER) form, then convert it to PEM.
dercert = s.getpeercert(True)
s.close()
cert = ssl.DER_cert_to_PEM_cert(dercert)
# workaround android bug
# Inserts a newline before the END marker when the preceding character is not one.
cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
# The PEM text is written to a '.temp' sibling of cert_path.
temporary_path = cert_path + '.temp'
with open(temporary_path,"w") as f:
f.write(cert)
开发者ID:Matoking,项目名称:electrum,代码行数:35,代码来源:interface.py
示例4: handshake
def handshake(host, port, ticket, cfg_file, thumbprint):
    """Run the two-stage SSL handshake and return the inner SSL socket.

    The connection is wrapped in SSL once after the welcome banner and a
    second time after the connect command. Each peer certificate's SHA-1
    hex digest is compared against an expected thumbprint.

    :param host: server host name or address.
    :param port: server TCP port.
    :param ticket: value sent with both the user and the password command.
    :param cfg_file: value sent with the connect command.
    :param thumbprint: expected SHA-1 hex digest of the first certificate.
    :returns: the socket produced by the second ``ssl.wrap_socket`` call.
    :raises Exception: when either certificate digest does not match.
    """
    def _sha1_hex(der_cert):
        # SHA-1 hex digest of a certificate in binary form.
        digest = hashlib.sha1()
        digest.update(der_cert)
        return digest.hexdigest()

    def _send(channel, command, argument):
        # One "<command> <argument>" line terminated by CRLF.
        channel.write("%s %s\r\n" % (command, argument))

    plain = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    plain.connect((host, port))
    expect(plain, VMAD_WELCOME)

    outer = ssl.wrap_socket(plain)
    outer.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    if thumbprint != _sha1_hex(outer.getpeercert(binary_form=True)):
        raise Exception("Server thumbprint doesn't match")

    _send(outer, VMAD_USER_CMD, ticket)
    expect(outer, VMAD_NEEDPASSWD)
    _send(outer, VMAD_PASS_CMD, ticket)
    expect(outer, VMAD_LOGINOK)

    # Random token: sent with the thumb command now and again, on its own,
    # over the inner socket at the end.
    nonce = base64.b64encode(os.urandom(12))
    _send(outer, VMAD_THUMB_CMD, nonce)
    # The reply carries the thumbprint expected for the second certificate.
    second_thumbprint = expect(outer, VMAD_OK).replace(':', '').lower()

    outer.write("%s %s mks\r\n" % (VMAD_CONNECT_CMD, cfg_file))
    expect(outer, VMAD_OK)

    inner = ssl.wrap_socket(outer)
    if second_thumbprint != _sha1_hex(inner.getpeercert(binary_form=True)):
        raise Exception("Second thumbprint doesn't match")
    inner.write(nonce)
    return inner
开发者ID:rgerganov,项目名称:mks,代码行数:30,代码来源:authd.py
示例5: connect
def connect(self):
    """Open the TCP connection and wrap it in SSL, storing it in self.sock.

    The interpreter version decides which optional arguments are used:
    ``source_address`` and ``ciphers`` from 2.7.0 on, tunnelling from 2.6.3
    on. When ``self.ssl_version`` is None it is set to PROTOCOL_TLSv1.
    """
    import sys

    running = sys.version_info
    has_py27 = running >= (2, 7, 0)

    endpoint = (self.host, self.port)
    if has_py27:
        raw = socket.create_connection(endpoint, self.timeout,
                                       self.source_address)
    else:
        raw = socket.create_connection(endpoint, self.timeout)

    # Note these next fixes require python at least from Oct 2009 so 2.6.3
    if running >= (2, 6, 3) and self._tunnel_host:
        self.sock = raw
        self._tunnel()

    # Force use of TLSv1 with PROTOCOL_TLSv1.
    # The default is PROTOCOL_SSLv23, which allows either 2 or 3; another
    # option is PROTOCOL_SSLv3. TLS1 avoids the POODLE vulnerability. In
    # addition, some client/server combinations fail the handshake if you
    # start with SSL23 and the server wants TLS1. See geni-tools issue #745
    if self.ssl_version is None:
        self.ssl_version = ssl.PROTOCOL_TLSv1

    tls_options = {'ssl_version': self.ssl_version}
    if has_py27:
        # Python 2.6 doesn't let you specify the ciphers to use
        tls_options['ciphers'] = self.ciphers
    self.sock = ssl.wrap_socket(raw, self.key_file, self.cert_file,
                                **tls_options)
开发者ID:ArthurGarnier,项目名称:geni-tools,代码行数:32,代码来源:client.py
示例6: socket_with_cert
def socket_with_cert(cert_path, key_path, cacert_path, server_cert = True):
    """Return an SSL socket whose peer presents the given certificate.

    A listener is bound to an ephemeral port on 127.0.0.1. A daemon thread
    accepts one connection and wraps it with the given key/certificate,
    while this function connects to the listener and wraps its own end.

    :param cert_path: certificate file name, relative to the data directory.
    :param key_path: key file name, relative to the data directory.
    :param cacert_path: CA certificate file name, relative to the data
        directory.
    :param server_cert: when true, the thread wraps its end with
        ``server_side=True`` and the returned socket is the client side;
        otherwise the returned socket is the server side and uses
        ``server.pem`` / ``server-key.pem``.
    """
    data_dir = _support.DATA_DIR
    cert_path, key_path, cacert_path = [
        os.path.join(data_dir, name)
        for name in (cert_path, key_path, cacert_path)
    ]

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))
    listener.listen(1)
    address = listener.getsockname()

    def serve_single_peer():
        # Accept one peer, run the SSL wrap on it, then close everything.
        try:
            peer = listener.accept()[0]
            peer.setblocking(True) # pylint: disable=E1101
            try:
                ssl.wrap_socket(peer, key_path, cert_path,
                                server_side = server_cert,
                                ca_certs = cacert_path)
            finally:
                peer.close()
        finally:
            listener.close()

    worker = threading.Thread(
        target = serve_single_peer,
        name = "pyxmpp2.test.cert certificate provider thread")
    worker.daemon = True
    worker.start()

    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(address)

    if server_cert:
        return ssl.wrap_socket(client, cert_reqs = ssl.CERT_REQUIRED,
                               server_side = False, ca_certs = cacert_path)

    own_cert = os.path.join(_support.DATA_DIR, "server.pem")
    own_key = os.path.join(_support.DATA_DIR, "server-key.pem")
    return ssl.wrap_socket(client, own_key, own_cert,
                           cert_reqs = ssl.CERT_REQUIRED, server_side = True,
                           ca_certs = cacert_path)
开发者ID:kulikov,项目名称:python-tools,代码行数:34,代码来源:cert.py
示例7: start
def start(self):
    """Connect to the configured server, register, and enter the main loop.

    The socket is either a plain TCP socket (IPv4 or IPv6, chosen by
    ``self.ipv6``) or a SOCKS4 proxy socket when ``proxyhost`` is
    configured. It is wrapped with ``wrap_socket`` when the ``ssl`` option
    is True.
    """
    self.server = self.config['server']
    self.port = int(self.config['port'])

    via_proxy = self.config['proxyhost'] is not None
    use_ssl = self.config['ssl'] is True

    if via_proxy:
        conn = socks.socksocket()
        conn.setproxy(socks.PROXY_TYPE_SOCKS4, self.config['proxyhost'],
                      self.config['proxyport'], True)
    else:
        family = socket.AF_INET6 if self.ipv6 else socket.AF_INET
        conn = socket.socket(family, socket.SOCK_STREAM)

    self.sock = wrap_socket(conn) if use_ssl else conn
    self.sock.connect((self.server, self.port))

    # send pass
    password = self.config["pass"]
    if password is not None:
        self.send("PASS " + password, True)
    self.send("USER " + self.user + " 127.0.0.1 " + self.server + " :" + self.real, True)
    self.set_nick(self.nick)
    self.main()
开发者ID:ldionmarcil,项目名称:ircsnapshot,代码行数:31,代码来源:ircsnapshot.py
示例8: SSL_Connect
def SSL_Connect(ctx, sock,
                server_side=False, accepted=False, connected=False,
                verify_names=None):
    """Wrap ``sock`` with the native ``ssl`` module, using ``ctx`` settings.

    :param ctx: object providing ``privatekey_file``, ``certchain_file``,
        ``ca_certs``, ``method`` and ``ciphers``.
    :param sock: socket to wrap.
    :param server_side: forwarded to ``ssl.wrap_socket``.
    :param accepted: not used by this function.
    :param connected: not used by this function.
    :param verify_names: when truthy, the certificate is required, the
        handshake runs immediately and the peer name is checked.
    :returns: the wrapped socket.
    :raises SSL.Error: when the peer name check fails.
    """
    if DEBUG:
        DEBUG('*** TLS is provided by native Python ssl')

    reqs = ssl.CERT_REQUIRED if verify_names else ssl.CERT_NONE

    def _wrap(**extra):
        # Shared wrap call; ``extra`` carries the optional ciphers argument.
        return ssl.wrap_socket(sock,
                               keyfile=ctx.privatekey_file,
                               certfile=ctx.certchain_file,
                               cert_reqs=reqs,
                               ca_certs=ctx.ca_certs,
                               do_handshake_on_connect=False,
                               ssl_version=ctx.method,
                               server_side=server_side,
                               **extra)

    try:
        wrapped = _wrap(ciphers=ctx.ciphers)
    except:
        # Deliberately broad: any failure of the first attempt is retried
        # once without the ciphers argument.
        wrapped = _wrap()

    if verify_names:
        wrapped.do_handshake()
        if not SSL_CheckPeerName(wrapped, verify_names):
            raise SSL.Error(('Cert not in %s (%s)'
                             ) % (verify_names, reqs))
    return wrapped
开发者ID:SunilMohanAdapa,项目名称:PySocksipyChain,代码行数:29,代码来源:__init__.py
示例9: connect
def connect(self):
    """Connect to a host on a given (SSL) port.

    When ``self.ca_file`` is set, it is used to check the server
    certificate; otherwise no certificate check is forced.
    Redefined/copied and extended from httplib.py:1105 (Python 2.6.x).
    """
    host = self.host
    port = self.port
    raw = socket.create_connection((self.host, self.port), self.timeout)
    LOG.debug(_("Connecting to Https server host %(host)s port %(port)s "
                % {'host': host, 'port': port}))

    if self._tunnel_host:
        self.sock = raw
        self._tunnel()

    # If there's no CA File, don't force Server Certificate Check
    if not self.ca_file:
        self.sock = ssl.wrap_socket(raw, self.key_file, self.cert_file,
                                    cert_reqs=ssl.CERT_NONE)
        return

    self.sock = ssl.wrap_socket(raw, self.key_file, self.cert_file,
                                ssl_version=ssl.PROTOCOL_SSLv23,
                                ca_certs=self.ca_file,
                                cert_reqs=ssl.CERT_REQUIRED)
    LOG.debug(_("Https server certified "))
    # The peer certificate is fetched but not used further.
    self.sock.getpeercert()
开发者ID:aforalee,项目名称:opencit,代码行数:26,代码来源:OpenstackClient.py
示例10: test_for_leaks
def test_for_leaks():
    """Exercise the TLS server with many connections and check for leaks.

    200 connections are opened and kept open. Then, ``repeats`` times, a
    batch of ``exercises`` connections is opened and closed while the leak
    detector samples the server process. Finally the server's counters and
    the detector's verdict are checked and the server is terminated.
    """
    # Let's test for leaks first
    server = compile_and_invoke( "t2_tls_leaks.hs" )
    detector = LeakDetector(server.pid)
    target = ("127.0.0.1", 8090)

    time.sleep(1.0)
    # These stay referenced (and open) for the rest of the test.
    held_open = [wrap_socket(socket.create_connection(target))
                 for _ in range(200)]

    repeats = 12
    exercises = 50
    for _ in range(repeats):
        detector.sample()
        batch = [wrap_socket(socket.create_connection(target))
                 for _ in range(exercises)]
        for conn in batch:
            conn.close()

    expected = repeats * exercises
    stats = rconnect()
    ensure(stats.counter("socket-close-called")>1, "not even a socket-close called")
    ensure(stats.counter("socket-close-called")>= expected, "not enough closed sockets")
    ensure(stats.counter("socket-shutdown-executed")>= expected, "not enough sockets shutdown")
    ensure(stats.counter("socket-close-executed")>= expected, "not enough sockets closed")
    ensure(stats.counter("stable-pointer-freed")>1, "we are leaking stable pointers")
    ensure(not detector.leaks(sample_size=repeats//2), "It's leaking memory!")
    os.kill(-server.pid, signal.SIGTERM)
开发者ID:shimmercat,项目名称:second-transfer,代码行数:32,代码来源:t2_tls_memory_usage.py
示例11: _http_connect
def _http_connect(hostname, port, use_ssl):
    """Open a socket to ``hostname:port`` through the proxy in ``conf.proxy``.

    :param hostname: target host, used in the CONNECT request.
    :param port: target port, used in the CONNECT request.
    :param use_ssl: when true, a CONNECT tunnel is set up and the socket is
        wrapped in SSL; otherwise the proxy socket is returned as is.
    :returns: the (possibly SSL-wrapped) socket.
    :raises ProxyError: when the proxy is unreachable, SSL to the proxy
        fails, or the CONNECT status is not "200".
    :raises BadStatusLine: when the proxy's banner cannot be parsed.
    """
    proxy = urlparse.urlparse(conf.proxy)
    proxy_addr = (proxy.hostname, proxy.port)
    # A scheme ending in 's' means the proxy itself is reached over SSL.
    proxy_is_ssl = proxy.scheme[-1] == 's'

    try:
        sock = socket.create_connection(proxy_addr)
    except socket.error:
        raise ProxyError("Unable to connect to the proxy")

    if proxy_is_ssl:
        try:
            # No check is made to verify proxy certificate
            sock = ssl.wrap_socket(sock, ssl_version=conf._ssl_version)
        except socket.error:
            raise ProxyError("Unable to use SSL with the proxy")

    if not use_ssl:
        return sock

    tunnel = sock.makefile("rwb", 0)
    tunnel.write("CONNECT {}:{} HTTP/1.1\r\n\r\n".format(hostname, port))
    try:
        version, status, message = read_banner(tunnel)
    except ValueError:
        raise BadStatusLine()
    if status != "200":
        raise ProxyError("Bad status " + status + " " + message)
    # The response headers are read and discarded.
    read_headers(tunnel)
    return ssl.wrap_socket(sock, ssl_version=conf._ssl_version)
开发者ID:molotof,项目名称:abrupt,代码行数:27,代码来源:http.py
示例12: do_connect
def do_connect(host, port, use_ssl, timeout=None):
    """Create a socket for ``host``, optionally SSL-wrapped, and connect it.

    :param host: target address; ``is_v6(host)`` selects IPv6 or IPv4.
    :param port: target port.
    :param use_ssl: when true, the socket is wrapped before connecting.
    :param timeout: value passed to ``settimeout`` before connecting.
    :returns: the connected socket.
    """
    family = socket.AF_INET6 if is_v6(host) else socket.AF_INET
    sock = socket.socket(family, socket.SOCK_STREAM)
    if use_ssl:
        # defaults to PROTOCOL_v23 in client mode. ciphers will be negotiated
        # based on handshake from server
        sock = ssl.wrap_socket(sock)

    # don't block
    sock.setblocking(False)
    # NOTE(review): settimeout() is applied after setblocking(False), so the
    # timeout value decides the final blocking mode -- confirm intent.
    sock.settimeout(timeout)

    # light this candle
    sock.connect((host, port))
    return sock
开发者ID:extendedstackpointer,项目名称:ratbox-monitor,代码行数:32,代码来源:connect.py
示例13: wrap_socket
def wrap_socket(sock, verify_cert, ca_certs):
    """Wrap ``sock`` in SSL, optionally verifying the server certificate.

    :param sock: socket to wrap.
    :param verify_cert: when false, the socket is wrapped without any
        certificate verification.
    :param ca_certs: path to a CA bundle. When empty or None, the first
        existing well-known system bundle is used; if none exists, the
        bundled ``WRAP_CERT`` is written to a temporary file and used.
    :returns: the SSL-wrapped socket.
    """
    if not verify_cert:
        return ssl.wrap_socket(sock)

    if not ca_certs:
        system_bundles = (
            '/etc/ssl/certs/ca-certificates.crt',  # ubuntu/debian
            '/etc/pki/tls/certs/ca-bundle.crt',  # redhat/centos
            '/etc/ssl/cert.pem',  # openbsd
        )
        # Stop at the first bundle that exists.
        ca_certs = next(
            (path for path in system_bundles if os.path.isfile(path)), None)

    # Truthiness rather than "is not None": an empty string with no system
    # bundle available must fall through to the bundled certificate.
    if ca_certs:
        return ssl.wrap_socket(sock,
                               cert_reqs=ssl.CERT_REQUIRED,
                               ca_certs=ca_certs)

    # No usable bundle: write the bundled certificate to a throw-away
    # directory, which is removed once the socket has been wrapped.
    tempdir = tempfile.mkdtemp()
    try:
        filename = os.path.join(tempdir, "cert")
        with open(filename, "w+b") as temp:
            temp.write(WRAP_CERT)
        return ssl.wrap_socket(sock,
                               cert_reqs=ssl.CERT_REQUIRED,
                               ca_certs=filename)
    finally:
        shutil.rmtree(tempdir)
开发者ID:BinaryFu,项目名称:opencollab,代码行数:33,代码来源:_sslwrapper.py
示例14: urfa_login
def urfa_login(self):
    """
    Handshake and login.

    Connects, reads the session id from the first packet, answers with an
    MD5 digest of session id + password, and -- when the server accepts --
    switches the connection to SSLv3 (certificate-based for admin logins,
    'ADH-RC4-MD5' cipher otherwise).
    """
    self.sck.connect((self.addr, self.port))
    self.pck.recv(self.sck)
    session_id = self.pck.attr[U_CODE_ATTR_SID]['data']

    hasher = hashlib.new('md5')
    hasher.update(session_id)
    hasher.update(self.passwd)
    auth_hash = hasher.digest()

    self.pck.init(code=U_PKT_REQ)
    if self.admin:
        login_type, ssl_type = U_LGN_SYS, U_SSLT_RSACRT
    else:
        login_type, ssl_type = U_LGN_USR, U_SSLT_SSL3

    request_attrs = (
        (U_CODE_ATTR_LGN_T, login_type, U_TP_I),
        (U_CODE_ATTR_LGN, self.login, U_TP_S),
        (U_CODE_ATTR_DGS, session_id, U_TP_S),
        (U_CODE_ATTR_HSH, auth_hash, U_TP_S),
        (U_CODE_ATTR_SSL, ssl_type, U_TP_I),
    )
    for code, value, value_type in request_attrs:
        self.pck.add_attr(code, value, value_type)

    self.pck.send(self.sck)
    self.pck.recv(self.sck)

    if self.pck.code != U_PKT_ACCEPT:
        return

    if self.admin:
        tls_options = {'certfile': self.crt_file}
    else:
        tls_options = {'ciphers': 'ADH-RC4-MD5'}
    self.sck = ssl.wrap_socket(self.sck, ssl_version=ssl.PROTOCOL_SSLv3,
                               **tls_options)
开发者ID:snkhokh,项目名称:pyurfa,代码行数:30,代码来源:urfa_connection.py
示例15: connect
def connect(self):
    """
    Connect to a host on a given port and check the certificate.

    This is almost the same as the connect of HTTPSConnection, but adds
    SSL certificate verification: the certificate is required when
    ``self.ca_file`` is set, the peer certificate is stored in
    ``self.certificate``, and ``self.callBack`` (if set) must approve it.

    :raises ssl.SSLError: when the callback rejects the certificate.
    """
    raw = socket.create_connection((self.host, self.port), self.timeout)
    if self._tunnel_host:
        self.sock = raw
        self._tunnel()

    if self.ca_file:
        verification = {'ca_certs': self.ca_file,
                        'cert_reqs': ssl.CERT_REQUIRED}
    else:
        verification = {'cert_reqs': ssl.CERT_NONE}
    self.sock = ssl.wrap_socket(raw, self.key_file, self.cert_file,
                                **verification)

    # Binary form of the peer certificate.
    self.certificate = self.sock.getpeercert(True)
    if self.callBack and not self.callBack(self.certificate):
        raise ssl.SSLError(1, "Call back verification failed")
开发者ID:blckshrk,项目名称:Weboob,代码行数:28,代码来源:hellhttp.py
示例16: build_sockets
def build_sockets(self):
# Loop that opens connections and puts them on self.connections while
# self.running is true and the configured attack limit is not reached.
# NOTE(review): indentation was lost in this snippet; the nesting of the
# trailing sleep (inside or outside the connection-limit check) needs to be
# confirmed against the original file.
self.debug.put('Socket Builder started.')
# count is assigned here but not used in the visible code.
count = 0
# An attacklimit of 0 never stops the loop by itself.
while (self.options['attacklimit'] == 0 or self.options['attacklimit'] > self.attacks) and self.running:
# Only open a new connection while below the connection limit.
if self.options['connectionlimit'] > self.sockets:
# Proxy socket for SOCKS4 / SOCKS5 / HTTP, plain TCP socket otherwise.
if self.options['socksversion'] == 'SOCKS4' or self.options['socksversion'] == 'SOCKS5' or self.options['socksversion'] == 'HTTP':
if self.options['socksversion'] == 'SOCKS4': proxytype = socks.PROXY_TYPE_SOCKS4
elif self.options['socksversion'] == 'SOCKS5': proxytype = socks.PROXY_TYPE_SOCKS5
else: proxytype = socks.PROXY_TYPE_HTTP
s = socks.socksocket()
if self.options['socksuser'] == '' and self.options['sockspass'] == '':
s.setproxy(proxytype, self.options['sockshost'], self.options['socksport'], self.options['socksuser'], self.options['sockspass'])
else:
s.setproxy(proxytype, self.options['sockshost'], self.options['socksport'])
else:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((self.options['host'], self.options['port']))
if self.options['ssl'] == True:
# The return value of wrap_socket() is not stored; s is what gets queued.
wrap_socket(s)
self.connections.put((s, 0))
self.debug.put('Socket opened, connection created.')
self.attacks += 1
self.sockets += 1
except Exception, ex:
# Failures are reported on the errors queue and the loop continues.
self.errors.put('Could not connect. %s.' % (ex))
# Optional pause between connection attempts.
if self.options['timebetweenconnections'] > 0:
time.sleep(self.options['timebetweenconnections'])
开发者ID:lelou6666,项目名称:opparis,代码行数:30,代码来源:libloris.py
示例17: connect
def connect(self):
    """Open the connection and wrap it in SSL, storing it in ``self.sock``.

    TLSv1 is tried first; on ``ssl.SSLError`` the wrap is retried with
    PROTOCOL_SSLv23. ``AttributeError`` fallbacks keep Python 2.4
    compatibility for ``socket.create_connection`` and ``_tunnel_host``.
    """
    endpoint = (self.host, self.port)
    try:
        raw = socket.create_connection(endpoint, self.timeout)
    except AttributeError:
        # Python 2.4 compatibility (does not deal with IPv6)
        raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        raw.connect(endpoint)

    try:
        if self._tunnel_host:
            self.sock = raw
            self._tunnel()
    except AttributeError:
        # Python 2.4 compatibility (_tunnel_host not defined)
        pass

    def _wrap(protocol):
        # Same key/cert for both attempts; only the protocol differs.
        return ssl.wrap_socket(raw, self.key_file, self.cert_file,
                               ssl_version=protocol)

    try:
        self.sock = _wrap(ssl.PROTOCOL_TLSv1)
    except ssl.SSLError:
        self.sock = _wrap(ssl.PROTOCOL_SSLv23)
开发者ID:crepererum,项目名称:invenio,代码行数:29,代码来源:datacite.py
示例18: connect
def connect(self, host, port):
# Create the dispatcher and its socket, optionally wrap the socket in SSL,
# then connect to (host, port), falling back to spec.PORT when port is falsy.
# NOTE(review): indentation was lost in this snippet; whether the
# version-gated connect below sits inside the SSL branch needs to be
# confirmed against the original file.
self.dispatcher = RabbitDispatcher(self)
self.dispatcher.create_socket(socket.AF_INET, socket.SOCK_STREAM)
# Wrap the socket in SSL if SSL is turned on
if self.parameters.ssl:
# The socket is switched to blocking mode before it is wrapped.
self.dispatcher.socket.setblocking(1)
# ssl_options, when set, is expanded as keyword arguments to ssl.wrap_socket.
if self.parameters.ssl_options:
self.dispatcher.socket = ssl.wrap_socket(self.dispatcher.socket,
**self.parameters.ssl_options)
else:
self.dispatcher.socket = ssl.wrap_socket(self.dispatcher.socket)
# Fix 2.7.1+ SSL socket bug
# If Python version is 2.7.1, we should connect it first,
# then everything works OK
# if we use Python 2.7.2, and we connect it first, then we will get a
# double connect exception.
# so we only connect the socket when Python version is 2.7.1
# NOTE(review): startswith("2.7.1") also matches 2.7.10 and later -- confirm.
if platform.python_version().startswith("2.7.1"):
self.dispatcher.socket.connect((host, port or spec.PORT))
# Set the socket to non-blocking
if not self.parameters.ssl:
self.dispatcher.socket.setblocking(0)
self.dispatcher.connect((host, port or spec.PORT))
开发者ID:linglan520,项目名称:py-servicebus,代码行数:25,代码来源:asyncore_adapter.py
示例19: start
def start(self):
    """
    Creates a SSL connection to the iDigi Server and sends a
    ConnectionRequest message.

    The socket is wrapped with certificate verification when
    ``self.ca_certs`` is set, and without it otherwise. After connecting it
    is switched to non-blocking mode.

    :raises Exception: when a socket is already established, or when
        creating, wrapping or connecting the socket fails (the original
        error is re-raised after cleanup).
    """
    self.log.info("Starting SSL Session for Monitor %s."
                  % self.monitor_id)
    if self.socket is not None:
        raise Exception("Socket already established for %s." % self)

    try:
        # Create socket, wrap in SSL and connect.
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Validate that certificate server uses matches what we expect.
        if self.ca_certs is not None:
            self.socket = ssl.wrap_socket(self.socket,
                                          cert_reqs=ssl.CERT_REQUIRED,
                                          ca_certs=self.ca_certs)
        else:
            self.socket = ssl.wrap_socket(self.socket)
        self.socket.connect((self.client.hostname, PUSH_SECURE_PORT))
        self.socket.setblocking(0)
    except Exception:
        # self.socket is still None when socket creation itself failed;
        # closing unconditionally would replace the real error with an
        # AttributeError.
        if self.socket is not None:
            self.socket.close()
            self.socket = None
        # Bare raise keeps the original exception and its traceback.
        raise

    self.send_connection_request()
开发者ID:digidotcom,项目名称:python-devicecloud,代码行数:29,代码来源:monitor_tcp.py
示例20: connect
def connect(self):
    """Open the HTTPS connection, verifying the server when a CA file is set.

    Without the ssl library, the stock ``HTTPSConnection.connect`` is used
    under a temporarily changed global default timeout. Otherwise the
    socket is created here, tunnelled if needed, and wrapped by ``ssl``
    with certificate verification only when a CA file is configured.
    """
    # If the ssl library is not available, then we just have to fall back on
    # the old HTTPSConnection.connect method. There are too many dependencies
    # to implement it directly here.
    if not self.__has_ssl:
        # Unfortunately, the only way to set the timeout is to temporarily
        # set the global default timeout to what it should be for this
        # connection, and then reset it once the connection is established.
        # Messy, but not much we can do.
        previous_default = None
        try:
            previous_default = socket.getdefaulttimeout()
            socket.setdefaulttimeout(self.__timeout)
            httplib.HTTPSConnection.connect(self)
            return
        finally:
            socket.setdefaulttimeout(previous_default)

    # Create the underlying socket. Prefer Python's newer
    # socket.create_connection method if it is available.
    if hasattr(socket, 'create_connection'):
        self.sock = socket.create_connection((self.host, self.port),
                                             self.__timeout)
    else:
        self.sock = create_connection_helper(self.host, self.port,
                                             timeout=self.__timeout)

    if self._tunnel_host:
        self._tunnel()

    # Now ask the ssl library to wrap the socket and verify the server
    # certificate if we have a ca_file.
    if self.__ca_file is None:
        verification = {'cert_reqs': ssl.CERT_NONE}
    else:
        verification = {'ca_certs': self.__ca_file,
                        'cert_reqs': ssl.CERT_REQUIRED}
    self.sock = ssl.wrap_socket(self.sock, **verification)
开发者ID:imron,项目名称:scalyr-agent-2,代码行数:30,代码来源:connection.py
注:本文中的ssl.wrap_socket函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论