本文整理汇总了Python中six.moves.urllib.request.urlopen函数的典型用法代码示例。如果您正苦于以下问题:Python urlopen函数的具体用法?Python urlopen怎么用?Python urlopen使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urlopen函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _download_jar
def _download_jar(version='5.7.0'):
    """Download ``loci_tools.jar`` and store it in a writeable jar location.

    The candidate directories are taken from ``_gen_jar_locations()``; the
    first one that exists and is writeable is used.  A missing ``pims``
    directory is created when its parent directory is writeable.

    :param version: Bio-Formats version string inserted into the download URL.
    :returns: path of the written ``loci_tools.jar``.
    :raises IOError: if no writeable location is found, or if the SHA-1 of the
        downloaded jar does not match the published ``.sha1`` file.
    """
    from contextlib import closing
    import hashlib

    from six.moves.urllib.request import urlopen

    for loc in _gen_jar_locations():
        # check if dir exists and has write access:
        if os.path.exists(loc) and os.access(loc, os.W_OK):
            break
        # if directory is pims and it does not exist, make it (if allowed);
        # the explicit existence check keeps os.mkdir from failing on a
        # directory that exists but is not writeable
        if (os.path.basename(loc) == 'pims'
                and not os.path.exists(loc)
                and os.access(os.path.dirname(loc), os.W_OK)):
            os.mkdir(loc)
            break
    else:
        raise IOError('No writeable location found. In order to use the '
                      'Bioformats reader, please download '
                      'loci_tools.jar to the pims program folder or one of '
                      'the locations provided by _gen_jar_locations().')

    url = ('http://downloads.openmicroscopy.org/bio-formats/' + version +
           '/artifacts/loci_tools.jar')
    path = os.path.join(loc, 'loci_tools.jar')

    # closing() releases the connections on both Python 2 and 3
    with closing(urlopen(url)) as jar_response:
        loci_tools = jar_response.read()
    with closing(urlopen(url + '.sha1')) as sha1_response:
        # the checksum is the first space-separated field of the .sha1 file
        sha1_checksum = sha1_response.read().split(b' ')[0].decode()

    downloaded = hashlib.sha1(loci_tools).hexdigest()
    if downloaded != sha1_checksum:
        raise IOError("Downloaded loci_tools.jar has invalid checksum. "
                      "Please try again.")

    with open(path, 'wb') as output:
        output.write(loci_tools)

    return path
开发者ID:nkeim,项目名称:pims,代码行数:35,代码来源:bioformats.py
示例2: test_hot_template
def test_hot_template(self):
    """Check that get_template_contents resolves nested template files.

    ``request.urlopen`` is stubbed so that the main template and the two
    templates it references are served from in-memory byte buffers.  The
    test then verifies the collected ``files`` entries and the parsed
    template.
    """
    self.m.StubOutWithMock(request, "urlopen")
    tmpl_file = "/home/my/dir/template.yaml"
    url = "file:///home/my/dir/template.yaml"
    foo_url = "file:///home/my/dir/foo.yaml"
    egg_url = "file:///home/my/dir/spam/egg.yaml"

    # Two expectations are recorded for each nested template, one for the
    # main template; the order of the calls is not constrained.
    request.urlopen(foo_url).InAnyOrder().AndReturn(six.BytesIO(self.foo_template))
    request.urlopen(foo_url).InAnyOrder().AndReturn(six.BytesIO(self.foo_template))
    request.urlopen(url).InAnyOrder().AndReturn(six.BytesIO(self.hot_template))
    request.urlopen(egg_url).InAnyOrder().AndReturn(six.BytesIO(self.egg_template))
    request.urlopen(egg_url).InAnyOrder().AndReturn(six.BytesIO(self.egg_template))
    self.m.ReplayAll()

    files, tmpl_parsed = template_utils.get_template_contents(template_file=tmpl_file)

    # safe_load: yaml.load without an explicit Loader is unsafe and is
    # rejected by recent PyYAML releases.
    self.assertEqual(
        yaml.safe_load(self.foo_template.decode("utf-8")), json.loads(files.get(foo_url))
    )
    self.assertEqual(
        yaml.safe_load(self.egg_template.decode("utf-8")), json.loads(files.get(egg_url))
    )

    self.assertEqual(
        {
            u"heat_template_version": u"2013-05-23",
            u"parameters": {u"param1": {u"type": u"string"}},
            u"resources": {
                u"resource1": {u"type": u"file:///home/my/dir/foo.yaml", u"properties": {u"foo": u"bar"}},
                u"resource2": {
                    u"type": u"OS::Heat::ResourceGroup",
                    u"properties": {u"resource_def": {u"type": u"file:///home/my/dir/spam/egg.yaml"}},
                },
            },
        },
        tmpl_parsed,
    )
