本文整理汇总了Python中ssl.SSLContext类的典型用法代码示例。如果您正苦于以下问题:Python SSLContext类的具体用法?Python SSLContext怎么用?Python SSLContext使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SSLContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: ssl_wrap_socket
def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=None,
                    ca_certs=None, server_hostname=None,
                    ssl_version=None, ciphers=None):
    """
    Wrap ``sock`` using a freshly configured ``SSLContext``.

    All arguments except `server_hostname` have the same meaning as for
    :func:`ssl.wrap_socket`

    :param server_hostname:
        Hostname of the expected certificate
    :param ciphers:
        Optional cipher string handed to ``SSLContext.set_ciphers``.
    :returns:
        Whatever ``SSLContext.wrap_socket`` returns for ``sock``.
    :raises SSLError:
        If ``ca_certs`` cannot be loaded.
    """
    if ssl_version is None:
        # No protocol requested: fall back to version negotiation instead of
        # handing ``None`` to the SSLContext constructor.
        from ssl import PROTOCOL_SSLv23
        ssl_version = PROTOCOL_SSLv23
    context = SSLContext(ssl_version)
    if cert_reqs is not None:
        # Assigning ``None`` to verify_mode is an error, so the context's own
        # default is kept when the caller did not choose a mode.
        context.verify_mode = cert_reqs

    # Disable TLS compression to mitigate CRIME attack (issue #309)
    OP_NO_COMPRESSION = 0x20000
    context.options |= OP_NO_COMPRESSION

    if ca_certs:
        try:
            context.load_verify_locations(ca_certs)
        # Py32 raises IOError
        # Py33 raises FileNotFoundError
        except Exception as e:  # Reraise as SSLError
            raise SSLError(e)
    if certfile:
        # FIXME: This block needs a test.
        context.load_cert_chain(certfile, keyfile)
    if ciphers:
        context.set_ciphers(ciphers)
    if HAS_SNI:  # Platform-specific: OpenSSL with enabled SNI
        return context.wrap_socket(sock, server_hostname=server_hostname)
    return context.wrap_socket(sock)
开发者ID:Perkville,项目名称:requests,代码行数:32,代码来源:ssl_.py
示例2: create_urllib3_context
def create_urllib3_context(ssl_version=None, cert_reqs=None,
                           options=None, ciphers=None):
    """Build an ``SSLContext`` with hardened defaults.

    All arguments have the same meaning as ``ssl_wrap_socket``.

    When ``options`` is not supplied, SSLv2, SSLv3 and compression are
    disabled. To re-enable SSLv3 on the returned context::

        from urllib3.util import ssl_
        context = ssl_.create_urllib3_context()
        context.options &= ~ssl_.OP_NO_SSLv3

    The same approach re-enables compression (use ``OP_NO_COMPRESSION``).

    :param ssl_version:
        Protocol constant to use; ``ssl.PROTOCOL_SSLv23`` when falsy.
    :param cert_reqs:
        Verification mode; ``ssl.CERT_REQUIRED`` when ``None``.
    :param options:
        OpenSSL option bits OR-ed into the context. When ``None``,
        ``OP_NO_SSLv2 | OP_NO_SSLv3 | OP_NO_COMPRESSION`` is used.
    :param ciphers:
        Cipher string; ``DEFAULT_CIPHERS`` when falsy.
    :returns:
        Constructed SSLContext object with specified options
    :rtype: SSLContext
    """
    protocol = ssl_version if ssl_version else ssl.PROTOCOL_SSLv23
    context = SSLContext(protocol)
    context.set_ciphers(ciphers if ciphers else DEFAULT_CIPHERS)

    # Defaults are resolved at call time, as we may have no ssl module on
    # import.
    if cert_reqs is None:
        cert_reqs = ssl.CERT_REQUIRED

    if options is None:
        # SSLv2 and SSLv3 are broken protocols; compression enables CRIME
        # attacks for OpenSSL 1.0+ (issue #309).
        options = 0
        for hardening_flag in (OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION):
            options |= hardening_flag

    context.options |= options
    context.verify_mode = cert_reqs

    has_hostname_check = getattr(context, 'check_hostname', None) is not None
    if has_hostname_check:  # Platform-specific: Python 3.2
        # Verification (fingerprints, alternative hostnames) is done by the
        # caller, so the built-in check is switched off.
        context.check_hostname = False
    return context
