本文整理汇总了Python中ssl._create_unverified_context函数的典型用法代码示例。如果您正苦于以下问题:Python _create_unverified_context函数的具体用法?Python _create_unverified_context怎么用?Python _create_unverified_context使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了_create_unverified_context函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_teardown
def test_teardown(self):
try:
lock = threading.Lock()
https = https_reader.server_plugin(lock, PORT)
except Exception as e:
print e
self.fail("Failed to startup server")
try:
conn = httplib.HTTPSConnection('localhost', 4443, timeout=5, context=ssl._create_unverified_context())
conn.request("GET", "/")
r1 = conn.getresponse()
if r1.status == 404:
connection = True
else:
connection = False
conn.close()
except Exception as e:
print e
self.fail("Client couldn't to server")
try:
https.tear_down()
httplib.HTTPSConnection('localhost', 4443, timeout=5, context=ssl._create_unverified_context())
r2 = conn.getresponse()
self.fail("Server failed to shutdown")
except Exception as e:
#sever shutdown
self.assertTrue(True)
开发者ID:WDKing,项目名称:SuperHoneyPot,代码行数:29,代码来源:https_reader_test.py
示例2: getServices
def getServices(agsSiteAdminURL, token):
# Setup the parameters
parameters = urllib.urlencode({'token': token,
'f': 'json'})
queryString = parameters.encode('utf-8')
# Post request
try:
printMessage("Querying ArcGIS Server for a list of services - " + agsSiteAdminURL + "/admin/services" + "...","info")
context = ssl._create_unverified_context()
request = urllib2.Request(agsSiteAdminURL + "/admin/services",queryString)
responseJSON = json.loads(urllib2.urlopen(request, context=context).read())
if "error" in str(responseJSON).lower():
printMessage(responseJSON,"error")
if (sendErrorEmail == "true"):
message = "There is an issue with the ArcGIS Server site - " + agsSiteAdminURL + "..." + "<br/><br/>"
message += str(responseJSON) + "<br/>"
# Send email
sendEmail(message,None)
# Exit the python script
sys.exit()
else:
# Iterate through services
services = []
for eachService in responseJSON['services']:
services.append(eachService['serviceName']+ "." + eachService['type'])
# Iterate through folders
for folder in responseJSON['folders']:
# Ignore the system or utilities folder
if ((folder.lower() != "system") or (folder.lower() != "utilities")):
context = ssl._create_unverified_context()
request = urllib2.Request(agsSiteAdminURL + "/admin/services/" + folder,queryString)
responseJSON = json.loads(urllib2.urlopen(request, context=context).read())
if "error" in str(responseJSON).lower():
printMessage(responseJSON,"error")
if (sendErrorEmail == "true"):
message = "There is an issue with the ArcGIS Server site - " + agsSiteAdminURL + "..." + "<br/><br/>"
message += str(responseJSON) + "<br/>"
# Send email
sendEmail(message,None)
# Exit the python script
sys.exit()
else:
# Iterate through services
for eachService in responseJSON['services']:
services.append(folder + "/" + eachService['serviceName']+ "." + eachService['type'])
# Return services list
return services
except urllib2.URLError, error:
printMessage("There is an issue connecting to the ArcGIS Server site - " + agsSiteAdminURL + "...","error")
printMessage(error,"error")
if (sendErrorEmail == "true"):
message = "There is an issue connecting to the ArcGIS Server site - " + agsSiteAdminURL + "..." + "<br/><br/>"
message += str(error) + "<br/>"
# Send email
sendEmail(message,None)
# Exit the python script
sys.exit()
开发者ID:WestonSF,项目名称:ArcGISOnlinePortalToolkit,代码行数:59,代码来源:ArcGISServerAvailability.py
示例3: post_process_options
def post_process_options(self):
if self.options.disable_ssl_verification:
try:
import ssl
ssl._create_unverified_context()
except Exception:
raise CommandError('The --disable-ssl-verification flag is '
'only available with Python 2.7.9+')
开发者ID:reviewboard,项目名称:rbtools,代码行数:8,代码来源:__init__.py
示例4: add_spark
def add_spark(self,node):
"""Connect to SPARK on local host and dump information.
Uses requests. Note: disables HTTPS certificate warnings."""
import os
import json
from urllib.request import urlopen
import ssl
if "SPARK_ENV_LOADED" not in os.environ:
return # no spark
spark = ET.SubElement(node, 'spark')
try:
import requests
import urllib3
urllib3.disable_warnings()
except ImportError:
ET.SubElement(spark,'error').text = "SPARK_ENV_LOADED present but requests module not available"
return
host = 'localhost'
p1 = 4040
p2 = 4050
import urllib.error
for port in range(p1,p2+1):
try:
url = 'http://{}:{}/api/v1/applications/'.format(host,port)
resp = urlopen(url, context=ssl._create_unverified_context())
spark_data = resp.read()
break
except (ConnectionError, ConnectionRefusedError, urllib.error.URLError) as e:
continue
if port>=p2:
ET.SubElement(spark,'error').text = f"SPARK_ENV_LOADED present but no listener on {host} ports {p1}-{p2}"
return
# Looks like we have spark!
for app in json.loads(spark_data):
app_id = app['id']
app_name = app['name']
e = ET.SubElement(spark,'application',{'id':app_id,'name':app_name})
attempt_count = 1
for attempt in app['attempts']:
e = ET.SubElement(spark,'attempt')
json_to_xml(e,attempt)
for param in ['jobs','allexecutors','storage/rdd']:
url = f'http://{host}:{port}/api/v1/applications/{app_id}/{param}'
resp = urlopen(url, context=ssl._create_unverified_context())
data = resp.read()
e = ET.SubElement(spark,param.replace("/","_"))
json_to_xml(e,json.loads(data))
开发者ID:simsong,项目名称:dfxml,代码行数:51,代码来源:writer.py
示例5: ocr_barcode
def ocr_barcode(token, image_str=None, url=None):
_url = 'https://ais.cn-north-1.myhuaweicloud.com/v1.0/ocr/barcode'
_data = {
"image": image_str,
"url": url
}
kreq = urllib2.Request(url=_url)
kreq.add_header('Content-Type', 'application/json')
kreq.add_header('X-Auth-Token', token)
kreq.add_data(json.dumps(_data))
resp = None
status_code = None
try:
#
# Here we use the unvertified-ssl-context, Because in FunctionStage
# the client CA-validation have some problem, so we must do this.
#
_context = ssl._create_unverified_context()
r = urllib2.urlopen(kreq, context=_context)
#
# We use HTTPError and URLError,because urllib2 can't process the 4XX &
# 500 error in the single urlopen function.
#
# If you use a modern, high-level designed HTTP client lib, Yeah, I mean requests,
# there is no this problem.
#
except HTTPError, e:
resp = e.read()
status_code = e.code
开发者ID:fox1006,项目名称:ais-sdk,代码行数:33,代码来源:ocr_barcode.py
示例6: __send_xml_str
def __send_xml_str(self, xml_str):
logger.debug("Sending: %s" % xml_str)
xml_data = urllib.parse.urlencode({'XML': xml_str})
request = urllib.request.Request(self.door_url(), xml_data)
base64string = base64.encodestring('%s:%s' % (self.door_user, self.door_pass)).replace('\n', '')
request.add_header("Authorization", "Basic %s" % base64string)
context = ssl._create_unverified_context()
context.set_ciphers('RC4-SHA')
self.lock.acquire()
try:
result = urllib.request.urlopen(request, context=context)
return_code = result.getcode()
return_xml = result.read()
result.close()
finally:
self.lock.release()
logger.debug("Response code: %d" % return_code)
logger.debug("Response: %s" % return_xml)
if return_code != 200:
raise Exception("Did not receive 200 return code")
error = get_attribute(return_xml, "errorMessage")
if error:
raise Exception("Received an error: %s" % error)
return return_xml
开发者ID:nadineproject,项目名称:nadine,代码行数:28,代码来源:hid_control.py
示例7: _make_connect
def _make_connect(self, https, host, port, proxy=None):
if not https:
if proxy:
con = httplib.HTTPConnection(
proxy[0], proxy[1], timeout=self.timeout)
con.set_tunnel(host, port)
else:
con = httplib.HTTPConnection(host, port, timeout=self.timeout)
# con .set_debuglevel(2) #?
con.connect()
return con
for p in self.protocol:
context = ssl._create_unverified_context(p)
try:
if proxy:
con = httplib.HTTPSConnection(
proxy[0], proxy[1], context=context,
timeout=self.timeout)
con.set_tunnel(host, port)
else:
con = httplib.HTTPSConnection(
host, port, context=context, timeout=self.timeout)
con.connect()
return con
except ssl.SSLError, e:
# print e,protocol
pass
开发者ID:2625668714,项目名称:pocscan,代码行数:28,代码来源:hackhttp.py
示例8: send
def send(self, type, text, raw):
message_time = time.time()
message_timestamp = time.ctime(message_time)
if check_time_restriction(self.starttime, self.endtime):
if self.suppress_timestamp == False:
self.msg_to_send = text[:10000].encode('utf8') + " Message Sent at: " + message_timestamp
else:
self.msg_to_send = text[:10000].encode('utf8')
notify_data = {
'apikey': self.api_key,
'application': self.app_name,
'event': self.event,
'description': self.msg_to_send,
'priority': self.priority
}
if sys.version_info >= (2,7,9):
http_handler = HTTPSConnection(PROWL_URL, context=ssl._create_unverified_context())
else:
http_handler = HTTPSConnection(PROWL_URL)
http_handler.request(PROWL_METHOD, PROWL_PATH, headers=self.headers,body=urlencode(notify_data))
http_response = http_handler.getresponse()
if http_response.status == 200:
return True
else:
current_app.logger.info('Event Prowl Notification Failed: {0}'. format(http_response.reason))
raise Exception('Prowl Notification Failed: {0}' . format(http_response.reason))
开发者ID:nutechsoftware,项目名称:alarmdecoder-webapp,代码行数:32,代码来源:types.py
示例9: __init__
def __init__(self, host=None, port=None, types=(),
timeout=8, verify_ssl=False):
self.host = host
if not Resolver.host_is_ip(self.host):
raise ValueError('The host of proxy should be the IP address. '
'Try Proxy.create() if the host is a domain')
self.port = int(port)
if self.port > 65535:
raise ValueError('The port of proxy cannot be greater than 65535')
self.expected_types = set(types) & {'HTTP', 'HTTPS', 'CONNECT:80',
'CONNECT:25', 'SOCKS4', 'SOCKS5'}
self._timeout = timeout
self._ssl_context = (True if verify_ssl else
_ssl._create_unverified_context())
self._types = {}
self._is_working = False
self.stat = {'requests': 0, 'errors': Counter()}
self._ngtr = None
self._geo = Resolver.get_ip_info(self.host)
self._log = []
self._runtimes = []
self._schemes = ()
self._closed = True
self._reader = {'conn': None, 'ssl': None}
self._writer = {'conn': None, 'ssl': None}
开发者ID:ericbaranowski,项目名称:ProxyBroker,代码行数:27,代码来源:proxy.py
示例10: get_page
def get_page(url):
"""Return html code from cmsweb.cern.ch of the page with the given url"""
print "HTTP access: ", url
try:
socket_obj = urllib2.urlopen(url)
except Exception, e:
socket_obj = urllib2.urlopen(url, context=ssl._create_unverified_context())
开发者ID:tier-one-monitoring,项目名称:monstr,代码行数:7,代码来源:Utils.py
示例11: request
def request(self, handler):
if self.__is_secure:
conn = http.client.HTTPSConnection(self.__server_address, context=ssl._create_unverified_context())
else:
conn = http.client.HTTPConnection(self.__server_address)
try:
# Send request to server
(response, response_body, headers) = self.__do_request(conn, handler)
if response is not None:
# Print log message
self.__log_message("""\trequest = {} {}
response headers = {}
response status = {} {}
response data = {}""".format(handler.method, handler.request_url,
str(response.headers),
response.status, response.reason,
("(none)" if response_body == "" else response_body)))
return response, response_body
else:
print("There was a problem making the request {0}...".format(handler.request_url))
except http.client.HTTPException as err:
print("Connection error: {}".format(err))
except ConnectionRefusedError:
print("Connection refused! Check that the gateway daemon is running.")
except Exception as err:
print("Unknown error: {0}".format(err))
finally:
conn.close()
return None
开发者ID:cami-project,项目名称:cami-project,代码行数:30,代码来源:HttpClient.py
示例12: readGithubCommitLogs
def readGithubCommitLogs(self):
url = 'https://api.github.com/repos/openpli/%s/commits' % self.projects[self.project][0]
commitlog = ""
from datetime import datetime
from json import loads
from urllib2 import urlopen
try:
commitlog += 80 * '-' + '\n'
commitlog += url.split('/')[-2] + '\n'
commitlog += 80 * '-' + '\n'
try:
# OpenPli 5.0 uses python 2.7.11 and here we need to bypass the certificate check
from ssl import _create_unverified_context
log = loads(urlopen(url, timeout=5, context=_create_unverified_context()).read())
except:
log = loads(urlopen(url, timeout=5).read())
for c in log:
creator = c['commit']['author']['name']
title = c['commit']['message']
date = datetime.strptime(c['commit']['committer']['date'], '%Y-%m-%dT%H:%M:%SZ').strftime('%x %X')
commitlog += date + ' ' + creator + '\n' + title + 2 * '\n'
commitlog = commitlog.encode('utf-8')
self.cachedProjects[self.projects[self.project][1]] = commitlog
except:
commitlog += _("Currently the commit log cannot be retrieved - please try later again")
self["AboutScrollLabel"].setText(commitlog)
开发者ID:factorybuild,项目名称:stbgui,代码行数:26,代码来源:About.py
示例13: getServer
def getServer(self):
#return self.MyServer
print "server by xmlrpc = ", self.server
if sys.version_info<(2,7,9):
return ServerProxy(self.server,allow_none = 1)
else:
return ServerProxy(self.server, allow_none = 1, context=ssl._create_unverified_context())
开发者ID:CuonDeveloper,项目名称:cuon,代码行数:7,代码来源:xmlrpc.py
示例14: get_actual_sid
def get_actual_sid(sid, gamepath):
version = ""
with open(join_path(gamepath, "game/ffxivgame.ver"), "r") as f:
version = f.readline()
if version == "":
raise Exception("Unable to read version information!")
version_hash = (
gen_hash(join_path(gamepath, "boot/ffxivboot.exe"))
+ ","
+ gen_hash(join_path(gamepath, "boot/ffxivlauncher.exe"))
+ ","
+ gen_hash(join_path(gamepath, "boot/ffxivupdater.exe"))
)
response = open_url(
version_url.format(version=version, sid=sid),
version_hash.encode("utf-8"),
version_headers,
ssl._create_unverified_context(),
)
response_data = response.read().decode("utf-8")
actual_sid = response.headers.get("X-Patch-Unique-Id")
if response.headers.get("X-Latest-Version") != version or response_data != "":
print(response.headers.as_string())
print(response_data)
raise Exception("Game out of date. Please run the official launcher to update it.")
return (actual_sid, version)
开发者ID:EmperorArthur,项目名称:FF14Launcher,代码行数:30,代码来源:login.py
示例15: install_pip
def install_pip():
"""Install pip"""
try:
import pip # NOQA
except ImportError:
if PY3:
from urllib.request import urlopen
else:
from urllib2 import urlopen
if hasattr(ssl, '_create_unverified_context'):
ctx = ssl._create_unverified_context()
else:
ctx = None
kw = dict(context=ctx) if ctx else {}
safe_print("downloading %s" % GET_PIP_URL)
req = urlopen(GET_PIP_URL, **kw)
data = req.read()
tfile = os.path.join(tempfile.gettempdir(), 'get-pip.py')
with open(tfile, 'wb') as f:
f.write(data)
try:
sh('%s %s --user' % (PYTHON, tfile))
finally:
os.remove(tfile)
开发者ID:marcinkuzminski,项目名称:psutil,代码行数:27,代码来源:winmake.py
示例16: __init__
def __init__(self):
try:
context = ssl._create_unverified_context()
except AttributeError:
context = None
FancyURLopener.__init__(self, context=context)
开发者ID:matevzv,项目名称:vesna-alh-tools,代码行数:7,代码来源:__init__.py
示例17: download_engine_certs
def download_engine_certs(prefix):
engine_ip = prefix.virt_env.engine_vm().ip()
engine_base_url = '/ovirt-engine/services/pki-resource?resource=ca-certificate&format='
engine_ca_url = engine_base_url + 'X509-PEM-CA'
engine_ssh_url = engine_base_url + 'OPENSSH-PUBKEY'
# We use an unverified connection, as L0 host cannot resolve '...engine.lago.local'
conn = httplib.HTTPSConnection(engine_ip, context=ssl._create_unverified_context())
def _download_file(url, path):
conn.request("GET", url)
resp = conn.getresponse()
nt.assert_true(
resp.status == 200
)
data = resp.read()
with open(path, 'wb') as outfile:
outfile.write(data)
_download_file(engine_ca_url, 'engine-ca.pem')
# TODO: verify certificate. Either use it, or run:
# 'openssl x509 -in engine-ca.pem -text -noout'
_download_file(engine_ssh_url, 'engine-rsa.pub')
# TODO: verify public key. Either use it, or run:
# 'ssh-keygen -l -f engine-rsa.pub'
conn.close()
开发者ID:oVirt,项目名称:ovirt-system-tests,代码行数:28,代码来源:002_bootstrap.py
示例18: get_all_versions
def get_all_versions():
url = _LIST_VERSIONS_URL
url += '?_=' + str(int(time.time())) # prevents caching
want_prereleases = subprocess.check_output('source /data/etc/os.conf && echo $os_prereleases', shell=True, stderr=subprocess.STDOUT).strip() == 'true'
try:
logging.debug('board is %s' % _BOARD)
logging.debug('fetching %s...' % url)
context = ssl._create_unverified_context()
response = urllib2.urlopen(url, timeout=settings.REMOTE_REQUEST_TIMEOUT, context=context)
releases = json.load(response)
versions = []
for release in releases:
if release.get('prerelease') and not want_prereleases:
continue
for asset in release.get('assets', []):
if not re.match('^motioneyeos-%s-\d{8}\.img.gz$' % _BOARD, asset['name']):
continue
versions.append(release['name'])
logging.debug('available versions: %(versions)s' % {'versions': ', '.join(versions)})
return sorted(versions)
except Exception as e:
logging.error('could not get versions: %s' % e, exc_info=True)
return []
开发者ID:ccrisan,项目名称:motioneyeos,代码行数:34,代码来源:update.py
示例19: connect
def connect(verify_ssl=True):
global _CONNECTION
if verify_ssl:
_CONNECTION = httplib.HTTPSConnection(HOST, PORT)
else:
insecure_context = ssl._create_unverified_context()
_CONNECTION = httplib.HTTPSConnection(HOST, PORT, context=insecure_context)
开发者ID:alexxa,项目名称:pulp,代码行数:7,代码来源:pic.py
示例20: test_app_using_ipv6_and_ssl
def test_app_using_ipv6_and_ssl(self):
CONF.set_default("ssl_cert_file",
os.path.join(TEST_VAR_DIR, 'certificate.crt'))
CONF.set_default("ssl_key_file",
os.path.join(TEST_VAR_DIR, 'privatekey.key'))
greetings = 'Hello, World!!!'
@webob.dec.wsgify
def hello_world(req):
return greetings
server = manila.wsgi.Server("test_app",
hello_world,
host="::1",
port=0)
server.start()
if hasattr(ssl, '_create_unverified_context'):
response = urllib.request.urlopen(
'https://[::1]:%d/' % server.port,
context=ssl._create_unverified_context())
else:
response = urllib.request.urlopen(
'https://[::1]:%d/' % server.port)
self.assertEqual(greetings, response.read())
server.stop()
开发者ID:JosonYuan,项目名称:manila,代码行数:29,代码来源:test_wsgi.py
注:本文中的ssl._create_unverified_context函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论