开发者ID:neerja28,项目名称:python-heatclient,代码行数:34,代码来源:test_template_utils.py
示例3: site_reachable
def site_reachable(url, timeout=3):
    """Check if the given URL is accessible.

    :param url: URL to open.
    :param timeout: timeout in seconds passed to ``urlopen``.
    :returns: True if the URL could be opened, False if opening it raised
        ``URLError`` or ``HTTPError``.
    """
    try:
        response = urlopen(url, timeout=timeout)
    except (URLError, HTTPError):
        return False
    # Only reachability matters; release the connection right away instead
    # of leaving the response open.
    response.close()
    return True
开发者ID:kbg,项目名称:drms_json,代码行数:7,代码来源:conftest.py
示例4: send
def send(self, msg, *args, **kwargs):
    """Send ``msg`` through the HTTP gateway described by ``self.config``.

    The request parameters are ``config.additional_params`` (if any) plus the
    message text and the phone number, sent as a query string for ``GET`` and
    as a form-encoded body otherwise.

    :param msg: object providing ``phone_number`` and ``text``.
    :raises BackendProcessingException: if the HTTP request fails for any
        reason; the original traceback is preserved.
    """
    config = self.config
    if config.additional_params:
        params = config.additional_params.copy()
    else:
        params = {}

    phone_number = msg.phone_number
    if config.include_plus:
        phone_number = clean_phone_number(phone_number)
    else:
        phone_number = strip_plus(phone_number)

    # Prefer latin-1; fall back to UTF-8 for text it cannot represent.
    try:
        text = msg.text.encode("iso-8859-1")
    except UnicodeEncodeError:
        text = msg.text.encode("utf-8")

    params[config.message_param] = text
    params[config.number_param] = phone_number
    url_params = urlencode(params)

    try:
        if config.method == "GET":
            urlopen("%s?%s" % (config.url, url_params),
                    timeout=settings.SMS_GATEWAY_TIMEOUT).read()
        else:
            # urlopen requires a bytes body on Python 3; urlencode output is
            # pure ASCII, so this encoding is lossless.
            urlopen(config.url, url_params.encode("ascii"),
                    timeout=settings.SMS_GATEWAY_TIMEOUT).read()
    except Exception as e:
        error_msg = "Error sending message from backend: '{}'\n\n{}".format(self.pk, str(e))
        # six.reraise expects (type, value, traceback); passing an instance
        # as the type with value=None fails on Python 3.
        six.reraise(
            BackendProcessingException,
            BackendProcessingException(error_msg),
            sys.exc_info()[2],
        )
开发者ID:dimagi,项目名称:commcare-hq,代码行数:31,代码来源:models.py
示例5: check_non_utf8_content
def check_non_utf8_content(self, filename, content):
    """Verify that a ``get_file`` reference to binary content is resolved.

    ``request.urlopen`` is stubbed to return the base64-decoded ``content``
    for the file URL, and the test asserts that
    ``resolve_template_get_files`` stores the original ``content`` under
    that URL.

    :param filename: file name used to build the ``file:///tmp/...`` URL.
    :param content: base64-encoded payload expected in ``files``.
    """
    base_url = "file:///tmp"
    url = "%s/%s" % (base_url, filename)
    template = {
        "resources": {
            "one_init": {
                "type": "OS::Heat::CloudConfig",
                "properties": {
                    "cloud_config": {
                        "write_files": [
                            {"path": "/tmp/%s" % filename, "content": {"get_file": url}, "encoding": "b64"}
                        ]
                    }
                },
            }
        }
    }
    self.m.StubOutWithMock(request, "urlopen")
    # base64.decodestring was removed in Python 3.9; b64decode is available
    # on both Python 2 and 3.
    raw_content = base64.b64decode(content)
    response = six.BytesIO(raw_content)
    request.urlopen(url).AndReturn(response)
    self.m.ReplayAll()
    files = {}
    template_utils.resolve_template_get_files(template, files, base_url)
    self.assertEqual({url: content}, files)
