
Python request.urlopen Function Code Examples


This article collects typical usage examples of Python's six.moves.urllib.request.urlopen function. If you are wondering how exactly to call urlopen, what it is used for, or what real-world code that uses it looks like, the hand-picked examples below should help.



A total of 20 urlopen code examples are shown below, ordered by popularity by default.
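Before the project-level examples, here is a minimal, self-contained sketch of the common pattern they all build on: import urlopen from six.moves.urllib.request (which maps to urllib2.urlopen on Python 2 and urllib.request.urlopen on Python 3), open a URL, and read the response bytes. The URL, the fetch helper name, and the timeout value are illustrative placeholders, not taken from any example in this article.

# Minimal usage sketch (placeholder URL and helper name, for illustration only).
from six.moves.urllib.error import HTTPError, URLError
from six.moves.urllib.request import urlopen


def fetch(url, timeout=5):
    """Return the response body as bytes, or None if the request fails."""
    try:
        response = urlopen(url, timeout=timeout)
        return response.read()
    except (HTTPError, URLError) as exc:
        print("Request failed: %s" % exc)
        return None


if __name__ == '__main__':
    body = fetch('http://httpbin.org/get')
    if body is not None:
        print(body[:200])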

Example 1: _download_jar

def _download_jar(version='5.7.0'):
    from six.moves.urllib.request import urlopen
    import hashlib

    for loc in _gen_jar_locations():
        # check if dir exists and has write access:
        if os.path.exists(loc) and os.access(loc, os.W_OK):
            break
        # if the directory is the pims folder and does not yet exist, create it (if allowed)
        if os.path.basename(loc) == 'pims' and \
           os.access(os.path.dirname(loc), os.W_OK):
            os.mkdir(loc)
            break
    else:
        raise IOError('No writeable location found. In order to use the '
                      'Bioformats reader, please download '
                      'loci_tools.jar to the pims program folder or one of '
                      'the locations provided by _gen_jar_locations().')

    url = ('http://downloads.openmicroscopy.org/bio-formats/' + version +
           '/artifacts/loci_tools.jar')

    path = os.path.join(loc, 'loci_tools.jar')
    loci_tools = urlopen(url).read()
    sha1_checksum = urlopen(url + '.sha1').read().split(b' ')[0].decode()

    downloaded = hashlib.sha1(loci_tools).hexdigest()
    if downloaded != sha1_checksum:
        raise IOError("Downloaded loci_tools.jar has invalid checksum. "
                      "Please try again.")

    with open(path, 'wb') as output:
        output.write(loci_tools)

    return path
Developer: nkeim | Project: pims | Lines: 35 | Source: bioformats.py


Example 2: test_hot_template

    def test_hot_template(self):
        self.m.StubOutWithMock(request, "urlopen")
        tmpl_file = "/home/my/dir/template.yaml"
        url = "file:///home/my/dir/template.yaml"
        request.urlopen("file:///home/my/dir/foo.yaml").InAnyOrder().AndReturn(six.BytesIO(self.foo_template))
        request.urlopen("file:///home/my/dir/foo.yaml").InAnyOrder().AndReturn(six.BytesIO(self.foo_template))
        request.urlopen(url).InAnyOrder().AndReturn(six.BytesIO(self.hot_template))
        request.urlopen("file:///home/my/dir/spam/egg.yaml").InAnyOrder().AndReturn(six.BytesIO(self.egg_template))
        request.urlopen("file:///home/my/dir/spam/egg.yaml").InAnyOrder().AndReturn(six.BytesIO(self.egg_template))
        self.m.ReplayAll()

        files, tmpl_parsed = template_utils.get_template_contents(template_file=tmpl_file)

        self.assertEqual(
            yaml.load(self.foo_template.decode("utf-8")), json.loads(files.get("file:///home/my/dir/foo.yaml"))
        )
        self.assertEqual(
            yaml.load(self.egg_template.decode("utf-8")), json.loads(files.get("file:///home/my/dir/spam/egg.yaml"))
        )

        self.assertEqual(
            {
                u"heat_template_version": u"2013-05-23",
                u"parameters": {u"param1": {u"type": u"string"}},
                u"resources": {
                    u"resource1": {u"type": u"file:///home/my/dir/foo.yaml", u"properties": {u"foo": u"bar"}},
                    u"resource2": {
                        u"type": u"OS::Heat::ResourceGroup",
                        u"properties": {u"resource_def": {u"type": u"file:///home/my/dir/spam/egg.yaml"}},
                    },
                },
            },
            tmpl_parsed,
        )
Developer: neerja28 | Project: python-heatclient | Lines: 34 | Source: test_template_utils.py


Example 3: site_reachable

def site_reachable(url, timeout=3):
    """Checks if the given URL is accessible."""
    try:
        urlopen(url, timeout=timeout)
    except (URLError, HTTPError):
        return False
    return True
Developer: kbg | Project: drms_json | Lines: 7 | Source: conftest.py


Example 4: send

    def send(self, msg, *args, **kwargs):
        config = self.config
        if config.additional_params:
            params = config.additional_params.copy()
        else:
            params = {}

        phone_number = msg.phone_number
        if config.include_plus:
            phone_number = clean_phone_number(phone_number)
        else:
            phone_number = strip_plus(phone_number)

        try:
            text = msg.text.encode("iso-8859-1")
        except UnicodeEncodeError:
            text = msg.text.encode("utf-8")
        params[config.message_param] = text
        params[config.number_param] = phone_number

        url_params = urlencode(params)
        try:
            if config.method == "GET":
                response = urlopen("%s?%s" % (config.url, url_params),
                    timeout=settings.SMS_GATEWAY_TIMEOUT).read()
            else:
                response = urlopen(config.url, url_params,
                    timeout=settings.SMS_GATEWAY_TIMEOUT).read()
        except Exception as e:
            msg = "Error sending message from backend: '{}'\n\n{}".format(self.pk, str(e))
            six.reraise(BackendProcessingException(msg), None, sys.exc_info()[2])
Developer: dimagi | Project: commcare-hq | Lines: 31 | Source: models.py


Example 5: check_non_utf8_content

    def check_non_utf8_content(self, filename, content):
        base_url = "file:///tmp"
        url = "%s/%s" % (base_url, filename)
        template = {
            "resources": {
                "one_init": {
                    "type": "OS::Heat::CloudConfig",
                    "properties": {
                        "cloud_config": {
                            "write_files": [
                                {"path": "/tmp/%s" % filename, "content": {"get_file": url}, "encoding": "b64"}
                            ]
                        }
                    },
                }
            }
        }
        self.m.StubOutWithMock(request, "urlopen")
        raw_content = base64.decodestring(content)
        response = six.BytesIO(raw_content)
        request.urlopen(url).AndReturn(response)
        self.m.ReplayAll()
        files = {}
        template_utils.resolve_template_get_files(template, files, base_url)
        self.assertEqual({url: content}, files)
Developer: neerja28 | Project: python-heatclient | Lines: 25 | Source: test_template_utils.py


Example 6: test_path

    def test_path(self):
        urlopen(self.server.get_url("/foo")).read()
        self.assertEqual(self.server.request["path"], "/foo")

        urlopen(self.server.get_url("/foo?bar=1")).read()
        self.assertEqual(self.server.request["path"], "/foo")
        self.assertEqual(self.server.request["args"]["bar"], "1")
Developer: pombredanne | Project: test_server | Lines: 7 | Source: server.py


Example 7: test_default_set_cassette_library_dir

def test_default_set_cassette_library_dir(tmpdir):
    my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join('subdir')))

    with my_vcr.use_cassette('test.json'):
        urlopen('http://httpbin.org/get')

    assert os.path.exists(str(tmpdir.join('subdir').join('test.json')))
Developer: IvanMalison | Project: vcrpy | Lines: 7 | Source: test_config.py


Example 8: _bioinformatics

def _bioinformatics(url):
    try:
        html = urlopen(url)
    except HTTPError as e:
        print(e)
    except URLError as e:
        print("The server could not be found!")
    bsObj = BeautifulSoup(html.read(), "html5lib")
    meta_data = ' '.join(bsObj.find("cite")\
                        .get_text().replace("\n", "").split())
    journal_data = []
    original = bsObj.find("div", {"class": "pub-section-ORIGINALPAPERS"})
    for subject in original.findAll("div", {"class": "level2"}):
        category = subject.find("h3").get_text()
        articles = []
        for article in subject.findAll("li", {"class": "cit"}):
            t = article.find("h4").get_text()
            abst_url = '/'.join(url.split('/')[:3]) \
                    + article.find("a", {"rel": "abstract"})['href']
            try:
                #abst_html = urllib.request.urlopen(abst_url)
                abst_html = urlopen(abst_url)
                abstObj = BeautifulSoup(abst_html.read(), "html5lib")
                abst_contents = abstObj.find("div", {"id": "abstract-1"})
                results = abst_contents.findAll("p")[1].get_text()
                results = results.split(':')[1]
                s = ' '.join(results.replace("\n", " ").split())
                articles.append((t, s))
            except AttributeError:
                continue
        if len(articles) > 0:
            journal_data.append((category, articles))
    return meta_data, journal_data
Developer: mottodora | Project: journalClubIndexGenerator | Lines: 33 | Source: oxford.py


Example 9: __init__

    def __init__(self, url, method='GET', data=None, **kwargs):
        """
        :param url: the base URL to open.
        :param method: the HTTP method to use.
            Optional: defaults to 'GET'
        :param data: any data to pass (either in query for 'GET'
            or as post data with 'POST')
        :type data: dict
        """
        self.url = url
        self.method = method

        self.data = data or {}
        encoded_data = urlencode(self.data)

        scheme = urlparse(url).scheme
        assert scheme in ('http', 'https', 'ftp'), 'Invalid URL scheme: %s' % scheme

        if method == 'GET':
            self.url += '?%s' % (encoded_data)
            opened = urlopen(url)
        elif method == 'POST':
            opened = urlopen(url, encoded_data)
        else:
            raise ValueError('Not a valid method: %s' % (method))

        super(URLDataProvider, self).__init__(opened, **kwargs)
Developer: ImmPortDB | Project: immport-galaxy | Lines: 27 | Source: external.py


Example 10: send

def send(t, args):

    name = args.program
    base_url = args.web
    auth = args.web_auth


    data = {
        "profiles": t.get_tree().flatten()._serialize(),
        "argv": "%s %s" % (name, " ".join(args.args)),
        "version": 1,
    }

    data = json.dumps(data).encode('utf-8')

    # XXX http only for now
    if base_url.startswith("http"):
        url = '%s/api/log/' % base_url.rstrip("/")
    else:
        url = 'http://%s/api/log/' % base_url.rstrip("/")

    headers = {'content-type': 'application/json'}

    if auth:
        headers['AUTHORIZATION'] = "Token %s" % auth

    req = request.Request(url, data, headers)

    request.urlopen(req)
Developer: oberstet | Project: vmprof-python | Lines: 29 | Source: com.py


Example 11: get_google_result

def get_google_result(search_keywords):
    if search_keywords == 'help':
        help_message = "To use this bot start message with @google \
                        followed by what you want to search for. If \
                        found, Zulip will return the first search result \
                        on Google. An example message that could be sent is:\
                        '@google zulip' or '@google how to create a chatbot'."
        return help_message
    else:
        try:
            urls = search(search_keywords, stop=20)
            urlopen('http://216.58.192.142', timeout=1)
        except http.client.RemoteDisconnected as er:
            logging.exception(er)
            return 'Error: No internet connection. {}.'.format(er)
        except Exception as e:
            logging.exception(e)
            return 'Error: Search failed. {}.'.format(e)

        if not urls:
            return 'No URLs returned by google.'

        url = next(urls)

        return 'Success: {}'.format(url)
Developer: aakash-cr7 | Project: zulip | Lines: 25 | Source: googlesearch.py


Example 12: test_none_record_mode

def test_none_record_mode(tmpdir):
    # Cassette file doesn't exist, yet we are trying to make a request.
    # raise hell.
    testfile = str(tmpdir.join("recordmode.yml"))
    with vcr.use_cassette(testfile, record_mode="none"):
        with pytest.raises(Exception):
            urlopen("http://httpbin.org/").read()
Developer: koobs | Project: vcrpy | Lines: 7 | Source: test_record_mode.py


Example 13: _sendNMJ

    def _sendNMJ(self, host):
        """
        Send an NMJ update command to the specified machine

        host: The hostname/IP to send the request to (no port)
        database: The database to send the request to
        mount: The mount URL to use (optional)

        return: True if the request succeeded, False otherwise
        """
        # if a host is provided then attempt to open a handle to that URL
        try:
            url_scandir = 'http://' + host + ':8008/metadata_database?arg0=update_scandir&arg1=' + app.NMJv2_DATABASE + '&arg2=&arg3=update_all'
            log.debug(u'NMJ scan update command sent to host: {0}', host)
            url_updatedb = 'http://' + host + ':8008/metadata_database?arg0=scanner_start&arg1=' + app.NMJv2_DATABASE + '&arg2=background&arg3='
            log.debug(u'Try to mount network drive via url: {0}', host)
            prereq = Request(url_scandir)
            req = Request(url_updatedb)
            handle1 = urlopen(prereq)
            response1 = handle1.read()
            time.sleep(0.3)
            handle2 = urlopen(req)
            response2 = handle2.read()
        except IOError as error:
            log.warning(u'Warning: Unable to contact popcorn hour on host {0}: {1}', host, error)
            return False
        try:
            et = etree.fromstring(response1)
            result1 = et.findtext('returnValue')
        except SyntaxError as error:
            log.error(u'Unable to parse XML returned from the Popcorn Hour: update_scandir, {0}', error)
            return False
        try:
            et = etree.fromstring(response2)
            result2 = et.findtext('returnValue')
        except SyntaxError as error:
            log.error(u'Unable to parse XML returned from the Popcorn Hour: scanner_start, {0}', error)
            return False

        # if the result was a number then consider that an error
        error_codes = ['8', '11', '22', '49', '50', '51', '60']
        error_messages = ['Invalid parameter(s)/argument(s)',
                          'Invalid database path',
                          'Insufficient size',
                          'Database write error',
                          'Database read error',
                          'Open fifo pipe failed',
                          'Read only file system']
        if int(result1) > 0:
            index = error_codes.index(result1)
            log.error(u'Popcorn Hour returned an error: {0}', error_messages[index])
            return False
        else:
            if int(result2) > 0:
                index = error_codes.index(result2)
                log.error(u'Popcorn Hour returned an error: {0}', error_messages[index])
                return False
            else:
                log.info(u'NMJv2 started background scan')
                return True
Developer: pymedusa | Project: SickRage | Lines: 60 | Source: nmjv2.py


Example 14: _genomeresearch

def _genomeresearch(url):
    try:
        html = urlopen(url)
    except HTTPError as e:
        print(e)
    except URLError as e:
        print("The server could not be found!")
    bsObj = BeautifulSoup(html.read(), "html5lib")
    meta_data = bsObj.find("cite").get_text()
    journal_data = []
    level1 = bsObj.find("div", {"class": "level1"})
    for subject in bsObj.findAll("div", {"class": "level1"}):
        category = subject.find("h2").get_text()
        articles = []
        for article in subject.findAll("li", {"class": "toc-cit"}):
            t = article.find("h4").get_text().strip()
            abst_url = '/'.join(url.split('/')[:3]) \
                    + article.find("a", {"rel": "abstract"})['href']
            try:
                abst_html = urlopen(abst_url)
                abstObj = BeautifulSoup(abst_html.read(), "html5lib")
                abst_contents = abstObj.find("div", {"id": "abstract-1"})
                results = abst_contents.find("p").get_text()
                #results = results.split(':')[1]
                s = ' '.join(results.replace("\n", " ").split())
                articles.append((t, s))
            except AttributeError:
                continue
        if len(articles) > 0:
            journal_data.append((category, articles))
    return meta_data, journal_data
Developer: mottodora | Project: journalClubIndexGenerator | Lines: 31 | Source: csh.py


Example 15: test_once_mode_three_times

def test_once_mode_three_times(tmpdir):
    testfile = str(tmpdir.join('recordmode.yml'))
    with vcr.use_cassette(testfile, record_mode="once"):
        # get three of the same file
        response1 = urlopen('http://httpbin.org/').read()
        response2 = urlopen('http://httpbin.org/').read()
        response3 = urlopen('http://httpbin.org/').read()
Developer: darioush | Project: vcrpy | Lines: 7 | Source: test_record_mode.py


Example 16: test_new_episodes_record_mode

def test_new_episodes_record_mode(tmpdir):
    testfile = str(tmpdir.join('recordmode.yml'))

    with vcr.use_cassette(testfile, record_mode="new_episodes"):
        # cassette file doesn't exist, so create.
        response = urlopen('http://httpbin.org/').read()

    with vcr.use_cassette(testfile, record_mode="new_episodes") as cass:
        # make the same request again
        response = urlopen('http://httpbin.org/').read()

        # all responses have been played
        assert cass.all_played

        # in the "new_episodes" record mode, we can add more requests to
        # a cassette without repercussions.
        response = urlopen('http://httpbin.org/get').read()

        # one of the responses has been played
        assert cass.play_count == 1

        # not all responses have been played
        assert not cass.all_played

    with vcr.use_cassette(testfile, record_mode="new_episodes") as cass:
        # the cassette should now have 2 responses
        assert len(cass.responses) == 2
Developer: darioush | Project: vcrpy | Lines: 27 | Source: test_record_mode.py


Example 17: test_default_matcher_matches

def test_default_matcher_matches(cassette, uri, httpbin, httpbin_secure):

    uri = _replace_httpbin(uri, httpbin, httpbin_secure)

    with vcr.use_cassette(cassette) as cass:
        urlopen(uri)
        assert cass.play_count == 1
Developer: JanLikar | Project: vcrpy | Lines: 7 | Source: test_matchers.py


Example 18: grab_url_cached

def grab_url_cached(url):
    """
    Download a possibly cached URL.

    :returns: The contents of the page.
    """
    cache_dir = get_cache_dir('urls')

    h = hashlib.sha1()
    h.update(url.encode('UTF-8'))

    F = os.path.join(cache_dir, h.hexdigest())

    if os.path.exists(F) and (time.time() - os.path.getmtime(F)) < CACHE_LIFE:
        with open(F) as file:
            page = file.read()
    else:
        # try the remote supplier page cache
        try:
            base_url = "https://www.studentrobotics.org/~rspanton/supcache/{}"
            cached_url = base_url.format(h.hexdigest())
            sc = urlopen(cached_url)
            page = sc.read()
        except HTTPError:
            page = urlopen(url).read()

        with open(F, 'wb') as file:
            file.write(page)

    return page
Developer: PeterJCLaw | Project: tools | Lines: 30 | Source: cachedfetch.py


Example 19: _genomebiology

def _genomebiology(url):
    try:
        html = urlopen(url)
    except HTTPError as e:
        print(e)
    except URLError as e:
        print("The server could not be found!")
    bsObj = BeautifulSoup(html.read(), "html5lib")
    meta_data = bsObj.find("p", {"class": "ResultsList_journal"})\
            .get_text().strip().split(":")[0]

    journal_data = defaultdict(list)

    for subject in bsObj.findAll("li", {"class":"ResultsList_item"}):
        category = subject.find("p", {"class":"ResultsList_type"}).get_text()
        t = subject.find("a",{"class":"fulltexttitle"}).get_text()
        abst_url = '/'.join(url.split('/')[:3]) \
                + subject.find("a",{"class":"fulltexttitle"})['href']
        if category == "Research Highlight":
            continue

        try:
            abst_html = urlopen(abst_url)
            abstObj = BeautifulSoup(abst_html.read(), "html5lib")
            s = abstObj.find("section").findAll("p", {"class":"Para"})[-1]\
                    .get_text()
            journal_data[category].append((t, s))
        except AttributeError:
            continue
        except IndexError:
            continue
    journal_data = [(a, b) for a, b in journal_data.items()]
    return meta_data, journal_data
Developer: mottodora | Project: journalClubIndexGenerator | Lines: 33 | Source: biomedcentral.py


Example 20: handleSection

    def handleSection(self, section, items):
        locales = items['locales']
        if locales == 'all':
            inipath = '/'.join((
                items['repo'], items['mozilla'],
                'raw-file', 'default',
                items['l10n.ini']
            ))
            ini = ConfigParser()
            ini.readfp(urlopen(inipath))
            allpath = urljoin(
                urljoin(inipath, ini.get('general', 'depth')),
                ini.get('general', 'all'))
            locales = urlopen(allpath).read()
        locales = locales.split()
        obs = (Active.objects
               .filter(run__tree__code=section)
               .exclude(run__locale__code__in=locales)
               .order_by('run__locale__code'))
        obslocs = ' '.join(obs.values_list('run__locale__code', flat=True))
        if not obslocs:
            self.stdout.write(' OK\n')
            return
        s = input('Remove %s? [Y/n] ' % obslocs)
        if s.lower() == 'y' or s == '':
            obs.delete()
Developer: Pike | Project: elmo | Lines: 26 | Source: deactivate.py



Note: The six.moves.urllib.request.urlopen examples in this article were compiled by 纯净天空 from source code and documentation hosted on platforms such as GitHub and MSDocs. The snippets are drawn from open-source projects contributed by their respective developers, and copyright remains with the original authors. Please consult each project's license before distributing or reusing the code, and do not republish without permission.