开发者ID:Ddoudou,项目名称:test,代码行数:60,代码来源:ssl_.py
示例3: factory
def factory(uri, ssl=False, **init_args):
    """Create a server object from a connection URI.

    Only the ``irc`` and ``ircs`` schemes are handled; any other scheme
    yields ``None``.

    :param uri: Connection URI. Host, port, username and password found in it
        override the matching entries of ``init_args``.
    :param ssl: Wrap the server's ``_fd`` in TLS. Forced on for ``ircs``.
    :param init_args: Extra keyword arguments forwarded to the IRC server
        constructor.
    :returns: The IRC server instance, or ``None`` for unsupported schemes.

    Recognised query-string keys: ``msg`` (one ``PRIVMSG`` to the target is
    queued in ``on_connect`` per value), ``key`` (channel key), ``pass``
    (password) and ``charset`` (encoding).
    """
    from urllib.parse import urlparse, unquote, parse_qs
    o = urlparse(uri)

    if o.scheme != "irc" and o.scheme != "ircs":
        return None

    # https://www.w3.org/Addressing/draft-mirashi-url-irc-01.txt
    # https://www-archive.mozilla.org/projects/rt-messaging/chatzilla/irc-urls.html
    args = init_args

    if o.scheme == "ircs":
        ssl = True
    if o.hostname is not None:
        args["host"] = o.hostname
    if o.port is not None:
        args["port"] = o.port
    if o.username is not None:
        args["username"] = o.username
    if o.password is not None:
        args["password"] = o.password

    # The path is "/<target>[,modifier...]"; drop the leading slash.
    modifiers = o.path.split(",")
    target = unquote(modifiers.pop(0)[1:])

    # Read query string. parse_qs maps each key to a *list* of values, so the
    # individual strings must be extracted before use.
    params = parse_qs(o.query)

    if "msg" in params:
        on_connect = args.setdefault("on_connect", [])
        for msg in params["msg"]:
            on_connect.append("PRIVMSG %s :%s" % (target, msg))

    if "key" in params:
        args.setdefault("channels", []).append((target, params["key"][0]))

    if "pass" in params:
        args["password"] = params["pass"][0]

    if "charset" in params:
        args["encoding"] = params["charset"][0]

    if "channels" not in args and "isnick" not in modifiers:
        args["channels"] = [target]

    from nemubot.server.IRC import IRC as IRCServer
    srv = IRCServer(**args)

    if ssl:
        try:
            from ssl import create_default_context
            context = create_default_context()
        except ImportError:
            # Python 3.3 compat
            from ssl import SSLContext, PROTOCOL_TLSv1
            context = SSLContext(PROTOCOL_TLSv1)
        # The module-level ssl.wrap_socket is deliberately not imported: it
        # was never used here and no longer exists in recent Python releases.
        srv._fd = context.wrap_socket(srv._fd, server_hostname=o.hostname)

    return srv
开发者ID:nbr23,项目名称:nemubot,代码行数:59,代码来源:__init__.py
示例4: __init__
def __init__(self, server_address, HandlerClass, dir):
    """Set up the server with its listening socket wrapped in TLS.

    :param server_address: Passed through to the parent constructor.
    :param HandlerClass: Passed through to the parent constructor.
    :param dir: Directory containing ``server-cert.pem`` and
        ``server-key.pem``.
    """
    # Binding is postponed so the socket can be wrapped before it is bound
    # and activated.
    super().__init__(server_address, HandlerClass, bind_and_activate=False)

    tls_context = SSLContext(PROTOCOL_TLSv1)
    certificate_path = join(dir, 'server-cert.pem')
    private_key_path = join(dir, 'server-key.pem')
    tls_context.load_cert_chain(certificate_path, private_key_path)
    # NOTE: no CA bundle is loaded into the context here.

    self.socket = tls_context.wrap_socket(self.socket, server_side=True)
    self.server_bind()
    self.server_activate()
开发者ID:andrewcooke,项目名称:n3,代码行数:8,代码来源:web.py
示例5: ssl_wrap_socket
def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=None,
                    ca_certs=None, server_hostname=None,
                    ssl_version=None):
    """
    Wrap ``sock`` using a freshly configured ``SSLContext``.

    All arguments except `server_hostname` have the same meaning as for
    :func:`ssl.wrap_socket`

    :param server_hostname:
        Hostname of the expected certificate
    :returns:
        Whatever ``SSLContext.wrap_socket`` returns for ``sock``.
    :raises SSLError:
        If ``ca_certs`` cannot be loaded.
    """
    if ssl_version is None:
        # No protocol requested: negotiate instead of passing ``None`` to
        # the SSLContext constructor.
        from ssl import PROTOCOL_SSLv23
        ssl_version = PROTOCOL_SSLv23
    context = SSLContext(ssl_version)
    if cert_reqs is not None:
        # ``None`` is not a valid verify_mode; keep the context default.
        context.verify_mode = cert_reqs
    if ca_certs:
        try:
            context.load_verify_locations(ca_certs)
        # Py32 raises IOError
        # Py33 raises FileNotFoundError
        except Exception as e:  # Reraise as SSLError
            raise SSLError(e)
    if certfile:
        # FIXME: This block needs a test.
        context.load_cert_chain(certfile, keyfile)
    if HAS_SNI:  # Platform-specific: OpenSSL with enabled SNI
        return context.wrap_socket(sock, server_hostname=server_hostname)
    return context.wrap_socket(sock)