开发者ID:neerja28,项目名称:python-heatclient,代码行数:25,代码来源:test_template_utils.py
示例6: test_path
def test_path(self):
    """The server records the request path and query arguments separately."""
    plain_url = self.server.get_url("/foo")
    urlopen(plain_url).read()
    self.assertEqual(self.server.request["path"], "/foo")

    query_url = self.server.get_url("/foo?bar=1")
    urlopen(query_url).read()
    recorded = self.server.request
    self.assertEqual(recorded["path"], "/foo")
    self.assertEqual(self.server.request["args"]["bar"], "1")
开发者ID:pombredanne,项目名称:test_server,代码行数:7,代码来源:server.py
示例7: test_default_set_cassette_library_dir
def test_default_set_cassette_library_dir(tmpdir):
    """A cassette named by relative path is written under cassette_library_dir."""
    library_dir = str(tmpdir.join('subdir'))
    my_vcr = vcr.VCR(cassette_library_dir=library_dir)

    with my_vcr.use_cassette('test.json'):
        urlopen('http://httpbin.org/get')

    cassette_file = tmpdir.join('subdir').join('test.json')
    assert os.path.exists(str(cassette_file))
开发者ID:IvanMalison,项目名称:vcrpy,代码行数:7,代码来源:test_config.py
示例8: _bioinformatics
def _bioinformatics(url):
    """Scrape a table-of-contents page into metadata and article abstracts.

    :param url: URL of the table-of-contents page.
    :returns: ``(meta_data, journal_data)`` where ``meta_data`` is the
        whitespace-normalised text of the page's ``<cite>`` element and
        ``journal_data`` is a list of ``(category, [(title, summary), ...])``
        tuples; categories without any collected article are omitted.
    :raises HTTPError, URLError: if the table-of-contents page cannot be
        opened (the error is printed first).
    """
    try:
        html = urlopen(url)
    except HTTPError as e:
        print(e)
        # Without a page there is nothing to parse; re-raise instead of
        # failing later on an unbound ``html``.
        raise
    except URLError:
        print("The server could not be found!")
        raise

    bsObj = BeautifulSoup(html.read(), "html5lib")
    meta_data = ' '.join(bsObj.find("cite")
                         .get_text().replace("\n", "").split())
    journal_data = []
    original = bsObj.find("div", {"class": "pub-section-ORIGINALPAPERS"})
    # scheme://host prefix used to turn relative abstract links into URLs
    site_root = '/'.join(url.split('/')[:3])

    for subject in original.findAll("div", {"class": "level2"}):
        category = subject.find("h3").get_text()
        articles = []
        for article in subject.findAll("li", {"class": "cit"}):
            t = article.find("h4").get_text()
            abst_url = site_root + article.find("a", {"rel": "abstract"})['href']
            try:
                abst_html = urlopen(abst_url)
                abstObj = BeautifulSoup(abst_html.read(), "html5lib")
                abst_contents = abstObj.find("div", {"id": "abstract-1"})
                # second paragraph, text after the first colon
                results = abst_contents.findAll("p")[1].get_text()
                results = results.split(':')[1]
                s = ' '.join(results.replace("\n", " ").split())
                articles.append((t, s))
            except (AttributeError, IndexError):
                # abstract page does not have the expected structure: skip
                continue
        if articles:
            journal_data.append((category, articles))
    return meta_data, journal_data
