
Python ssl.SSLContext Class Code Examples


This article collects typical usage examples of ssl.SSLContext in Python. If you are wondering how the SSLContext class is used in practice, or are looking for working examples of it, the curated examples below may help.



A total of 20 code examples of the SSLContext class are shown below, sorted by popularity by default.

Example 1: ssl_wrap_socket

    def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=None,
                        ca_certs=None, server_hostname=None,
                        ssl_version=None, ciphers=None):
        """
        All arguments except `server_hostname` have the same meaning as for
        :func:`ssl.wrap_socket`

        :param server_hostname:
            Hostname of the expected certificate
        """
        context = SSLContext(ssl_version)
        context.verify_mode = cert_reqs

        # Disable TLS compression to mitigate CRIME attack (issue #309)
        OP_NO_COMPRESSION = 0x20000
        context.options |= OP_NO_COMPRESSION

        if ca_certs:
            try:
                context.load_verify_locations(ca_certs)
            # Py32 raises IOError
            # Py33 raises FileNotFoundError
            except Exception as e:  # Reraise as SSLError
                raise SSLError(e)
        if certfile:
            # FIXME: This block needs a test.
            context.load_cert_chain(certfile, keyfile)
        if ciphers:
            context.set_ciphers(ciphers)
        if HAS_SNI:  # Platform-specific: OpenSSL with enabled SNI
            return context.wrap_socket(sock, server_hostname=server_hostname)
        return context.wrap_socket(sock)
Developer: Perkville, Project: requests, Lines of code: 32, Source: ssl_.py
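
Example 1 hard-codes the option value 0x20000 and re-raises any load failure as SSLError. On current Python 3 the same steps can be written with the named constant `ssl.OP_NO_COMPRESSION` and `ssl.PROTOCOL_TLS_CLIENT`. The sketch below is only an illustration of that idea; `make_client_context` is a hypothetical helper, not part of requests or urllib3.

```python
import ssl


def make_client_context(ca_certs=None, ciphers=None):
    """Build a verifying client-side context (hypothetical helper)."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # Named constant instead of the literal 0x20000 used in Example 1.
    context.options |= ssl.OP_NO_COMPRESSION
    if ca_certs:
        try:
            context.load_verify_locations(ca_certs)
        except OSError as exc:
            # A missing or unreadable file surfaces as an OSError subclass;
            # re-raise it as SSLError, as Example 1 does.
            raise ssl.SSLError(str(exc)) from exc
    if ciphers:
        context.set_ciphers(ciphers)
    return context


context = make_client_context()
```

With `PROTOCOL_TLS_CLIENT`, certificate verification and hostname checking are enabled by default, so `verify_mode` does not need to be set separately.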


Example 2: create_urllib3_context

def create_urllib3_context(ssl_version=None, cert_reqs=None,
                           options=None, ciphers=None):
    """All arguments have the same meaning as ``ssl_wrap_socket``.

    By default, this function does a lot of the same work that
    ``ssl.create_default_context`` does on Python 3.4+. It:

    - Disables SSLv2, SSLv3, and compression
    - Sets a restricted set of server ciphers

    If you wish to enable SSLv3, you can do::

        from urllib3.util import ssl_
        context = ssl_.create_urllib3_context()
        context.options &= ~ssl_.OP_NO_SSLv3

    You can do the same to enable compression (substituting ``COMPRESSION``
    for ``SSLv3`` in the last line above).

    :param ssl_version:
        The desired protocol version to use. This will default to
        PROTOCOL_SSLv23 which will negotiate the highest protocol that both
        the server and your installation of OpenSSL support.
    :param cert_reqs:
        Whether to require the certificate verification. This defaults to
        ``ssl.CERT_REQUIRED``.
    :param options:
        Specific OpenSSL options. These default to ``ssl.OP_NO_SSLv2``,
        ``ssl.OP_NO_SSLv3``, ``ssl.OP_NO_COMPRESSION``.
    :param ciphers:
        Which cipher suites to allow the server to select.
    :returns:
        Constructed SSLContext object with specified options
    :rtype: SSLContext
    """
    context = SSLContext(ssl_version or ssl.PROTOCOL_SSLv23)

    context.set_ciphers(ciphers or DEFAULT_CIPHERS)

    # Setting the default here, as we may have no ssl module on import
    cert_reqs = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs

    if options is None:
        options = 0
        # SSLv2 is easily broken and is considered harmful and dangerous
        options |= OP_NO_SSLv2
        # SSLv3 has several problems and is now dangerous
        options |= OP_NO_SSLv3
        # Disable compression to prevent CRIME attacks for OpenSSL 1.0+
        # (issue #309)
        options |= OP_NO_COMPRESSION

    context.options |= options

    context.verify_mode = cert_reqs
    if getattr(context, 'check_hostname', None) is not None:  # Platform-specific: Python 3.2
        # We do our own verification, including fingerprints and alternative
        # hostnames. So disable it here
        context.check_hostname = False
    return context
Developer: Ddoudou, Project: test, Lines of code: 60, Source: ssl_.py
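
The docstring in Example 2 describes re-enabling a feature by clearing its option bit with `&= ~`. A minimal sketch of that bit manipulation, using only the standard `ssl` module, might look like this; `set_option` is a hypothetical helper introduced for illustration.

```python
import ssl


def set_option(context, flag, enabled):
    """Turn a single OpenSSL option bit on or off (hypothetical helper)."""
    if enabled:
        context.options |= flag
    else:
        # Clearing a bit uses AND with the inverted flag, as in the docstring.
        context.options &= ~flag
    return context


context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

set_option(context, ssl.OP_NO_COMPRESSION, True)
compression_disabled = bool(context.options & ssl.OP_NO_COMPRESSION)

set_option(context, ssl.OP_NO_COMPRESSION, False)
compression_allowed = not (context.options & ssl.OP_NO_COMPRESSION)
```

Clearing `OP_NO_COMPRESSION` only removes the restriction on the context; whether compression is actually negotiated still depends on how OpenSSL was built.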


Example 3: factory

def factory(uri, ssl=False, **init_args):
    from urllib.parse import urlparse, unquote, parse_qs
    o = urlparse(uri)

    srv = None

    if o.scheme == "irc" or o.scheme == "ircs":
        # https://www.w3.org/Addressing/draft-mirashi-url-irc-01.txt
        # https://www-archive.mozilla.org/projects/rt-messaging/chatzilla/irc-urls.html
        args = init_args

        if o.scheme == "ircs": ssl = True
        if o.hostname is not None: args["host"] = o.hostname
        if o.port is not None: args["port"] = o.port
        if o.username is not None: args["username"] = o.username
        if o.password is not None: args["password"] = o.password

        modifiers = o.path.split(",")
        target = unquote(modifiers.pop(0)[1:])

        # Read query string
        params = parse_qs(o.query)

        if "msg" in params:
            if "on_connect" not in args:
                args["on_connect"] = []
            # parse_qs returns a list of values per key; use the first one
            args["on_connect"].append("PRIVMSG %s :%s" % (target, params["msg"][0]))

        if "key" in params:
            if "channels" not in args:
                args["channels"] = []
            args["channels"].append((target, params["key"][0]))

        if "pass" in params:
            args["password"] = params["pass"][0]

        if "charset" in params:
            args["encoding"] = params["charset"][0]

        #
        if "channels" not in args and "isnick" not in modifiers:
            args["channels"] = [ target ]

        from nemubot.server.IRC import IRC as IRCServer
        srv = IRCServer(**args)

        if ssl:
            try:
                from ssl import create_default_context
                context = create_default_context()
            except ImportError:
                # Python 3.3 compat
                from ssl import SSLContext, PROTOCOL_TLSv1
                context = SSLContext(PROTOCOL_TLSv1)
            srv._fd = context.wrap_socket(srv._fd, server_hostname=o.hostname)


    return srv
Developer: nbr23, Project: nemubot, Lines of code: 59, Source: __init__.py
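
Example 3 relies on `urlparse` and `parse_qs` to pull the host, port, target and query parameters out of an `irc://` or `ircs://` URI. The sketch below isolates just that parsing step. `parse_irc_uri` and the sample URI are made up for illustration and are not part of nemubot. Note that `parse_qs` maps every name to a list of values.

```python
from urllib.parse import parse_qs, unquote, urlparse


def parse_irc_uri(uri):
    """Split an irc:// or ircs:// URI into connection settings (illustrative)."""
    parts = urlparse(uri)
    modifiers = parts.path.split(",")
    target = unquote(modifiers.pop(0)[1:])  # drop the leading "/"
    params = parse_qs(parts.query)
    return {
        "host": parts.hostname,
        "port": parts.port,
        "use_tls": parts.scheme == "ircs",
        "target": target,
        "is_nick": "isnick" in modifiers,
        # parse_qs yields lists, so take the first value if one is present.
        "key": params.get("key", [None])[0],
    }


settings = parse_irc_uri("ircs://irc.example.org:6697/%23python?key=secret")
```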


Example 4: __init__

    def __init__(self, server_address, HandlerClass, dir):
        super().__init__(server_address, HandlerClass, bind_and_activate=False)
        ctx = SSLContext(PROTOCOL_TLSv1)
        ctx.load_cert_chain(join(dir, 'server-cert.pem'), join(dir, 'server-key.pem'))
        # ctx.load_verify_locations(join(dir, 'ca-cert.pem'))
        self.socket = ctx.wrap_socket(self.socket, server_side=True)
        self.server_bind()
        self.server_activate()
Developer: andrewcooke, Project: n3, Lines of code: 8, Source: web.py
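
Example 4 pins the server to TLS 1.0 through `PROTOCOL_TLSv1`. On Python 3.6 and later, `ssl.PROTOCOL_TLS_SERVER` negotiates the highest version both sides support instead. A hedged sketch of the same context setup follows; `make_server_context` is a hypothetical helper, and the PEM file names are simply the ones used in the example.

```python
import ssl
from os.path import join


def make_server_context(cert_dir):
    """Create a server-side context from PEM files in cert_dir (hypothetical helper)."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Raises an OSError subclass if either file is missing.
    ctx.load_cert_chain(join(cert_dir, "server-cert.pem"),
                        join(cert_dir, "server-key.pem"))
    return ctx


# A server context does not request client certificates by default.
bare = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
```

The returned context would then be used as in the example: `ctx.wrap_socket(sock, server_side=True)`.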


Example 5: ssl_wrap_socket

    def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=None,
                        ca_certs=None, server_hostname=None,
                        ssl_version=None):
        """
        All arguments except `server_hostname` have the same meaning as for
        :func:`ssl.wrap_socket`

        :param server_hostname:
            Hostname of the expected certificate
        """
        context = SSLContext(ssl_version)
        context.verify_mode = cert_reqs
        if ca_certs:
            try:
                context.load_verify_locations(ca_certs)
            # Py32 raises IOError
            # Py33 raises FileNotFoundError
            except Exception as e:  # Reraise as SSLError
                raise SSLError(e)
        if certfile:
            # FIXME: This block needs a test.
            context.load_cert_chain(certfile, keyfile)
        if HAS_SNI:  # Platform-specific: OpenSSL with enabled SNI
            return context.wrap_socket(sock, server_hostname=server_hostname)
        return context.wrap_socket(sock)
Developer: 2013Commons, Project: hue, Lines of code: 25, Source: util.py


Example 6: sslContext

def sslContext(trustStore: str, keyStore: str) -> SSLContext:
    sslContext = SSLContext(PROTOCOL_TLSv1_2)
    sslContext.verify_mode = CERT_REQUIRED
    storePath = "../../certificates/"
    sslContext.load_verify_locations(storePath + trustStore)
    sslContext.load_cert_chain(storePath + keyStore, password="KeyPass")
    sslContext.set_ciphers("AES128-SHA")
    return sslContext
Developer: sushicutta, Project: yass, Lines of code: 8, Source: socket_client.py
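
Example 6 restricts the context to a single cipher with `set_ciphers`. If the string matches nothing that the local OpenSSL build supports, `set_ciphers` raises `ssl.SSLError`. The sketch below shows one way to surface that early; `context_with_ciphers` and the cipher string `"ECDHE+AESGCM"` are illustrative choices, not taken from the original project.

```python
import ssl


def context_with_ciphers(cipher_string):
    """Return a client context limited to cipher_string (hypothetical helper)."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    try:
        ctx.set_ciphers(cipher_string)
    except ssl.SSLError as exc:
        raise ValueError("unusable cipher string: %r" % cipher_string) from exc
    return ctx


ctx = context_with_ciphers("ECDHE+AESGCM")
# get_ciphers() lists the suites that are enabled on the context.
enabled_names = [cipher["name"] for cipher in ctx.get_ciphers()]
```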


Example 7: secureStream

class secureStream(stream):
    def __init__(self):
        stream.createsocket(stream)
        self.contxt = SSLContext(PROTOCOL_TLSv1_2)
        self.contxt.verify_mode = CERT_REQUIRED
        self.contxt.load_default_certs()

    def connect(self,host,port):
        self.connection.settimeout(15)
        self.connection.connect((host,port))
        self.connection = self.contxt.wrap_socket(self.connection)#stream.connection
        self.connection.settimeout(0)

    def twitchconnect(self):
        self.connect('api.twitch.tv',443)

    def receive(self,buffer=4096):
        try:
            data = self.connection.recv(buffer).decode()
            #print(data)#temporary
        except Exception:  # e.g. no data ready on the non-blocking socket
            return None
        else:
            return data

    def transmit(self,data):
        junk = self.receive()
        data = data.encode()
        try:
            self.connection.sendall(data)
        except (ConnectionAbortedError, ConnectionResetError):
            # Connection dropped: open a fresh socket and reconnect.
            print('Break detected!')
            self.connection = socket(AF_INET,SOCK_STREAM)
            self.twitchconnect()
            self.connection.settimeout(0)

        junk = None

    def close(self):
        self.connection.close()
Developer: SirRujak, Project: SirBot, Lines of code: 48, Source: network.py
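
Example 7 sets `CERT_REQUIRED` but wraps the socket without `server_hostname`, and a context created with `PROTOCOL_TLSv1_2` leaves `check_hostname` off by default. The certificate chain is therefore validated, but the certificate is not matched against the host name. A minimal sketch of the stricter variant, assuming Python 3.6+ and using the hypothetical helper `wrap_for_host`, could look like this. No network connection is made.

```python
import socket
import ssl

context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)  # check_hostname defaults to True
context.load_default_certs()


def wrap_for_host(sock, hostname):
    """Wrap sock so the handshake checks the certificate against hostname."""
    return context.wrap_socket(sock, server_hostname=hostname)


# Without server_hostname the context refuses to wrap the socket at all.
plain = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    context.wrap_socket(plain)
    refused = False
except ValueError:
    refused = True
finally:
    plain.close()

# Wrapping an unconnected socket is allowed; the handshake runs on connect().
tls = wrap_for_host(socket.socket(socket.AF_INET, socket.SOCK_STREAM),
                    "api.twitch.tv")
wrapped_hostname = tls.server_hostname
tls.close()
```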


Example 8: create_thriftpy_context

def create_thriftpy_context(server_side=False, ciphers=None):
    """Backport create_default_context for older python versions.

    The SSLContext has some default security options, you can disable them
    manually, for example::

        from thriftpy.transport import _ssl
        context = _ssl.create_thriftpy_context()
        context.options &= ~_ssl.OP_NO_SSLv3

    You can do the same to enable compression.
    """
    if MODERN_SSL:
        if server_side:
            context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        else:
            context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)

        if ciphers:
            context.set_ciphers(ciphers)

    else:
        context = SSLContext(ssl.PROTOCOL_SSLv23)
        context.options |= OP_NO_SSLv2
        context.options |= OP_NO_SSLv3
        context.options |= OP_NO_COMPRESSION

        # server/client default options
        if server_side:
            context.options |= OP_CIPHER_SERVER_PREFERENCE
            context.options |= OP_SINGLE_DH_USE
            context.options |= OP_SINGLE_ECDH_USE
        else:
            context.verify_mode = ssl.CERT_REQUIRED
            # context.check_hostname = True
            warnings.warn(
                "ssl check hostname support disabled, upgrade your python",
                InsecurePlatformWarning)

        # Platform-specific: Python 2.6
        if getattr(context, 'supports_set_ciphers', True):
            if ciphers:
                context.set_ciphers(ciphers)
        else:
            warnings.warn("ssl ciphers support disabled, upgrade your python",
                          InsecurePlatformWarning)

    return context
Developer: AllanDaemon, Project: thriftpy, Lines of code: 48, Source: _ssl.py
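
The `MODERN_SSL` branch of Example 8 picks a `ssl.Purpose` depending on which side of the connection the context is for. The naming is easy to misread: `SERVER_AUTH` is for clients (they authenticate servers), and `CLIENT_AUTH` is for servers. A short sketch of the resulting defaults, using only standard-library calls:

```python
import ssl

# Client side: the context is used to authenticate servers.
client_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)

# Server side: the context is used to (optionally) authenticate clients.
server_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)

client_defaults = (client_context.verify_mode, client_context.check_hostname)
server_defaults = (server_context.verify_mode, server_context.check_hostname)
```

The client context verifies certificates and host names out of the box, whereas the server context does not request a client certificate unless `verify_mode` is changed.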


Example 9: __init__

    def __init__(self, host: str, port: int, nicks: list, pwd: str, chans: list, op_pass: str,
                 use_ssl: bool=False, ssl_options: ssl.SSLContext=None,
                 encoding: str='utf-8'):
        """
        Asynchronous IRC client
        :param host: Server address
        :param port: IRC Port
        :param nicks: List of nicknames to try
        :param pwd: NickServ password
        :param use_ssl: Enable/Disable SSL
        :param ssl_options: SSLContext object
        :param encoding: Character encoding to use
        """
        self.host = host
        self.port = port
        self.nicks = nicks
        self.pwd = pwd
        self.chans = chans
        self.oper_pass = op_pass
        self.ssl = use_ssl
        self.encoding = encoding
        self.__nickidx = 0
        self.__handlers = {
            b'PING': self.__ping,
            b'NOTICE': self.__notice,
            b'PRIVMSG': self.__privmsg,
            b'PART': self.__part,
            b'JOIN': self.__join,
            b'MODE': self.__mode,
            b'NICK': self.__nick,
            b'QUIT': self.__quit,
            b'KICK': self.__kick,
            b'001': self.__welcome,
            b'251': self.__user_count,
            b'252': self.__op_count,
            b'353': self.__namreply,
            b'372': self.__motd,
            b'376': self.__end_motd,
            b'433': self.__nick_in_use,
            b'900': self.__logged_in
        }

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)

        if use_ssl:
            if not ssl_options:
                ssl_options = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                # IRC often has unsigned certs, by default do not verify
                ssl_options.verify_mode = ssl.CERT_NONE
            sock = ssl.wrap_socket(sock, do_handshake_on_connect=False)
            self.stream = tornado.iostream.SSLIOStream(sock, ssl_options=ssl_options)
        else:
            self.stream = tornado.iostream.IOStream(sock)

        self.stream.connect((self.host, self.port),
                            self.__initial_auth,
                            server_hostname=self.host)
Developer: Xeronel, Project: CipherBot, Lines of code: 57, Source: irc.py


Example 10: create_server

def create_server(
        callback=None,
        host='127.0.0.1',
        port=8000,
        ssl=None,
        loop=None,
        **kargs
        ):
    """
    This is a function to assist in the creation of a growler HTTP server.

    @param host str: hostname or ip address on which to bind
    @param port: the port on which the server will listen
    @param ssl ssl.SSLContext: The SSLContext for using TLS over the connection
    @param loop asyncio.BaseEventLoop: The event loop to
    @param kargs: Extra parameters passed to the HTTPServer instance created.
                  If there is an ssl parameter passed to this function, kargs
                  will require the value 'key' to be present, and an optional
                  'cert' parameter to pass to load_cert_chain.
    @return An HTTPServer instance
    """

    loop = asyncio.get_event_loop() if loop is None else loop

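    if ssl:
        # The `ssl` argument shadows the ssl module inside this function,
        # so import the protocol constant explicitly.
        from ssl import PROTOCOL_SSLv23
        sslctx = SSLContext(PROTOCOL_SSLv23)
        key = kargs.pop('key')
        try:
            sslctx.load_cert_chain(certfile=kargs.pop('cert'), keyfile=key)
        except KeyError:
            sslctx.load_cert_chain(certfile=key)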
    else:
        sslctx = None

    # What do I use as a 'callback' here?
    srv = HTTPServer(cb=callback,
                     loop=loop,
                     ssl=sslctx,
                     host=host,
                     port=port,
                     **kargs
                     )
    return srv
Developer: akubera, Project: Growler, Lines of code: 43, Source: server.py
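
Example 10 uses a `KeyError` from `kargs.pop('cert')` to decide whether the key file also contains the certificate. The same decision can be expressed with a default value for `pop`. The sketch below covers only that argument handling; `pop_cert_args` is a hypothetical helper and is not part of Growler.

```python
def pop_cert_args(kargs):
    """Remove 'key'/'cert' from kargs and return load_cert_chain keyword arguments."""
    key = kargs.pop("key")          # required, as in Example 10
    cert = kargs.pop("cert", None)  # optional
    if cert is None:
        # Only one file given: treat it as a combined certificate + key PEM.
        return {"certfile": key}
    return {"certfile": cert, "keyfile": key}


options = {"key": "server.key", "cert": "server.crt", "backlog": 10}
chain_args = pop_cert_args(options)
```

The result would be passed on as `sslctx.load_cert_chain(**chain_args)`, and the remaining entries in `options` are left for the server constructor.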


Example 11: ssl_wrap_socket

    def ssl_wrap_socket(sock, keyfile = None, certfile = None, cert_reqs = None, ca_certs = None, server_hostname = None, ssl_version = None):
        context = SSLContext(ssl_version)
        context.verify_mode = cert_reqs
        OP_NO_COMPRESSION = 131072
        context.options |= OP_NO_COMPRESSION
        if ca_certs:
            try:
                context.load_verify_locations(ca_certs)
            except Exception as e:
                raise SSLError(e)

        if certfile:
            context.load_cert_chain(certfile, keyfile)
        if HAS_SNI:
            return context.wrap_socket(sock, server_hostname=server_hostname)
        return context.wrap_socket(sock)
Developer: connoryang, Project: dec-eve-serenity, Lines of code: 16, Source: util.py


Example 12: __init__

 def __init__(self, localaddr, remoteaddr, ssl=False, certfile=None, keyfile=None, ssl_version=ssl.PROTOCOL_SSLv23, require_authentication=False, credential_validator=None, maximum_execution_time=30, process_count=5):
     smtpd.SMTPServer.__init__(self, localaddr, remoteaddr)
     self.logger = logging.getLogger( secure_smtpd.LOG_NAME )
     self.certfile = certfile
     self.keyfile = keyfile
     self.ssl_version = ssl_version
     self.subprocesses = []
     self.require_authentication = require_authentication
     self.credential_validator = credential_validator
     self.ssl = ssl
     self.maximum_execution_time = maximum_execution_time
     self.process_count = process_count
     self.process_pool = None
     self.context = SSLContext(ssl_version)
     self.context.load_cert_chain(certfile=certfile, keyfile=keyfile)
Developer: tebrown, Project: secure-smtpd, Lines of code: 15, Source: smtp_server.py


Example 13: get_ssl_context

 def get_ssl_context(*args):
     """Create and return an SSLContext object."""
     certfile, keyfile, ca_certs, cert_reqs = args
     ctx = SSLContext(ssl.PROTOCOL_SSLv23)
     if certfile is not None:
         ctx.load_cert_chain(certfile, keyfile)
     if ca_certs is not None:
         ctx.load_verify_locations(ca_certs)
     if cert_reqs is not None:
         ctx.verify_mode = cert_reqs
     return ctx
Developer: llvtt, Project: mongo-python-driver, Lines of code: 11, Source: ssl_support.py


Example 14: ssl_wrap_socket

def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=None,
                    ca_certs=None, server_hostname=None,
                    ssl_version=None):
    context = SSLContext(ssl_version)
    context.verify_mode = cert_reqs

    if ca_certs:
        try:
            context.load_verify_locations(ca_certs)
        # Py32 raises IOError   
        # Py33 raises FileNotFoundError
        except Exception as e:  # Reraise as SSLError
            raise SSLError(e)

    if certfile:
        # FIXME: This block needs a test.
        context.load_cert_chain(certfile, keyfile)

    if HAS_SNI:  # Platform-specific: OpenSSL with enabled SNI
        return (context, context.wrap_socket(sock, server_hostname=server_hostname))

    return (context, context.wrap_socket(sock))
Developer: GShikari, Project: pentesttoolz, Lines of code: 22, Source: sslparser.py


Example 15: create_socket

def create_socket(ip: str, port: int, context: ssl.SSLContext = None,
                  verify_hostname: bool = True, timeout: int = 10) -> ssl.SSLSocket:
    """
    Creates a new SSL-wrapped socket.
    :param ip: The IP to connect to.
    :param port: The port to connect to.
    :param context: The SSL context to use, or None for a default one to be created.
    :param verify_hostname: Ignored
    :param timeout: The timeout for recv().
    :return: A new SSLSocket.
    """
    verify_hostname = current_app.conf["SERVER_LOGIN_ON_CLIENT_VERIFY"]

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(timeout)
    if context:
        sock = context.wrap_socket(s)
    else:
        sock = ssl.wrap_socket(s, cert_reqs=ssl.CERT_REQUIRED if verify_hostname else ssl.CERT_NONE)
    sock.connect((ip, port))
    return sock
Developer: Veriny, Project: Solebaga, Lines of code: 21, Source: gutsama.py


Example 16: ssl_wrap_socket

    def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=CERT_NONE,
                        ca_certs=None, server_hostname=None,
                        ssl_version=PROTOCOL_SSLv23):
        """
        All arguments except `server_hostname` have the same meaning as for
        :func:`ssl.wrap_socket`

        :param server_hostname:
            Hostname of the expected certificate
        """
        context = SSLContext(ssl_version)
        context.verify_mode = cert_reqs
        if ca_certs:
            try:
                context.load_verify_locations(ca_certs)
            except TypeError as e:  # Reraise as SSLError
                # fixme: This block needs a test.
                raise SSLError(e)
        if certfile:
            # fixme: This block needs a test.
            context.load_cert_chain(certfile, keyfile)
        if HAS_SNI:  # Platform-specific: OpenSSL with enabled SNI
            return context.wrap_socket(sock, server_hostname=server_hostname)
        return context.wrap_socket(sock)
Developer: holmesal, Project: musicthing, Lines of code: 24, Source: util.py


Example 17: SSLContext

import threading
import time
from threading import Thread


# PyPy compatibility
try:
    from ssl import PROTOCOL_TLSv1_2 as PROTOCOL_TLSv1
except ImportError:
    from ssl import PROTOCOL_TLSv1 as PROTOCOL_TLSv1

run_http = config["insecure"]["enabled"]
run_https = config["secure"]["enabled"]

if run_https:
    context = SSLContext(PROTOCOL_TLSv1)
    context.load_cert_chain(
        config["secure"]["cert"],
        config["secure"]["key"]
    )

if run_http and run_https:
    if config["debug"]:
        raise Warning("Cannot run in debug mode with both https and http enabled due to flask limitations.")
    Thread(
        target=app.run,
        kwargs={
            "host": config["server"]["address"],
            "port": config["secure"]["port"],
            "debug": config["debug"],
            "ssl_context": context
Developer: FroopleXP, Project: BitOrb, Lines of code: 31, Source: runserver.py


Example 18: open_url


#......... part of the code is omitted here .........
        # add it to the list of handlers
        ssl_handler = SSLValidationHandler(hostname, port)
        handlers.append(ssl_handler)

    if parsed[0] != 'ftp':
        username = url_username

        if username:
            password = url_password
            netloc = parsed[1]
        elif '@' in parsed[1]:
            credentials, netloc = parsed[1].split('@', 1)
            if ':' in credentials:
                username, password = credentials.split(':', 1)
            else:
                username = credentials
                password = ''

            parsed = list(parsed)
            parsed[1] = netloc

            # reconstruct url without credentials
            url = urlparse.urlunparse(parsed)

        if username and not force_basic_auth:
            passman = urllib2.HTTPPasswordMgrWithDefaultRealm()

            # this creates a password manager
            passman.add_password(None, netloc, username, password)

            # because we have put None at the start it will always
            # use this username/password combination for  urls
            # for which `theurl` is a super-url
            authhandler = urllib2.HTTPBasicAuthHandler(passman)

            # create the AuthHandler
            handlers.append(authhandler)

        elif username and force_basic_auth:
            if headers is None:
                headers = {}

            headers["Authorization"] = "Basic %s" % base64.b64encode("%s:%s" % (username, password))

    if not use_proxy:
        proxyhandler = urllib2.ProxyHandler({})
        handlers.append(proxyhandler)

    # pre-2.6 versions of python cannot use the custom https
    # handler, since the socket class is lacking create_connection.
    # Some python builds lack HTTPS support.
    if hasattr(socket, 'create_connection') and CustomHTTPSHandler:
        handlers.append(CustomHTTPSHandler)

    opener = urllib2.build_opener(*handlers)
    urllib2.install_opener(opener)

    if method:
        if method.upper() not in ('OPTIONS','GET','HEAD','POST','PUT','DELETE','TRACE','CONNECT','PATCH'):
            raise ConnectionError('invalid HTTP request method; %s' % method.upper())
        request = RequestWithMethod(url, method.upper(), data)
    else:
        request = urllib2.Request(url, data)

    # add the custom agent header, to help prevent issues
    # with sites that block the default urllib agent string
    request.add_header('User-agent', http_agent)

    # if we're ok with getting a 304, set the timestamp in the
    # header, otherwise make sure we don't get a cached copy
    if last_mod_time and not force:
        tstamp = last_mod_time.strftime('%a, %d %b %Y %H:%M:%S +0000')
        request.add_header('If-Modified-Since', tstamp)
    else:
        request.add_header('cache-control', 'no-cache')

    # user defined headers now, which may override things we've set above
    if headers:
        if not isinstance(headers, dict):
            raise ValueError("headers provided to fetch_url() must be a dict")
        for header in headers:
            request.add_header(header, headers[header])

    urlopen_args = [request, None]
    if sys.version_info >= (2,6,0):
        # urlopen in python prior to 2.6.0 did not
        # have a timeout parameter
        urlopen_args.append(timeout)

    if HAS_SSLCONTEXT and not validate_certs:
        # In 2.7.9, the default context validates certificates
        context = SSLContext(ssl.PROTOCOL_SSLv23)
        context.options |= ssl.OP_NO_SSLv2
        context.options |= ssl.OP_NO_SSLv3
        context.verify_mode = ssl.CERT_NONE
        context.check_hostname = False
        urlopen_args += (None, None, None, context)

    r = urllib2.urlopen(*urlopen_args)
    return r
Developer: RajeevNambiar, Project: temp, Lines of code: 101, Source: urls.py


Example 19: open_url

def open_url(url, data=None, headers=None, method=None, use_proxy=True,
             force=False, last_mod_time=None, timeout=10, validate_certs=True,
             url_username=None, url_password=None, http_agent=None,
             force_basic_auth=False, follow_redirects='urllib2'):
    '''
    Sends a request via HTTP(S) or FTP using urllib2 (Python2) or urllib (Python3)

    Does not require the module environment
    '''
    handlers = []
    ssl_handler = maybe_add_ssl_handler(url, validate_certs)
    if ssl_handler:
        handlers.append(ssl_handler)

    # FIXME: change the following to use the generic_urlparse function
    #        to remove the indexed references for 'parsed'
    parsed = urlparse(url)
    if parsed[0] != 'ftp':
        username = url_username

        if headers is None:
            headers = {}

        if username:
            password = url_password
            netloc = parsed[1]
        elif '@' in parsed[1]:
            credentials, netloc = parsed[1].split('@', 1)
            if ':' in credentials:
                username, password = credentials.split(':', 1)
            else:
                username = credentials
                password = ''

            parsed = list(parsed)
            parsed[1] = netloc

            # reconstruct url without credentials
            url = urlunparse(parsed)

        if username and not force_basic_auth:
            passman = urllib_request.HTTPPasswordMgrWithDefaultRealm()

            # this creates a password manager
            passman.add_password(None, netloc, username, password)

            # because we have put None at the start it will always
            # use this username/password combination for  urls
            # for which `theurl` is a super-url
            authhandler = urllib_request.HTTPBasicAuthHandler(passman)

            # create the AuthHandler
            handlers.append(authhandler)

        elif username and force_basic_auth:
            headers["Authorization"] = basic_auth_header(username, password)

        else:
            try:
                rc = netrc.netrc(os.environ.get('NETRC'))
                login = rc.authenticators(parsed[1])
            except IOError:
                login = None

            if login:
                username, _, password = login
                if username and password:
                    headers["Authorization"] = basic_auth_header(username, password)

    if not use_proxy:
        proxyhandler = urllib_request.ProxyHandler({})
        handlers.append(proxyhandler)

    if HAS_SSLCONTEXT and not validate_certs:
        # In 2.7.9, the default context validates certificates
        context = SSLContext(ssl.PROTOCOL_SSLv23)
        context.options |= ssl.OP_NO_SSLv2
        context.options |= ssl.OP_NO_SSLv3
        context.verify_mode = ssl.CERT_NONE
        context.check_hostname = False
        handlers.append(urllib_request.HTTPSHandler(context=context))

    # pre-2.6 versions of python cannot use the custom https
    # handler, since the socket class is lacking create_connection.
    # Some python builds lack HTTPS support.
    if hasattr(socket, 'create_connection') and CustomHTTPSHandler:
        handlers.append(CustomHTTPSHandler)

    handlers.append(RedirectHandlerFactory(follow_redirects, validate_certs))

    opener = urllib_request.build_opener(*handlers)
    urllib_request.install_opener(opener)

    if method:
        if method.upper() not in ('OPTIONS','GET','HEAD','POST','PUT','DELETE','TRACE','CONNECT','PATCH'):
            raise ConnectionError('invalid HTTP request method; %s' % method.upper())
        request = RequestWithMethod(url, method.upper(), data)
    else:
        request = urllib_request.Request(url, data)

#......... part of the code is omitted here .........
Developer: KMK-ONLINE, Project: ansible, Lines of code: 101, Source: urls.py
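
Examples 18 and 19 turn verification off by setting `verify_mode = ssl.CERT_NONE` and then `check_hostname = False`. That order works for a context built from `PROTOCOL_SSLv23`, where hostname checking starts out disabled. On a context from `ssl.create_default_context()` the order matters: `check_hostname` has to be switched off first, otherwise assigning `CERT_NONE` raises `ValueError`. A small sketch, where `unverified_context` is a hypothetical helper:

```python
import ssl


def unverified_context():
    """Return a client context that skips certificate validation (testing only)."""
    ctx = ssl.create_default_context()
    ctx.check_hostname = False       # must be disabled before verify_mode
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


# Doing it the other way round on a default context is rejected.
strict = ssl.create_default_context()
try:
    strict.verify_mode = ssl.CERT_NONE
    order_error = None
except ValueError as exc:
    order_error = exc

relaxed = unverified_context()
```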


Example 20: get_ssl_context

 def get_ssl_context(*args):
     """Create and return an SSLContext object."""
     certfile, keyfile, ca_certs, cert_reqs = args
     # Note PROTOCOL_SSLv23 is about the most misleading name imaginable.
     # This configures the server and client to negotiate the
     # highest protocol version they both support. A very good thing.
     ctx = SSLContext(ssl.PROTOCOL_SSLv23)
     if hasattr(ctx, "options"):
         # Explicitly disable SSLv2 and SSLv3. Note that up to
         # date versions of MongoDB 2.4 and above already do this,
         # python disables SSLv2 by default in >= 2.7.7 and >= 3.3.4
         # and SSLv3 in >= 3.4.3. There is no way for us to do this
         # explicitly for python 2.6 or 2.7 before 2.7.9.
         ctx.options |= getattr(ssl, "OP_NO_SSLv2", 0)
         ctx.options |= getattr(ssl, "OP_NO_SSLv3", 0)
     if certfile is not None:
         ctx.load_cert_chain(certfile, keyfile)
     if ca_certs is not None:
         ctx.load_verify_locations(ca_certs)
     elif cert_reqs != ssl.CERT_NONE:
         # CPython >= 2.7.9 or >= 3.4.0, pypy >= 2.5.1
         if hasattr(ctx, "load_default_certs"):
             ctx.load_default_certs()
         # Python >= 3.2.0, useless on Windows.
         elif (sys.platform != "win32" and
               hasattr(ctx, "set_default_verify_paths")):
             ctx.set_default_verify_paths()
         elif sys.platform == "win32" and HAVE_WINCERTSTORE:
             with _WINCERTSLOCK:
                 if _WINCERTS is None:
                     _load_wincerts()
             ctx.load_verify_locations(_WINCERTS.name)
         elif HAVE_CERTIFI:
             ctx.load_verify_locations(certifi.where())
         else:
             raise ConfigurationError(
                 "`ssl_cert_reqs` is not ssl.CERT_NONE and no system "
                 "CA certificates could be loaded. `ssl_ca_certs` is "
                 "required.")
     ctx.verify_mode = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs
     return ctx
Developer: Alpus, Project: Eth, Lines of code: 41, Source: ssl_support.py
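
Example 20 excludes old protocol versions by OR-ing `OP_NO_SSLv2` and `OP_NO_SSLv3` into `options`. On Python 3.7 and later, a lower bound can instead be set through `SSLContext.minimum_version`. The following is a sketch under that assumption, not the driver's own code; `tls12_or_newer_context` is a hypothetical helper.

```python
import ssl


def tls12_or_newer_context():
    """Client context that refuses anything older than TLS 1.2 (hypothetical helper)."""
    ctx = ssl.create_default_context()
    # One lower bound replaces the individual OP_NO_* flags.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    return ctx


ctx = tls12_or_newer_context()
```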



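Note: The ssl.SSLContext class examples in this article were compiled by 纯净天空 from source-code and documentation platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects contributed by their respective developers, and copyright in the source code remains with the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.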