开发者ID:2013Commons,项目名称:hue,代码行数:25,代码来源:util.py
示例6: sslContext
def sslContext(trustStore: str, keyStore: str) -> SSLContext:
    """Build a TLS 1.2 context that requires and presents certificates.

    :param trustStore: File name, relative to ``../../certificates/``, passed
        to ``load_verify_locations``.
    :param keyStore: File name, relative to ``../../certificates/``, passed
        to ``load_cert_chain`` together with a fixed password.
    :returns: The configured context, restricted to the ``AES128-SHA``
        cipher string.
    """
    certificateDir = "../../certificates/"
    trustStorePath = certificateDir + trustStore
    keyStorePath = certificateDir + keyStore

    context = SSLContext(PROTOCOL_TLSv1_2)
    context.verify_mode = CERT_REQUIRED
    context.load_verify_locations(trustStorePath)
    context.load_cert_chain(keyStorePath, password="KeyPass")
    context.set_ciphers("AES128-SHA")
    return context
开发者ID:sushicutta,项目名称:yass,代码行数:8,代码来源:socket_client.py
示例7: secureStream
class secureStream(stream):
    """TLS client stream built on top of ``stream``.

    The peer certificate is validated against the system default CA
    certificates, and the certificate must match the host name given to
    :meth:`connect`.
    """

    def __init__(self):
        # createsocket is invoked on the ``stream`` class itself rather than
        # on this instance; presumably that is what provides
        # ``self.connection`` -- confirm against ``stream``.
        stream.createsocket(stream)
        self.contxt = SSLContext(PROTOCOL_TLSv1_2)
        self.contxt.verify_mode = CERT_REQUIRED
        # A valid chain alone is not enough: the certificate must also be
        # issued for the host we connect to.
        self.contxt.check_hostname = True
        self.contxt.load_default_certs()

    def connect(self, host, port):
        """Connect to ``(host, port)``, then switch to non-blocking TLS I/O."""
        self.connection.settimeout(15)
        self.connection.connect((host, port))
        # server_hostname enables SNI and the host name check.
        self.connection = self.contxt.wrap_socket(
            self.connection, server_hostname=host)
        self.connection.settimeout(0)

    def twitchconnect(self):
        """Connect to the Twitch API endpoint over HTTPS."""
        self.connect('api.twitch.tv', 443)

    def receive(self, buffer=4096):
        """Return up to ``buffer`` bytes decoded as text, or ``None``.

        ``None`` is returned whenever reading or decoding fails, which
        includes the case where the non-blocking socket has nothing to read.
        """
        try:
            data = self.connection.recv(buffer).decode()
        except Exception:
            # Best effort by design, but KeyboardInterrupt/SystemExit are no
            # longer swallowed.
            return None
        return data

    def transmit(self, data):
        """Encode and send ``data``, reconnecting if the link was dropped.

        Pending input is read and discarded first. After a reconnect the
        payload is not sent again.
        """
        self.receive()
        data = data.encode()
        try:
            self.connection.sendall(data)
        except (ConnectionAbortedError, ConnectionResetError):
            self._reconnect()

    def _reconnect(self):
        """Replace the broken socket with a fresh Twitch connection."""
        print('Break detected!')
        self.connection = socket(AF_INET, SOCK_STREAM)
        self.twitchconnect()
        self.connection.settimeout(0)

    def close(self):
        """Close the underlying connection."""
        self.connection.close()
开发者ID:SirRujak,项目名称:SirBot,代码行数:48,代码来源:network.py
示例8: create_thriftpy_context
def create_thriftpy_context(server_side=False, ciphers=None):
    """Backport create_default_context for older python versions.

    The returned context carries default security options. They can be
    switched off by hand, for example::

        from thriftpy.transport import _ssl
        context = _ssl.create_thriftpy_context()
        context.options &= ~_ssl.OP_NO_SSLv3

    You can do the same to enable compression.

    :param server_side: Build a context for a server when true, otherwise
        for a client.
    :param ciphers: Optional cipher string applied with ``set_ciphers``.
    :returns: The configured context.
    """
    if MODERN_SSL:
        if server_side:
            purpose = ssl.Purpose.CLIENT_AUTH
        else:
            purpose = ssl.Purpose.SERVER_AUTH
        context = ssl.create_default_context(purpose)
        if ciphers:
            context.set_ciphers(ciphers)
        return context

    context = SSLContext(ssl.PROTOCOL_SSLv23)
    for base_flag in (OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION):
        context.options |= base_flag

    # server/client default options
    if not server_side:
        context.verify_mode = ssl.CERT_REQUIRED
        # Host name checking is not enabled on this code path.
        warnings.warn(
            "ssl check hostname support disabled, upgrade your python",
            InsecurePlatformWarning)
    else:
        for server_flag in (OP_CIPHER_SERVER_PREFERENCE,
                            OP_SINGLE_DH_USE,
                            OP_SINGLE_ECDH_USE):
            context.options |= server_flag

    # Platform-specific: Python 2.6
    if not getattr(context, 'supports_set_ciphers', True):
        warnings.warn("ssl ciphers support disabled, upgrade your python",
                      InsecurePlatformWarning)
    elif ciphers:
        context.set_ciphers(ciphers)

    return context