开发者ID:mottodora,项目名称:journalClubIndexGenerator,代码行数:33,代码来源:oxford.py
示例9: __init__
def __init__(self, url, method='GET', data=None, **kwargs):
    """
    :param url: the base URL to open.
    :param method: the HTTP method to use.
        Optional: defaults to 'GET'
    :param data: any data to pass (either in query for 'GET'
        or as post data with 'POST')
    :type data: dict
    :raises ValueError: if ``method`` is neither 'GET' nor 'POST'.
    """
    self.url = url
    self.method = method

    self.data = data or {}
    encoded_data = urlencode(self.data)

    scheme = urlparse(url).scheme
    assert scheme in ('http', 'https', 'ftp'), 'Invalid URL scheme: %s' % scheme

    if method == 'GET':
        self.url += '?%s' % (encoded_data)
        # open the URL that carries the query string, not the bare base URL
        opened = urlopen(self.url)
    elif method == 'POST':
        # urlopen requires a bytes body on Python 3; urlencode output is
        # pure ASCII, so this encoding is lossless
        opened = urlopen(url, encoded_data.encode('ascii'))
    else:
        raise ValueError('Not a valid method: %s' % (method))

    super(URLDataProvider, self).__init__(opened, **kwargs)
开发者ID:ImmPortDB,项目名称:immport-galaxy,代码行数:27,代码来源:external.py
示例10: send
def send(t, args):
    """POST the serialized profile tree to the ``/api/log/`` endpoint.

    :param t: object whose ``get_tree().flatten()._serialize()`` yields the
        profile payload.
    :param args: parsed arguments providing ``program``, ``args``, ``web``
        (server address) and ``web_auth`` (optional token).
    """
    name = args.program
    base_url = args.web
    auth = args.web_auth
    data = {
        "profiles": t.get_tree().flatten()._serialize(),
        "argv": "%s %s" % (name, " ".join(args.args)),
        "version": 1,
    }
    data = json.dumps(data).encode('utf-8')

    # XXX http only for now
    # Test for a real scheme prefix: a bare host such as "httpbin.org" also
    # starts with "http" but still needs the scheme prepended.
    if base_url.startswith(("http://", "https://")):
        url = '%s/api/log/' % base_url.rstrip("/")
    else:
        url = 'http://%s/api/log/' % base_url.rstrip("/")

    headers = {'content-type': 'application/json'}
    if auth:
        headers['AUTHORIZATION'] = "Token %s" % auth

    req = request.Request(url, data, headers)
    # the response body is not used; close it to release the connection
    request.urlopen(req).close()
开发者ID:oberstet,项目名称:vmprof-python,代码行数:29,代码来源:com.py
示例11: get_google_result
def get_google_result(search_keywords):
    """Return a message with the first search result for ``search_keywords``.

    :param search_keywords: the query, or the literal ``'help'`` to get the
        usage text.
    :returns: the help text, ``'Success: <url>'``, ``'No URLs returned by
        google.'`` or an ``'Error: ...'`` message.
    """
    if search_keywords == 'help':
        help_message = (
            "To use this bot start message with @google "
            "followed by what you want to search for. If "
            "found, Zulip will return the first search result "
            "on Google. An example message that could be sent is:"
            "'@google zulip' or '@google how to create a chatbot'."
        )
        return help_message

    try:
        urls = search(search_keywords, stop=20)
        # connectivity probe; the response itself is not needed
        urlopen('http://216.58.192.142', timeout=1).close()
        # ``urls`` may be a lazy iterator, which is always truthy and only
        # fails once consumed: fetch the first result here, with a default,
        # so that errors and empty results are both handled.
        url = next(iter(urls), None) if urls else None
    except http.client.RemoteDisconnected as er:
        logging.exception(er)
        return 'Error: No internet connection. {}.'.format(er)
    except Exception as e:
        logging.exception(e)
        return 'Error: Search failed. {}.'.format(e)

    if url is None:
        return 'No URLs returned by google.'

    return 'Success: {}'.format(url)
开发者ID:aakash-cr7,项目名称:zulip,代码行数:25,代码来源:googlesearch.py
示例12: test_none_record_mode
def test_none_record_mode(tmpdir):
    """With record_mode "none" and no cassette file, a request must raise."""
    cassette_path = str(tmpdir.join("recordmode.yml"))
    # The cassette file does not exist, yet a request is attempted.
    with vcr.use_cassette(cassette_path, record_mode="none"), pytest.raises(Exception):
        urlopen("http://httpbin.org/").read()
