本文整理汇总了Python中six.moves.urllib.request.install_opener函数的典型用法代码示例。如果您正苦于以下问题:Python install_opener函数的具体用法?Python install_opener怎么用?Python install_opener使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了install_opener函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: set_proxy
def set_proxy(proxy, user=None, password=''):
"""
Set the HTTP proxy for Python to download through.
If ``proxy`` is None then tries to set proxy from environment or system
settings.
:param proxy: The HTTP proxy server to use. For example:
'http://proxy.example.com:3128/'
:param user: The username to authenticate with. Use None to disable
authentication.
:param password: The password to authenticate with.
"""
from nltk import compat
if proxy is None:
# Try and find the system proxy settings
try:
proxy = getproxies()['http']
except KeyError:
raise ValueError('Could not detect default proxy settings')
# Set up the proxy handler
proxy_handler = ProxyHandler({'https': proxy, 'http': proxy})
opener = build_opener(proxy_handler)
if user is not None:
# Set up basic proxy authentication if provided
password_manager = HTTPPasswordMgrWithDefaultRealm()
password_manager.add_password(realm=None, uri=proxy, user=user, passwd=password)
opener.add_handler(ProxyBasicAuthHandler(password_manager))
opener.add_handler(ProxyDigestAuthHandler(password_manager))
# Overide the existing url opener
install_opener(opener)
开发者ID:prz3m,项目名称:kind2anki,代码行数:35,代码来源:util.py
示例2: __init__
def __init__(self, url, username=None, password=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
super().__init__(url, username, password, timeout)
# Setting ssl key verificationas false
context = ssl._create_stdlib_context(check_hostname=False)
unverified_handler = HTTPSHandler(context=context, check_hostname=False)
install_opener(build_opener(unverified_handler))
开发者ID:albertogviana,项目名称:jenkins_middleware,代码行数:7,代码来源:jenkins_api.py
示例3: debug
def debug():
"""
Activate debugging on urllib2.
"""
if six.PY2:
handler = HTTPSHandler(debuglevel=1)
opener = build_opener(handler)
install_opener(opener)
else:
http_client.HTTPConnection.debuglevel = 1
开发者ID:aioTV,项目名称:universal-analytics-python,代码行数:10,代码来源:Tracker.py
示例4: build_request_with_data
def build_request_with_data(url, data, api_key, method):
"""Build a request with the received method."""
http_redirect_with_data_handler = HTTPRedirectWithDataHandler(method=method)
opener = build_opener(http_redirect_with_data_handler)
install_opener(opener)
url = make_url(url, api_key=api_key, args=None)
request = Request(url, headers={'Content-Type': 'application/json'}, data=json.dumps(data))
request_method = request.get_method()
if request_method != method:
request.get_method = lambda: method
return opener, request
开发者ID:ImmPortDB,项目名称:immport-galaxy,代码行数:11,代码来源:common.py
示例5: open_with_basic_auth
def open_with_basic_auth(url, auth):
"""
opens an url protected with basic http authentication
:param url: string - the url to open
:param auth:
:return:
"""
user, passwd = auth
p = six.moves.urllib.request.HTTPPasswordMgrWithDefaultRealm()
p.add_password(None, url, user, passwd)
auth_handler = six.moves.urllib.request.HTTPBasicAuthHandler(p)
opener = request.build_opener(auth_handler)
request.install_opener(opener)
return opener.open(url)
开发者ID:horatium,项目名称:json_schema_generator,代码行数:14,代码来源:recorder.py
示例6: quick_test
def quick_test():
"""Quick test/example of CacheHandler"""
from httplib2 import FileCache
logging.basicConfig(level=logging.DEBUG)
store = FileCache(".cache")
opener = request.build_opener(CacheHandler(store))
request.install_opener(opener)
response = request.urlopen("http://www.google.com/")
print(response.headers)
print("Response:", response.read()[:100], '...\n')
response.reload(store)
print(response.headers)
print("After reload:", response.read()[:100], '...\n')
开发者ID:jaraco,项目名称:jaraco.net,代码行数:14,代码来源:caching.py
示例7: url_get
def url_get(base_url, password_mgr=None, pathspec=None, params=None):
"""Make contact with the uri provided and return any contents."""
# Uses system proxy settings if they exist.
proxy = urlrequest.ProxyHandler()
if password_mgr is not None:
auth = urlrequest.HTTPDigestAuthHandler(password_mgr)
urlopener = urlrequest.build_opener(proxy, auth)
else:
urlopener = urlrequest.build_opener(proxy)
urlrequest.install_opener(urlopener)
full_url = build_url(base_url, pathspec=pathspec, params=params)
response = urlopener.open(full_url)
content = response.read()
response.close()
return content
开发者ID:ksuderman,项目名称:Galaxy,代码行数:15,代码来源:__init__.py
示例8: run
def run(self, args, opts):
try:
import setuptools
except ImportError:
raise UsageError("setuptools not installed")
request.install_opener(request.build_opener(HTTPRedirectHandler))
if opts.list_targets:
for name, target in _get_targets().items():
print("%-20s %s" % (name, target['url']))
return
if opts.list_projects:
target = _get_target(opts.list_projects)
req = request.Request(_url(target, 'listprojects.json'))
_add_auth_header(req, target)
f = request.urlopen(req)
projects = json.loads(f.read())['projects']
print(os.linesep.join(projects))
return
tmpdir = None
if opts.build_egg: # build egg only
egg, tmpdir = _build_egg()
_log("Writing egg to %s" % opts.build_egg)
shutil.copyfile(egg, opts.build_egg)
else: # buld egg and deploy
target_name = _get_target_name(args)
target = _get_target(target_name)
project = _get_project(target, opts)
version = _get_version(target, opts)
if opts.egg:
_log("Using egg: %s" % opts.egg)
egg = opts.egg
else:
_log("Packing version %s" % version)
egg, tmpdir = _build_egg()
if not _upload_egg(target, egg, project, version):
self.exitcode = 1
if tmpdir:
if opts.debug:
_log("Output dir not removed: %s" % tmpdir)
else:
shutil.rmtree(tmpdir)
开发者ID:505555998,项目名称:scrapy,代码行数:47,代码来源:deploy.py
示例9: _make_opener
def _make_opener(self, realm, base_url, username, password):
"""uTorrent API need HTTP Basic Auth and cookie support for token verify."""
auth_handler = HTTPBasicAuthHandler()
auth_handler.add_password(realm=realm,
uri=base_url,
user=username,
passwd=password)
opener = build_opener(auth_handler)
install_opener(opener)
cookie_jar = CookieJar()
cookie_handler = HTTPCookieProcessor(cookie_jar)
handlers = [auth_handler, cookie_handler]
opener = build_opener(*handlers)
return opener
开发者ID:SerhatG,项目名称:nzbToMedia,代码行数:17,代码来源:client.py
示例10: do_call
def do_call(self, url, data=None, method=None,
calltimeout=constants.SOCKET_TIMEOUT):
"""Send requests to server.
Send HTTPS call, get response in JSON.
Convert response into Python Object and return it.
"""
if self.url:
url = self.url + url
if "xx/sessions" not in url:
LOG.debug('Request URL: %(url)s\n'
'Call Method: %(method)s\n'
'Request Data: %(data)s\n',
{'url': url,
'method': method,
'data': data})
opener = urlreq.build_opener(urlreq.HTTPCookieProcessor(self.cookie))
urlreq.install_opener(opener)
result = None
try:
req = urlreq.Request(url, data, self.headers)
if method:
req.get_method = lambda: method
res_temp = urlreq.urlopen(req, timeout=calltimeout)
res = res_temp.read().decode("utf-8")
LOG.debug('Response Data: %(res)s.', {'res': res})
except Exception as err:
LOG.error(_LE('\nBad response from server: %(url)s.'
' Error: %(err)s'), {'url': url, 'err': err})
res = '{"error":{"code":%s,' \
'"description":"Connect server error"}}' \
% constants.ERROR_CONNECT_TO_SERVER
try:
result = jsonutils.loads(res)
except Exception as err:
err_msg = (_('JSON transfer error: %s.') % err)
LOG.error(err_msg)
raise exception.InvalidInput(reason=err_msg)
return result
开发者ID:huaweistorage,项目名称:OpenStack_Driver,代码行数:44,代码来源:helper.py
示例11: prepare
def prepare(self):
"""
Read options for uploading, check that they're sane
"""
super(BlazeMeterUploader, self).prepare()
self.client.address = self.settings.get("address", self.client.address)
self.client.data_address = self.settings.get("data-address", self.client.data_address)
self.client.timeout = dehumanize_time(self.settings.get("timeout", self.client.timeout))
self.send_interval = dehumanize_time(self.settings.get("send-interval", self.send_interval))
self.browser_open = self.settings.get("browser-open", self.browser_open)
token = self.settings.get("token", "")
proxy_settings = self.engine.config.get("settings").get("proxy")
if proxy_settings:
if proxy_settings.get("address"):
proxy_url = urlsplit(proxy_settings.get("address"))
username = proxy_settings.get("username")
pwd = proxy_settings.get("password")
if username and pwd:
proxy_uri = "%s://%s:%[email protected]%s" % (proxy_url.scheme, username, pwd, proxy_url.netloc)
else:
proxy_uri = "%s://%s" % (proxy_url.scheme, proxy_url.netloc)
proxy_handler = ProxyHandler({"https": proxy_uri, "http": proxy_uri})
opener = build_opener(proxy_handler)
install_opener(opener)
if not token:
self.log.warning("No BlazeMeter API key provided, will upload anonymously")
self.client.token = token
self.client.active_session_id = self.parameters.get("session-id", None)
self.client.test_id = self.parameters.get("test-id", None)
self.client.user_id = self.parameters.get("user-id", None)
self.client.data_signature = self.parameters.get("signature", None)
if not self.client.test_id:
test_name = self.parameters.get("test", "Taurus Test") # TODO: provide a way to put datetime into test name
try:
self.client.ping() # to check connectivity and auth
if token:
self.test_id = self.client.test_by_name(test_name, {"type": "external"})
except HTTPError:
self.log.error("Cannot reach online results storage, maybe the address/token is wrong")
raise
开发者ID:Yingmin-Li,项目名称:taurus,代码行数:43,代码来源:blazemeter.py
示例12: _get_initial_token
def _get_initial_token(url):
"""
Create initial connection to get authentication token for future
requests.
Returns a string to be used in subsequent connections with the
X-CSRFToken header or the empty string if we didn't find any token in
the cookies.
"""
cookiejar = CookieJar()
opener = build_opener(HTTPCookieProcessor(cookiejar))
install_opener(opener)
opener.open(url)
for cookie in cookiejar:
if cookie.name == 'csrftoken':
return cookie.value
return ''
开发者ID:YoungGer,项目名称:edx-downloader,代码行数:19,代码来源:edx_dl.py
示例13: __init__
def __init__(self, go_server, path=''):
self.go_server = go_server
self.logger = logging.getLogger(__name__+'.URL')
self.contents = []
full_url = go_server.url + path
if self.go_server.user and self.go_server.password:
self.logger.debug("logging in as %s"%self.go_server.user)
passman = HTTPPasswordMgrWithDefaultRealm()
passman.add_password(None, full_url,
self.go_server.user,
self.go_server.password)
install_opener(build_opener(HTTPBasicAuthHandler(passman)))
self.logger.debug("reading url %s"%full_url)
file_handle = urlopen(full_url)
if six.PY2:
for line in file_handle:
self.contents.append(line)
else:
for line in file_handle:
self.contents.append(line.decode())
self.logger.debug('line count: %d'%len(self.contents))
file_handle.close()
path_parts = urlsplit(full_url).path.split('/')
last = path_parts[-1]
# /path/to/something/
if last is '':
path_parts.pop()
last = path_parts[-1]
self.path_parts = path_parts
self.file_name = last
self.file_path = '/'.join(path_parts[0:-1])
开发者ID:DataTacticsCorp,项目名称:gocd-parser,代码行数:41,代码来源:url.py
示例14: _get_initial_token
def _get_initial_token(url):
"""
Create initial connection to get authentication token for future
requests.
Returns a string to be used in subsequent connections with the
X-CSRFToken header or the empty string if we didn't find any token in
the cookies.
"""
logging.info("Getting initial CSRF token.")
cookiejar = CookieJar()
opener = build_opener(HTTPCookieProcessor(cookiejar))
install_opener(opener)
opener.open(url)
for cookie in cookiejar:
if cookie.name == "csrftoken":
logging.info("Found CSRF token.")
return cookie.value
logging.warn("Did not find the CSRF token.")
return ""
开发者ID:druvaciam,项目名称:edx-dl,代码行数:23,代码来源:edx_dl.py
示例15: call
def call(self, url, data=None, method=None):
"""Send requests to server.
Send HTTPS call, get response in JSON.
Convert response into Python Object and return it.
"""
if "xx/sessions" not in url:
LOG.debug('Request URL: %(url)s\n'
'Call Method: %(method)s\n'
'Request Data: %(data)s\n',
{'url': url,
'method': method,
'data': data})
opener = urlreq.build_opener(urlreq.HTTPCookieProcessor(self.cookie))
urlreq.install_opener(opener)
try:
req = urlreq.Request(url, data, self.headers)
if method:
req.get_method = lambda: method
res_temp = urlreq.urlopen(req, timeout=constants.SOCKET_TIMEOUT)
res = res_temp.read().decode("utf-8")
LOG.debug('Response Data: %(res)s.', {'res': res})
except Exception as err:
LOG.error(_LE('Bad response from server: %s.') % err)
raise err
try:
res_json = jsonutils.loads(res)
except Exception as err:
err_msg = (_('JSON transfer error: %s.') % err)
LOG.error(err_msg)
raise exception.InvalidShare(reason=err_msg)
return res_json
开发者ID:huaweistorage,项目名称:OpenStack_Driver,代码行数:37,代码来源:huawei_helper.py
示例16: do_call
def do_call(self, url, data=None, method=None):
"""Send requests to server.
Send HTTPS call, get response in JSON.
Convert response into Python Object and return it.
"""
if "xx/sessions" not in url:
LOG.debug(
"Request URL: %(url)s\n" "Call Method: %(method)s\n" "Request Data: %(data)s\n",
{"url": url, "method": method, "data": data},
)
opener = urlreq.build_opener(urlreq.HTTPCookieProcessor(self.cookie))
urlreq.install_opener(opener)
result = None
try:
req = urlreq.Request(url, data, self.headers)
if method:
req.get_method = lambda: method
res_temp = urlreq.urlopen(req, timeout=constants.SOCKET_TIMEOUT)
res = res_temp.read().decode("utf-8")
LOG.debug("Response Data: %(res)s.", {"res": res})
except Exception as err:
LOG.error(_LE("\nBad response from server: %(url)s." " Error: %(err)s"), {"url": url, "err": err})
res = '{"error":{"code":%s,' '"description":"Connect server error"}}' % constants.ERROR_CONNECT_TO_SERVER
try:
result = jsonutils.loads(res)
except Exception as err:
err_msg = _("JSON transfer error: %s.") % err
LOG.error(err_msg)
raise exception.InvalidInput(reason=err_msg)
return result
开发者ID:alinbalutoiu,项目名称:manila,代码行数:36,代码来源:helper.py
示例17: unload
def unload(buildout=None):
urllib_request.build_opener = original_build_opener
urllib_request.install_opener(urllib_request.build_opener())
开发者ID:mfelsche,项目名称:lovely.buildouthttp,代码行数:3,代码来源:buildouthttp.py
示例18: __init__
#.........这里部分代码省略.........
url += '/'
self.url = url + 'api/'
self.save_cookies = save_cookies
if self.save_cookies:
self.cookie_jar, self.cookie_file = create_cookie_jar(
cookie_file=cookie_file)
try:
self.cookie_jar.load(ignore_expires=True)
except IOError:
pass
else:
self.cookie_jar = CookieJar()
self.cookie_file = None
# Get the cookie domain from the url. If the domain
# does not contain a '.' (e.g. 'localhost'), we assume
# it is a local domain and suffix it (See RFC 2109).
parsed_url = urlparse(url)
self.domain = parsed_url[1].partition(':')[0] # Remove Port.
if self.domain.count('.') < 1:
self.domain = '%s.local' % self.domain
if session:
cookie = Cookie(
version=0,
name=RB_COOKIE_NAME,
value=session,
port=None,
port_specified=False,
domain=self.domain,
domain_specified=True,
domain_initial_dot=True,
path=parsed_url[2],
path_specified=True,
secure=False,
expires=None,
discard=False,
comment=None,
comment_url=None,
rest={'HttpOnly': None})
self.cookie_jar.set_cookie(cookie)
if self.save_cookies:
self.cookie_jar.save()
if username:
# If the username parameter is given, we have to clear the session
# cookie manually or it will override the username:password
# combination retrieved from the authentication callback.
try:
self.cookie_jar.clear(self.domain, parsed_url[2],
RB_COOKIE_NAME)
except KeyError:
pass
# Set up the HTTP libraries to support all of the features we need.
password_mgr = ReviewBoardHTTPPasswordMgr(self.url,
username,
password,
api_token,
auth_callback,
otp_token_callback)
self.preset_auth_handler = PresetHTTPAuthHandler(self.url,
password_mgr)
handlers = []
if not verify_ssl:
context = ssl._create_unverified_context()
handlers.append(HTTPSHandler(context=context))
if disable_proxy:
handlers.append(ProxyHandler({}))
handlers += [
HTTPCookieProcessor(self.cookie_jar),
ReviewBoardHTTPBasicAuthHandler(password_mgr),
HTTPDigestAuthHandler(password_mgr),
self.preset_auth_handler,
ReviewBoardHTTPErrorProcessor(),
]
if agent:
self.agent = agent
else:
self.agent = ('RBTools/' + get_package_version()).encode('utf-8')
opener = build_opener(*handlers)
opener.addheaders = [
(b'User-agent', self.agent),
]
install_opener(opener)
self._cache = None
self._urlopen = urlopen
开发者ID:acres-com-au,项目名称:rbtools,代码行数:101,代码来源:request.py
示例19: install
def install(buildout=None, pwd_path=None):
pwdsf = StringIO()
combined_creds = []
github_creds = None
creds = []
local_pwd_path = ''
github_repos = None
if buildout is not None:
local_pwd_path = os.path.join(
buildout['buildout']['directory'],
'.httpauth')
if 'github-repos' in buildout['buildout']:
github_repos = buildout['buildout']['github-repos'].split('\n')
system_pwd_path = os.path.join(
os.path.expanduser('~'),
'.buildout',
'.httpauth')
def combine_cred_file(file_path, combined_creds):
if file_path is None or not os.path.exists(file_path):
return
cred_file = open(file_path)
combined_creds += [l.strip()
for l in cred_file.readlines() if l.strip()]
cred_file.close()
# combine all the possible .httpauth files together
combine_cred_file(pwd_path, combined_creds)
combine_cred_file(local_pwd_path, combined_creds)
combine_cred_file(system_pwd_path, combined_creds)
pwdsf_len = pwdsf.write(u"\n".join(combined_creds))
pwdsf.seek(0)
if not pwdsf_len:
pwdsf = None
log.warn('Could not load authentication information')
try:
auth_handler = CredHandler()
github_creds = get_github_credentials()
new_handlers = []
if github_creds:
new_handlers.append(GithubHandler(github_creds, github_repos))
if pwdsf:
for l, row in enumerate(csv.reader(pwdsf)):
if len(row) == 3:
realm, uris, user = (el.strip() for el in row)
password = prompt_passwd(realm, user)
elif len(row) == 4:
realm, uris, user, password = (el.strip() for el in row)
else:
raise RuntimeError(
"Authentication file cannot be parsed %s:%s" % (
pwd_path, l + 1))
creds.append((realm, uris, user, password))
log.debug('Added credentials %r, %r' % (realm, uris))
auth_handler.add_password(realm, uris, user, password)
if creds:
new_handlers.append(auth_handler)
if creds or github_creds:
download.url_opener = URLOpener(creds, github_creds, github_repos)
if new_handlers:
if url_opener is not None:
handlers = url_opener.handlers[:]
handlers[:0] = new_handlers
else:
handlers = new_handlers
urllib_request.build_opener = lambda *a: original_build_opener(*handlers)
urllib_request.install_opener(urllib_request.build_opener())
finally:
if pwdsf:
pwdsf.close()
开发者ID:mfelsche,项目名称:lovely.buildouthttp,代码行数:69,代码来源:buildouthttp.py
示例20: setUp
def setUp(self):
""" Set up the mock objects to do our unit tests """
my_opener = urllib2.build_opener(MyHTTPHandler)
urllib2.install_opener(my_opener)
self.p = dlipower.PowerSwitch(hostname='lpc.digital-loggers.com')
开发者ID:aacero,项目名称:python-dlipower,代码行数:5,代码来源:test_dlipower.py
注:本文中的six.moves.urllib.request.install_opener函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论