开发者ID:AllanDaemon,项目名称:thriftpy,代码行数:48,代码来源:_ssl.py
示例9: __init__
def __init__(self, host: str, port: int, nicks: list, pwd: str, chans: list, op_pass: str,
             use_ssl: bool=False, ssl_options: ssl.SSLContext=None,
             encoding: str='utf-8'):
    """
    Asynchronous IRC client

    :param host: Server address
    :param port: IRC Port
    :param nicks: List of nicknames to try
    :param pwd: NickServ password
    :param chans: Stored as ``self.chans``
    :param op_pass: Stored as ``self.oper_pass``
    :param use_ssl: Enable/Disable SSL
    :param ssl_options: SSLContext object
    :param encoding: Character encoding to use
    """
    # Connection settings and credentials.
    self.host, self.port = host, port
    self.nicks, self.pwd = nicks, pwd
    self.chans = chans
    self.oper_pass = op_pass
    self.ssl = use_ssl
    self.encoding = encoding
    self.__nickidx = 0

    # Dispatch table keyed by the raw command / numeric as bytes.
    self.__handlers = dict([
        (b'PING', self.__ping),
        (b'NOTICE', self.__notice),
        (b'PRIVMSG', self.__privmsg),
        (b'PART', self.__part),
        (b'JOIN', self.__join),
        (b'MODE', self.__mode),
        (b'NICK', self.__nick),
        (b'QUIT', self.__quit),
        (b'KICK', self.__kick),
        (b'001', self.__welcome),
        (b'251', self.__user_count),
        (b'252', self.__op_count),
        (b'353', self.__namreply),
        (b'372', self.__motd),
        (b'376', self.__end_motd),
        (b'433', self.__nick_in_use),
        (b'900', self.__logged_in),
    ])

    raw_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    if not use_ssl:
        self.stream = tornado.iostream.IOStream(raw_sock)
    else:
        if not ssl_options:
            ssl_options = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            # IRC often has unsigned certs, by default do not verify
            ssl_options.verify_mode = ssl.CERT_NONE
        # NOTE(review): this pre-wrap is not given ssl_options; only the
        # SSLIOStream below receives it -- confirm that is intended.
        tls_sock = ssl.wrap_socket(raw_sock, do_handshake_on_connect=False)
        self.stream = tornado.iostream.SSLIOStream(tls_sock, ssl_options=ssl_options)

    server_address = (self.host, self.port)
    self.stream.connect(server_address,
                        self.__initial_auth,
                        server_hostname=self.host)
开发者ID:Xeronel,项目名称:CipherBot,代码行数:57,代码来源:irc.py
示例10: create_server
def create_server(
        callback=None,
        host='127.0.0.1',
        port=8000,
        ssl=None,
        loop=None,
        **kargs
        ):
    """
    This is a function to assist in the creation of a growler HTTP server.

    @param callback: Passed to the HTTPServer as ``cb``.
    @param host str: hostname or ip address on which to bind
    @param port: the port on which the server will listen
    @param ssl: Any truthy value enables TLS; the context is then built from
                the 'key' / 'cert' entries of kargs.
    @param loop asyncio.BaseEventLoop: The event loop to use; defaults to
                asyncio.get_event_loop().
    @param kargs: Extra parameters passed to the HTTPServer instance created.
                  If there is an ssl parameter passed to this function, kargs
                  will require the value 'key' to be present, and an optional
                  'cert' parameter to pass to load_cert_chain.
    @return An HTTPServer instance
    @raise KeyError: if ssl is truthy and kargs has no 'key' entry.
    """
    # The ``ssl`` parameter hides the stdlib module of the same name inside
    # this function, so the module is imported under another name to reach
    # its protocol constant.
    import ssl as ssl_module

    loop = asyncio.get_event_loop() if loop is None else loop

    if ssl:
        key = kargs.pop('key')
        cert = kargs.pop('cert', None)
        sslctx = SSLContext(ssl_module.PROTOCOL_SSLv23)
        if cert is None:
            # Without a separate certificate, 'key' is loaded as the
            # certificate file itself.
            sslctx.load_cert_chain(certfile=key)
        else:
            sslctx.load_cert_chain(certfile=cert, keyfile=key)
    else:
        sslctx = None

    # What do I use as a 'callback' here?
    srv = HTTPServer(cb=callback,
                     loop=loop,
                     ssl=sslctx,
                     host=host,
                     port=port,
                     **kargs
                     )
    return srv