开发者ID:koobs,项目名称:vcrpy,代码行数:7,代码来源:test_record_mode.py
示例13: _sendNMJ
def _sendNMJ(self, host):
    """
    Send a NMJ update command to the specified machine

    Two requests are sent to port 8008 of the host: ``update_scandir``
    followed, after a short pause, by ``scanner_start``.  Both use
    ``app.NMJv2_DATABASE`` as the database argument.

    host: The hostname/IP to send the request to (no port)

    return: True if both requests succeeded and reported no error,
            False otherwise
    """
    from contextlib import closing

    url_scandir = 'http://' + host + ':8008/metadata_database?arg0=update_scandir&arg1=' + app.NMJv2_DATABASE + '&arg2=&arg3=update_all'
    log.debug(u'NMJ scan update command sent to host: {0}', host)
    url_updatedb = 'http://' + host + ':8008/metadata_database?arg0=scanner_start&arg1=' + app.NMJv2_DATABASE + '&arg2=background&arg3='
    log.debug(u'Try to mount network drive via url: {0}', host)
    prereq = Request(url_scandir)
    req = Request(url_updatedb)

    try:
        # closing() releases each connection once its body has been read
        with closing(urlopen(prereq)) as handle1:
            response1 = handle1.read()
        time.sleep(0.3)
        with closing(urlopen(req)) as handle2:
            response2 = handle2.read()
    except IOError as error:
        log.warning(u'Warning: Unable to contact popcorn hour on host {0}: {1}', host, error)
        return False

    # Parse both responses first, then evaluate their return values, in the
    # order the requests were made.
    responses = (
        (response1, u'Unable to parse XML returned from the Popcorn Hour: update_scandir, {0}'),
        (response2, u'Unable to parse XML returned from the Popcorn Hour: scanner_start, {0}'),
    )
    results = []
    for response, parse_error in responses:
        try:
            et = etree.fromstring(response)
            results.append(et.findtext('returnValue'))
        except SyntaxError as error:
            log.error(parse_error, error)
            return False

    # if the result was a number then consider that an error
    error_messages = {
        8: 'Invalid parameter(s)/argument(s)',
        11: 'Invalid database path',
        22: 'Insufficient size',
        49: 'Database write error',
        50: 'Database read error',
        51: 'Open fifo pipe failed',
        60: 'Read only file system',
    }
    for result in results:
        try:
            code = int(result)
        except (TypeError, ValueError):
            # returnValue missing or not numeric: report it instead of crashing
            log.error(u'Popcorn Hour returned an error: {0}',
                      'Unexpected return value {0!r}'.format(result))
            return False
        if code > 0:
            # unknown codes get a generic message instead of a ValueError
            message = error_messages.get(code, 'Unknown error code {0}'.format(code))
            log.error(u'Popcorn Hour returned an error: {0}', message)
            return False

    log.info(u'NMJv2 started background scan')
    return True
开发者ID:pymedusa,项目名称:SickRage,代码行数:60,代码来源:nmjv2.py
示例14: _genomeresearch
def _genomeresearch(url):
    """Scrape a table-of-contents page into metadata and article abstracts.

    :param url: URL of the table-of-contents page.
    :returns: ``(meta_data, journal_data)`` where ``meta_data`` is the text of
        the page's ``<cite>`` element and ``journal_data`` is a list of
        ``(category, [(title, summary), ...])`` tuples; categories without
        any collected article are omitted.
    :raises HTTPError, URLError: if the table-of-contents page cannot be
        opened (the error is printed first).
    """
    try:
        html = urlopen(url)
    except HTTPError as e:
        print(e)
        # Without a page there is nothing to parse; re-raise instead of
        # failing later on an unbound ``html``.
        raise
    except URLError:
        print("The server could not be found!")
        raise

    bsObj = BeautifulSoup(html.read(), "html5lib")
    meta_data = bsObj.find("cite").get_text()
    journal_data = []
    # scheme://host prefix used to turn relative abstract links into URLs
    site_root = '/'.join(url.split('/')[:3])

    for subject in bsObj.findAll("div", {"class": "level1"}):
        category = subject.find("h2").get_text()
        articles = []
        for article in subject.findAll("li", {"class": "toc-cit"}):
            t = article.find("h4").get_text().strip()
            abst_url = site_root + article.find("a", {"rel": "abstract"})['href']
            try:
                abst_html = urlopen(abst_url)
                abstObj = BeautifulSoup(abst_html.read(), "html5lib")
                abst_contents = abstObj.find("div", {"id": "abstract-1"})
                results = abst_contents.find("p").get_text()
                s = ' '.join(results.replace("\n", " ").split())
                articles.append((t, s))
            except AttributeError:
                # abstract page does not have the expected structure: skip
                continue
        if articles:
            journal_data.append((category, articles))
    return meta_data, journal_data