开发者ID:akubera,项目名称:Growler,代码行数:43,代码来源:server.py
示例11: ssl_wrap_socket
def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=None,
                    ca_certs=None, server_hostname=None, ssl_version=None):
    """Wrap ``sock`` using a freshly configured ``SSLContext``.

    :param sock: Socket to wrap.
    :param keyfile: Private key passed to ``load_cert_chain`` with
        ``certfile``.
    :param certfile: Certificate to present; skipped when falsy.
    :param cert_reqs: Value for ``verify_mode``; the context default is kept
        when ``None``.
    :param ca_certs: CA file passed to ``load_verify_locations``.
    :param server_hostname: Forwarded to ``wrap_socket`` when ``HAS_SNI``.
    :param ssl_version: Protocol constant; ``PROTOCOL_SSLv23`` when ``None``.
    :returns: Whatever ``SSLContext.wrap_socket`` returns for ``sock``.
    :raises SSLError: If ``ca_certs`` cannot be loaded.
    """
    if ssl_version is None:
        # No protocol requested: negotiate instead of passing ``None`` to
        # the SSLContext constructor.
        from ssl import PROTOCOL_SSLv23
        ssl_version = PROTOCOL_SSLv23
    context = SSLContext(ssl_version)
    if cert_reqs is not None:
        # ``None`` is not a valid verify_mode; keep the context default.
        context.verify_mode = cert_reqs

    # 131072 == 0x20000: the option bit that turns TLS compression off.
    OP_NO_COMPRESSION = 131072
    context.options |= OP_NO_COMPRESSION

    if ca_certs:
        try:
            context.load_verify_locations(ca_certs)
        except Exception as e:
            raise SSLError(e)
    if certfile:
        context.load_cert_chain(certfile, keyfile)
    if HAS_SNI:
        return context.wrap_socket(sock, server_hostname=server_hostname)
    return context.wrap_socket(sock)
开发者ID:connoryang,项目名称:dec-eve-serenity,代码行数:16,代码来源:util.py
示例12: __init__
def __init__(self, localaddr, remoteaddr, ssl=False, certfile=None,
             keyfile=None, ssl_version=ssl.PROTOCOL_SSLv23,
             require_authentication=False, credential_validator=None,
             maximum_execution_time=30, process_count=5):
    """Initialise the SMTP server and its TLS context.

    :param localaddr: Passed to ``smtpd.SMTPServer.__init__``.
    :param remoteaddr: Passed to ``smtpd.SMTPServer.__init__``.
    :param ssl: Whether TLS is enabled; stored as ``self.ssl``.
    :param certfile: Certificate file for ``load_cert_chain``.
    :param keyfile: Private key file for ``load_cert_chain``.
    :param ssl_version: Protocol constant for the ``SSLContext``.
    :param require_authentication: Stored as-is on the instance.
    :param credential_validator: Stored as-is on the instance.
    :param maximum_execution_time: Stored as-is on the instance.
    :param process_count: Stored as-is on the instance.
    """
    smtpd.SMTPServer.__init__(self, localaddr, remoteaddr)
    self.logger = logging.getLogger(secure_smtpd.LOG_NAME)
    self.certfile = certfile
    self.keyfile = keyfile
    self.ssl_version = ssl_version
    self.subprocesses = []
    self.require_authentication = require_authentication
    self.credential_validator = credential_validator
    self.ssl = ssl
    self.maximum_execution_time = maximum_execution_time
    self.process_count = process_count
    self.process_pool = None
    self.context = SSLContext(ssl_version)
    # load_cert_chain rejects certfile=None, so a plain (non-TLS) server
    # with no certificate must skip it. With ssl enabled the call is still
    # made, so a missing certificate fails at construction as before.
    if ssl or certfile is not None:
        self.context.load_cert_chain(certfile=certfile, keyfile=keyfile)
开发者ID:tebrown,项目名称:secure-smtpd,代码行数:15,代码来源:smtp_server.py
示例13: get_ssl_context
def get_ssl_context(*args):
    """Create and return an SSLContext object.

    Takes exactly four positional arguments, in order: ``certfile``,
    ``keyfile``, ``ca_certs``, ``cert_reqs``. Each may be ``None`` to skip
    the corresponding configuration step.

    :raises ValueError: If the number of arguments is not four.
    """
    certfile, keyfile, ca_certs, cert_reqs = args
    # PROTOCOL_SSLv23 selects protocol negotiation rather than one version.
    ctx = SSLContext(ssl.PROTOCOL_SSLv23)
    if hasattr(ctx, "options"):
        # Explicitly rule out SSLv2 and SSLv3 during negotiation. The flags
        # are looked up defensively because not every build defines them.
        ctx.options |= getattr(ssl, "OP_NO_SSLv2", 0)
        ctx.options |= getattr(ssl, "OP_NO_SSLv3", 0)
    if certfile is not None:
        ctx.load_cert_chain(certfile, keyfile)
    if ca_certs is not None:
        ctx.load_verify_locations(ca_certs)
    if cert_reqs is not None:
        ctx.verify_mode = cert_reqs
    return ctx
开发者ID:llvtt,项目名称:mongo-python-driver,代码行数:11,代码来源:ssl_support.py
示例14: ssl_wrap_socket
def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=None,
                    ca_certs=None, server_hostname=None,
                    ssl_version=None):
    """Wrap ``sock`` and return both the context and the wrapped socket.

    :param sock: Socket to wrap.
    :param keyfile: Private key passed to ``load_cert_chain`` with
        ``certfile``.
    :param certfile: Certificate to present; skipped when falsy.
    :param cert_reqs: Value for ``verify_mode``; the context default is kept
        when ``None``.
    :param ca_certs: CA file passed to ``load_verify_locations``.
    :param server_hostname: Forwarded to ``wrap_socket`` when ``HAS_SNI``.
    :param ssl_version: Protocol constant; ``PROTOCOL_SSLv23`` when ``None``.
    :returns: A ``(context, wrapped_socket)`` tuple.
    :raises SSLError: If ``ca_certs`` cannot be loaded.
    """
    if ssl_version is None:
        # No protocol requested: negotiate instead of passing ``None`` to
        # the SSLContext constructor.
        from ssl import PROTOCOL_SSLv23
        ssl_version = PROTOCOL_SSLv23
    context = SSLContext(ssl_version)
    if cert_reqs is not None:
        # ``None`` is not a valid verify_mode; keep the context default.
        context.verify_mode = cert_reqs
    if ca_certs:
        try:
            context.load_verify_locations(ca_certs)
        # Py32 raises IOError
        # Py33 raises FileNotFoundError
        except Exception as e:  # Reraise as SSLError
            raise SSLError(e)
    if certfile:
        # FIXME: This block needs a test.
        context.load_cert_chain(certfile, keyfile)
    if HAS_SNI:  # Platform-specific: OpenSSL with enabled SNI
        return (context, context.wrap_socket(sock, server_hostname=server_hostname))
    return (context, context.wrap_socket(sock))
开发者ID:GShikari,项目名称:pentesttoolz,代码行数:22,代码来源:sslparser.py
示例15: create_socket
def create_socket(ip: str, port: int, context: ssl.SSLContext = None,
                  verify_hostname: bool = True, timeout: int = 10) -> ssl.SSLSocket:
    """
    Creates a new SSL-wrapped socket.

    :param ip: The IP to connect to.
    :param port: The port to connect to.
    :param context: The SSL context to use, or None for a default one to be created.
    :param verify_hostname: Ignored; the ``SERVER_LOGIN_ON_CLIENT_VERIFY``
        setting of ``current_app.conf`` is used instead.
    :param timeout: The timeout for recv().
    :return: A new SSLSocket.
    """
    # The configuration, not the argument, decides whether the peer
    # certificate is verified.
    verify_peer = current_app.conf["SERVER_LOGIN_ON_CLIENT_VERIFY"]

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.settimeout(timeout)
        if not context:
            # create_default_context loads the system CA certificates, which
            # CERT_REQUIRED needs in order to validate anything.
            context = ssl.create_default_context()
            # Only an IP address is available here, so there is no host name
            # to check the certificate against. Must be disabled before
            # verify_mode may be lowered to CERT_NONE.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_REQUIRED if verify_peer else ssl.CERT_NONE
        sock = context.wrap_socket(sock)
        sock.connect((ip, port))
    except BaseException:
        # Do not leak the descriptor when wrapping or connecting fails.
        sock.close()
        raise
    return sock
开发者ID:Veriny,项目名称:Solebaga,代码行数:21,代码来源:gutsama.py
示例16: ssl_wrap_socket
def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=CERT_NONE,
                    ca_certs=None, server_hostname=None,
                    ssl_version=PROTOCOL_SSLv23):
    """
    Wrap ``sock`` using a freshly configured ``SSLContext``.

    All arguments except `server_hostname` have the same meaning as for
    :func:`ssl.wrap_socket`

    :param server_hostname:
        Hostname of the expected certificate
    :raises SSLError:
        If loading ``ca_certs`` raises ``TypeError``.
    """
    tls_context = SSLContext(ssl_version)
    tls_context.verify_mode = cert_reqs

    if ca_certs:
        try:
            tls_context.load_verify_locations(ca_certs)
        except TypeError as e:  # Reraise as SSLError
            # fixme: This block needs a test.
            raise SSLError(e)

    if certfile:
        # fixme: This block needs a test.
        tls_context.load_cert_chain(certfile, keyfile)

    # Platform-specific: OpenSSL with enabled SNI
    wrap_kwargs = {'server_hostname': server_hostname} if HAS_SNI else {}
    return tls_context.wrap_socket(sock, **wrap_kwargs)