开发者ID:mottodora,项目名称:journalClubIndexGenerator,代码行数:31,代码来源:csh.py
示例15: test_once_mode_three_times
def test_once_mode_three_times(tmpdir):
    """In "once" mode the same URL can be requested repeatedly in one session."""
    cassette_path = str(tmpdir.join('recordmode.yml'))
    with vcr.use_cassette(cassette_path, record_mode="once"):
        # get three of the same file
        for _ in range(3):
            urlopen('http://httpbin.org/').read()
开发者ID:darioush,项目名称:vcrpy,代码行数:7,代码来源:test_record_mode.py
示例16: test_new_episodes_record_mode
def test_new_episodes_record_mode(tmpdir):
    """"new_episodes" replays known requests and records unseen ones."""
    cassette_path = str(tmpdir.join('recordmode.yml'))
    root_url = 'http://httpbin.org/'

    # cassette file doesn't exist, so create.
    with vcr.use_cassette(cassette_path, record_mode="new_episodes"):
        urlopen(root_url).read()

    with vcr.use_cassette(cassette_path, record_mode="new_episodes") as cass:
        # make the same request again
        urlopen(root_url).read()

        # all responses have been played
        assert cass.all_played

        # in the "new_episodes" record mode, we can add more requests to
        # a cassette without repercussions.
        urlopen('http://httpbin.org/get').read()

        # one of the responses has been played
        assert cass.play_count == 1

        # not all responses have been played
        assert not cass.all_played

    with vcr.use_cassette(cassette_path, record_mode="new_episodes") as cass:
        # the cassette should now have 2 responses
        assert len(cass.responses) == 2
开发者ID:darioush,项目名称:vcrpy,代码行数:27,代码来源:test_record_mode.py
示例17: test_default_matcher_matches
def test_default_matcher_matches(cassette, uri, httpbin, httpbin_secure):
    """A request matching the recorded one is played back from the cassette."""
    target_uri = _replace_httpbin(uri, httpbin, httpbin_secure)

    with vcr.use_cassette(cassette) as cass:
        urlopen(target_uri)
        played = cass.play_count
        assert played == 1
开发者ID:JanLikar,项目名称:vcrpy,代码行数:7,代码来源:test_matchers.py
示例18: grab_url_cached
def grab_url_cached(url):
    """
    Download a possibly cached URL.

    The local cache file is named after the SHA-1 of the URL and is used
    while it is younger than ``CACHE_LIFE``.  Otherwise the remote supplier
    page cache is tried first and, if that answers with an HTTP error, the
    URL itself is downloaded; the result is stored in the local cache.

    :param url: the URL to fetch.
    :returns: The contents of the page, as bytes.
    """
    from contextlib import closing

    cache_dir = get_cache_dir('urls')

    h = hashlib.sha1()
    h.update(url.encode('UTF-8'))
    digest = h.hexdigest()

    F = os.path.join(cache_dir, digest)

    if os.path.exists(F) and (time.time() - os.path.getmtime(F)) < CACHE_LIFE:
        # Read in binary mode: the cache is written in binary mode, so a
        # cache hit returns the same type as a fresh download.
        with open(F, 'rb') as file:
            return file.read()

    # try the remote supplier page cache
    try:
        base_url = "https://www.studentrobotics.org/~rspanton/supcache/{}"
        cached_url = base_url.format(digest)
        with closing(urlopen(cached_url)) as sc:
            page = sc.read()
    except HTTPError:
        with closing(urlopen(url)) as response:
            page = response.read()

    with open(F, 'wb') as file:
        file.write(page)

    return page
开发者ID:PeterJCLaw,项目名称:tools,代码行数:30,代码来源:cachedfetch.py
示例19: _genomebiology
def _genomebiology(url):
    """Scrape a results-list page into metadata and article summaries.

    :param url: URL of the results-list page.
    :returns: ``(meta_data, journal_data)`` where ``meta_data`` is the part
        before the first colon of the ``ResultsList_journal`` paragraph and
        ``journal_data`` is a list of ``(category, [(title, summary), ...])``
        tuples.  Items of category "Research Highlight" are skipped.
    :raises HTTPError, URLError: if the results-list page cannot be opened
        (the error is printed first).
    """
    try:
        html = urlopen(url)
    except HTTPError as e:
        print(e)
        # Without a page there is nothing to parse; re-raise instead of
        # failing later on an unbound ``html``.
        raise
    except URLError:
        print("The server could not be found!")
        raise

    bsObj = BeautifulSoup(html.read(), "html5lib")
    meta_data = bsObj.find("p", {"class": "ResultsList_journal"})\
        .get_text().strip().split(":")[0]
    journal_data = defaultdict(list)
    # scheme://host prefix used to turn relative article links into URLs
    site_root = '/'.join(url.split('/')[:3])

    for subject in bsObj.findAll("li", {"class": "ResultsList_item"}):
        category = subject.find("p", {"class": "ResultsList_type"}).get_text()
        title_link = subject.find("a", {"class": "fulltexttitle"})
        t = title_link.get_text()
        abst_url = site_root + title_link['href']
        if category == "Research Highlight":
            continue
        try:
            abst_html = urlopen(abst_url)
            abstObj = BeautifulSoup(abst_html.read(), "html5lib")
            # last "Para" paragraph of the first <section>
            s = abstObj.find("section").findAll("p", {"class": "Para"})[-1]\
                .get_text()
            journal_data[category].append((t, s))
        except (AttributeError, IndexError):
            # article page does not have the expected structure: skip
            continue
    journal_data = list(journal_data.items())
    return meta_data, journal_data
开发者ID:mottodora,项目名称:journalClubIndexGenerator,代码行数:33,代码来源:biomedcentral.py
示例20: handleSection
def handleSection(self, section, items):
    """Offer to delete active runs of ``section`` for obsolete locales.

    ``items['locales']`` is either a whitespace-separated list of locale
    codes or ``'all'``; in the latter case the list is downloaded from the
    file referenced by the ``general/all`` entry of the remote l10n.ini.
    Active runs of the tree whose locale is not in the list are shown and,
    after confirmation on the console, deleted.

    :param section: tree code to check.
    :param items: mapping with ``locales`` and, for ``'all'``, ``repo``,
        ``mozilla`` and ``l10n.ini``.
    """
    locales = items['locales']
    if locales == 'all':
        from contextlib import closing
        import io

        inipath = '/'.join((
            items['repo'], items['mozilla'],
            'raw-file', 'default',
            items['l10n.ini']
        ))
        ini = ConfigParser()
        # The HTTP response yields bytes, which ConfigParser cannot parse on
        # Python 3: decode and wrap the text in a file-like object.
        with closing(urlopen(inipath)) as response:
            ini_text = response.read().decode('utf-8')
        # readfp was removed in Python 3.12; read_file is absent on Python 2
        read_file = getattr(ini, 'read_file', None) or ini.readfp
        read_file(io.StringIO(ini_text))
        allpath = urljoin(
            urljoin(inipath, ini.get('general', 'depth')),
            ini.get('general', 'all'))
        with closing(urlopen(allpath)) as response:
            # decode so that locale codes are text, as in the non-'all' case
            locales = response.read().decode('utf-8')
    locales = locales.split()
    obs = (Active.objects
           .filter(run__tree__code=section)
           .exclude(run__locale__code__in=locales)
           .order_by('run__locale__code'))
    obslocs = ' '.join(obs.values_list('run__locale__code', flat=True))
    if not obslocs:
        self.stdout.write(' OK\n')
        return
    s = input('Remove %s? [Y/n] ' % obslocs)
    # an empty answer counts as "yes"
    if s.lower() == 'y' or s == '':
        obs.delete()
开发者ID:Pike,项目名称:elmo,代码行数:26,代码来源:deactivate.py
注:本文中的six.moves.urllib.request.urlopen函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论