开发者ID:holmesal,项目名称:musicthing,代码行数:24,代码来源:util.py
示例17: SSLContext
import threading
import time
from threading import Thread
# Pypy compatibility
try:
from ssl import PROTOCOL_TLSv1_2 as PROTOCOL_TLSv1
except ImportError:
from ssl import PROTOCOL_TLSv1 as PROTOCOL_TLSv1
run_http = config["insecure"]["enabled"]
run_https = config["secure"]["enabled"]
if run_https:
context = SSLContext(PROTOCOL_TLSv1)
context.load_cert_chain(
config["secure"]["cert"],
config["secure"]["key"]
)
if run_http and run_https:
if config["debug"]:
raise Warning("Cannot run in debug mode with both https and http enabled due to flask limitations.")
Thread(
target=app.run,
kwargs={
"host": config["server"]["address"],
"port": config["secure"]["port"],
"debug": config["debug"],
"ssl_context": context
开发者ID:FroopleXP,项目名称:BitOrb,代码行数:31,代码来源:runserver.py
示例18: open_url
#.........这里部分代码省略.........
# add it to the list of handlers
ssl_handler = SSLValidationHandler(hostname, port)
handlers.append(ssl_handler)
if parsed[0] != 'ftp':
username = url_username
if username:
password = url_password
netloc = parsed[1]
elif '@' in parsed[1]:
credentials, netloc = parsed[1].split('@', 1)
if ':' in credentials:
username, password = credentials.split(':', 1)
else:
username = credentials
password = ''
parsed = list(parsed)
parsed[1] = netloc
# reconstruct url without credentials
url = urlparse.urlunparse(parsed)
if username and not force_basic_auth:
passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
# this creates a password manager
passman.add_password(None, netloc, username, password)
# because we have put None at the start it will always
# use this username/password combination for urls
# for which `theurl` is a super-url
authhandler = urllib2.HTTPBasicAuthHandler(passman)
# create the AuthHandler
handlers.append(authhandler)
elif username and force_basic_auth:
if headers is None:
headers = {}
headers["Authorization"] = "Basic %s" % base64.b64encode("%s:%s" % (username, password))
if not use_proxy:
proxyhandler = urllib2.ProxyHandler({})
handlers.append(proxyhandler)
# pre-2.6 versions of python cannot use the custom https
# handler, since the socket class is lacking create_connection.
# Some python builds lack HTTPS support.
if hasattr(socket, 'create_connection') and CustomHTTPSHandler:
handlers.append(CustomHTTPSHandler)
opener = urllib2.build_opener(*handlers)
urllib2.install_opener(opener)
if method:
if method.upper() not in ('OPTIONS','GET','HEAD','POST','PUT','DELETE','TRACE','CONNECT','PATCH'):
raise ConnectionError('invalid HTTP request method; %s' % method.upper())
request = RequestWithMethod(url, method.upper(), data)
else:
request = urllib2.Request(url, data)
# add the custom agent header, to help prevent issues
# with sites that block the default urllib agent string
request.add_header('User-agent', http_agent)
# if we're ok with getting a 304, set the timestamp in the
# header, otherwise make sure we don't get a cached copy
if last_mod_time and not force:
tstamp = last_mod_time.strftime('%a, %d %b %Y %H:%M:%S +0000')
request.add_header('If-Modified-Since', tstamp)
else:
request.add_header('cache-control', 'no-cache')
# user defined headers now, which may override things we've set above
if headers:
if not isinstance(headers, dict):
raise ValueError("headers provided to fetch_url() must be a dict")
for header in headers:
request.add_header(header, headers[header])
urlopen_args = [request, None]
if sys.version_info >= (2,6,0):
# urlopen in python prior to 2.6.0 did not
# have a timeout parameter
urlopen_args.append(timeout)
if HAS_SSLCONTEXT and not validate_certs:
# In 2.7.9, the default context validates certificates
context = SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
context.options |= ssl.OP_NO_SSLv3
context.verify_mode = ssl.CERT_NONE
context.check_hostname = False
urlopen_args += (None, None, None, context)
r = urllib2.urlopen(*urlopen_args)
return r
开发者ID:RajeevNambiar,项目名称:temp,代码行数:101,代码来源:urls.py
示例19: open_url
def open_url(url, data=None, headers=None, method=None, use_proxy=True,
force=False, last_mod_time=None, timeout=10, validate_certs=True,
url_username=None, url_password=None, http_agent=None,
force_basic_auth=False, follow_redirects='urllib2'):
'''
Sends a request via HTTP(S) or FTP using urllib2 (Python2) or urllib (Python3)
Does not require the module environment
'''
handlers = []
ssl_handler = maybe_add_ssl_handler(url, validate_certs)
if ssl_handler:
handlers.append(ssl_handler)
# FIXME: change the following to use the generic_urlparse function
# to remove the indexed references for 'parsed'
parsed = urlparse(url)
if parsed[0] != 'ftp':
username = url_username
if headers is None:
headers = {}
if username:
password = url_password
netloc = parsed[1]
elif '@' in parsed[1]:
credentials, netloc = parsed[1].split('@', 1)
if ':' in credentials:
username, password = credentials.split(':', 1)
else:
username = credentials
password = ''
parsed = list(parsed)
parsed[1] = netloc
# reconstruct url without credentials
url = urlunparse(parsed)
if username and not force_basic_auth:
passman = urllib_request.HTTPPasswordMgrWithDefaultRealm()
# this creates a password manager
passman.add_password(None, netloc, username, password)
# because we have put None at the start it will always
# use this username/password combination for urls
# for which `theurl` is a super-url
authhandler = urllib_request.HTTPBasicAuthHandler(passman)
# create the AuthHandler
handlers.append(authhandler)
elif username and force_basic_auth:
headers["Authorization"] = basic_auth_header(username, password)
else:
try:
rc = netrc.netrc(os.environ.get('NETRC'))
login = rc.authenticators(parsed[1])
except IOError:
login = None
if login:
username, _, password = login
if username and password:
headers["Authorization"] = basic_auth_header(username, password)
if not use_proxy:
proxyhandler = urllib_request.ProxyHandler({})
handlers.append(proxyhandler)
if HAS_SSLCONTEXT and not validate_certs:
# In 2.7.9, the default context validates certificates
context = SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
context.options |= ssl.OP_NO_SSLv3
context.verify_mode = ssl.CERT_NONE
context.check_hostname = False
handlers.append(urllib_request.HTTPSHandler(context=context))
# pre-2.6 versions of python cannot use the custom https
# handler, since the socket class is lacking create_connection.
# Some python builds lack HTTPS support.
if hasattr(socket, 'create_connection') and CustomHTTPSHandler:
handlers.append(CustomHTTPSHandler)
handlers.append(RedirectHandlerFactory(follow_redirects, validate_certs))
opener = urllib_request.build_opener(*handlers)
urllib_request.install_opener(opener)
if method:
if method.upper() not in ('OPTIONS','GET','HEAD','POST','PUT','DELETE','TRACE','CONNECT','PATCH'):
raise ConnectionError('invalid HTTP request method; %s' % method.upper())
request = RequestWithMethod(url, method.upper(), data)
else:
request = urllib_request.Request(url, data)
#.........这里部分代码省略.........
开发者ID:KMK-ONLINE,项目名称:ansible,代码行数:101,代码来源:urls.py
示例20: get_ssl_context
def get_ssl_context(*args):
    """Create and return an SSLContext object.

    Takes exactly four positional arguments, in order: ``certfile``,
    ``keyfile``, ``ca_certs``, ``cert_reqs``.

    When ``ca_certs`` is ``None`` and ``cert_reqs`` is not ``ssl.CERT_NONE``,
    CA certificates are taken from the first available fallback: the
    context's default certs, its default verify paths (non-Windows), the
    Windows certificate store, or certifi.

    :raises ConfigurationError: If verification is requested but no CA
        certificates could be found.
    """
    certfile, keyfile, ca_certs, cert_reqs = args

    # Note PROTOCOL_SSLv23 is about the most misleading name imaginable.
    # It makes client and server negotiate the highest protocol version
    # both of them support.
    ctx = SSLContext(ssl.PROTOCOL_SSLv23)
    if hasattr(ctx, "options"):
        # Explicitly disable SSLv2 and SSLv3. Flags that the ssl module does
        # not define contribute 0 and so change nothing.
        for flag_name in ("OP_NO_SSLv2", "OP_NO_SSLv3"):
            ctx.options |= getattr(ssl, flag_name, 0)

    if certfile is not None:
        ctx.load_cert_chain(certfile, keyfile)

    if ca_certs is not None:
        ctx.load_verify_locations(ca_certs)
    elif cert_reqs != ssl.CERT_NONE:
        on_windows = sys.platform == "win32"
        # CPython >= 2.7.9 or >= 3.4.0, pypy >= 2.5.1
        if hasattr(ctx, "load_default_certs"):
            ctx.load_default_certs()
        # Python >= 3.2.0, useless on Windows.
        elif not on_windows and hasattr(ctx, "set_default_verify_paths"):
            ctx.set_default_verify_paths()
        elif on_windows and HAVE_WINCERTSTORE:
            # The exported store is loaded once, under the lock.
            with _WINCERTSLOCK:
                if _WINCERTS is None:
                    _load_wincerts()
            ctx.load_verify_locations(_WINCERTS.name)
        elif HAVE_CERTIFI:
            ctx.load_verify_locations(certifi.where())
        else:
            raise ConfigurationError(
                "`ssl_cert_reqs` is not ssl.CERT_NONE and no system "
                "CA certificates could be loaded. `ssl_ca_certs` is "
                "required.")

    if cert_reqs is None:
        ctx.verify_mode = ssl.CERT_REQUIRED
    else:
        ctx.verify_mode = cert_reqs
    return ctx
开发者ID:Alpus,项目名称:Eth,代码行数:41,代码来源:ssl_support.py
注:本文中的ssl.SSLContext类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论