• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python requests_mock.mock函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中requests_mock.mock函数的典型用法代码示例。如果您正苦于以下问题:Python mock函数的具体用法?Python mock怎么用?Python mock使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了mock函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_blobchunkworker_run

def test_blobchunkworker_run(tmpdir):
    """Exercise ``BlobChunkWorker`` against mocked HTTP endpoints.

    Covers, in order:

    * a put with ``args.pageblob = True`` answered with 200, which must
      raise ``IOError``;
    * a put with ``args.pageblob = False`` answered with 201 and a ranged
      get answered with 200, neither of which may raise;
    * zero-length puts and puts of all-zero data;
    * ``run()`` on one queued work item while the GET is answered with 201,
      which must leave at least one entry in ``exc_list``.

    Args:
        tmpdir: pytest temporary-directory fixture used for the local file.
    """
    lpath = str(tmpdir.join("test.tmp"))
    with open(lpath, "wt") as f:
        f.write(str(uuid.uuid4()))
    args = MagicMock()
    args.pageblob = True
    args.autovhd = False
    args.timeout = None

    exc_list = []
    flock = threading.Lock()
    sa_in_queue = queue.Queue()
    sa_out_queue = queue.Queue()

    # pageblob enabled, PUT answered with 200: putblobdata must raise IOError
    with requests_mock.mock() as m:
        m.put("mock://blobepcontainer/blob?saskey", status_code=200)
        sbs = blobxfer.SasBlobService("mock://blobep", "saskey", None)
        bcw = blobxfer.BlobChunkWorker(exc_list, sa_in_queue, sa_out_queue, args, sbs, True)
        with pytest.raises(IOError):
            bcw.putblobdata(lpath, "container", "blob", "blockid", 0, 4, flock, None)

    # pageblob disabled, PUT answered with 201: no exception is expected
    args.pageblob = False
    with requests_mock.mock() as m:
        m.put("mock://blobepcontainer/blob?saskey", status_code=201)
        sbs = blobxfer.SasBlobService("mock://blobep", "saskey", None)
        bcw = blobxfer.BlobChunkWorker(exc_list, sa_in_queue, sa_out_queue, args, sbs, True)
        try:
            bcw.putblobdata(lpath, "container", "blob", "blockid", 0, 4, flock, None)
        except Exception:
            pytest.fail("unexpected Exception raised")

        m.get("mock://blobepcontainer/blob?saskey", status_code=200)
        try:
            bcw.getblobrange(lpath, "container", "blob", 0, 4, flock, None)
        except Exception:
            pytest.fail("unexpected Exception raised")

        # test zero-length putblob
        bcw.putblobdata(lpath, "container", "blob", "blockid", 0, 0, flock, None)
        bcw._pageblob = True
        bcw.putblobdata(lpath, "container", "blob", "blockid", 0, 0, flock, None)

        # test empty page
        with open(lpath, "wb") as f:
            f.write(b"\0" * 4 * 1024 * 1024)
        bcw.putblobdata(lpath, "container", "blob", "blockid", 0, 4 * 1024 * 1024, flock, None)
        with open(lpath, "wb") as f:
            f.write(b"\0" * 4 * 1024)
        bcw.putblobdata(lpath, "container", "blob", "blockid", 0, 4 * 1024, flock, None)

    # run() on a queued item with the GET answered by 201 must record an error
    sa_in_queue.put((lpath, "container", "blob", "blockid", 0, 4, flock, None))
    with requests_mock.mock() as m:
        sbs = blobxfer.SasBlobService("mock://blobep", "saskey", None)
        bcw = blobxfer.BlobChunkWorker(exc_list, sa_in_queue, sa_out_queue, args, sbs, False)
        m.get("mock://blobepcontainer/blob?saskey", status_code=201)
        bcw.run()
        assert len(exc_list) > 0
开发者ID:weicjames,项目名称:azure-batch-samples,代码行数:60,代码来源:test_blobxfer.py


示例2: test_scihub_unresponsive

def test_scihub_unresponsive():
    """Connect and read timeouts raised by the transport must reach the caller.

    Every request is mocked to raise first ``ConnectTimeout`` and then
    ``ReadTimeout``; ``query``, ``get_product_odata``, ``download`` and
    ``download_all`` are each expected to raise that same exception.
    """
    product_id = '8df46c9e-a20c-43db-a19a-4240c2ed3b8b'
    api = SentinelAPI("mock_user", "mock_password", timeout=(6, 6.6))

    # The API calls under test, in the order they are exercised.
    api_calls = (
        lambda: api.query(**_small_query),
        lambda: api.get_product_odata(product_id),
        lambda: api.download(product_id),
        lambda: api.download_all([product_id]),
    )

    timeout_errors = (
        requests.exceptions.ConnectTimeout,
        requests.exceptions.ReadTimeout,
    )
    for timeout_error in timeout_errors:
        with requests_mock.mock() as rqst:
            # Any method on any URL raises the timeout under test.
            rqst.request(requests_mock.ANY, requests_mock.ANY, exc=timeout_error)
            for api_call in api_calls:
                with pytest.raises(timeout_error):
                    api_call()
开发者ID:valgur,项目名称:sentinelsat,代码行数:34,代码来源:test_mod.py


示例3: test_gen_responses

  def test_gen_responses(self):
    """Check ``gen_responses`` for a 401 reply, a single group and several groups."""
    step = 3000
    start = 1426120000
    end = 1426147000
    groups1 = [[self.metric1, self.metric2]]
    payload = self.bfc.gen_payload(start, end, 'FULL')
    endpoint = self.bfc.get_multi_endpoint(self.finder.bf_query_endpoint, self.finder.tenant)

    # test 401 error: an empty list is expected
    with requests_mock.mock() as m:
      m.post(endpoint, json={}, status_code=401)
      responses = self.bfc.gen_responses(groups1, payload)
      self.assertEqual(responses, [])

    # test single group
    _, responses = self.make_data(start, step)
    with requests_mock.mock() as m:
      m.post(endpoint, json={'metrics': responses}, status_code=200)
      new_responses = self.bfc.gen_responses(groups1, payload)
      self.assertEqual(responses, new_responses)

    # test multiple groups
    groups2 = [[self.metric1], [self.metric2]]
    with requests_mock.mock() as m:
      # One canned payload per request, served in order. The list is local to
      # the test (not a module global) so no state leaks into other tests.
      pending_payloads = [{'metrics': responses[0:1]}, {'metrics': responses[1:]}]

      def json_callback(request, context):
        # Hand out the next payload; raises IndexError on an unexpected extra request.
        return pending_payloads.pop(0)

      m.post(endpoint, json=json_callback, status_code=200)
      new_responses = self.bfc.gen_responses(groups2, payload)
      self.assertEqual(responses, new_responses)
开发者ID:bmumdluri,项目名称:blueflood,代码行数:34,代码来源:tests.py


示例4: test_blobchunkworker_run

def test_blobchunkworker_run(tmpdir):
    """Exercise ``BlobChunkWorker`` put/get/run against mocked HTTP endpoints.

    With ``args.pageblob = False``: a put answered with 201 and a ranged get
    answered with 200 must not raise, and ``run()`` with the GET answered by
    201 must leave at least one entry in ``exc_list``.  With
    ``args.pageblob = True``: a put answered with 200 must raise ``IOError``.

    Args:
        tmpdir: pytest temporary-directory fixture used for the local file.
    """
    lpath = str(tmpdir.join('test.tmp'))
    with open(lpath, 'wt') as f:
        f.write(str(uuid.uuid4()))
    exc_list = []
    sa_in_queue = queue.Queue()
    sa_out_queue = queue.Queue()
    flock = threading.Lock()
    sa_in_queue.put((True, lpath, 'blobep', 'saskey', 'container',
                     'blob', 'blockid', 0, 4, flock, None))
    sa_in_queue.put((False, lpath, 'blobep', 'saskey', 'container',
                     'blob', 'blockid', 0, 4, flock, None))
    args = MagicMock()
    args.pageblob = False
    args.autovhd = False
    args.timeout = None

    with requests_mock.mock() as m:
        m.put('mock://blobepcontainer/blob?saskey', status_code=201)
        sbs = blobxfer.SasBlobService('mock://blobep', 'saskey', None)
        bcw = blobxfer.BlobChunkWorker(
            exc_list, sa_in_queue, sa_out_queue, args, sbs)
        try:
            bcw.putblobdata(lpath, 'container', 'blob', 'blockid',
                            0, 4, flock, None)
        except Exception:
            pytest.fail('unexpected Exception raised')

        m.get('mock://blobepcontainer/blob?saskey', status_code=200)
        try:
            bcw.getblobrange(lpath, 'container', 'blob', 0, 4, flock, None)
        except Exception:
            pytest.fail('unexpected Exception raised')

        # Re-register the GET with 201; run() must then record an error.
        m.get('mock://blobepcontainer/blob?saskey', status_code=201)
        bcw.run()
        assert len(exc_list) > 0

    # Fresh error list and queues for the pageblob=True scenario.
    exc_list = []
    sa_in_queue = queue.Queue()
    sa_out_queue = queue.Queue()
    sa_in_queue.put((True, lpath, 'blobep', 'saskey', 'container',
                     'blob', 'blockid', 0, 4, flock, None))
    sa_in_queue.put((False, lpath, 'blobep', 'saskey', 'container',
                     'blob', 'blockid', 0, 4, flock, None))
    args.pageblob = True
    with requests_mock.mock() as m:
        m.put('mock://blobepcontainer/blob?saskey', status_code=200)
        sbs = blobxfer.SasBlobService('mock://blobep', 'saskey', None)
        bcw = blobxfer.BlobChunkWorker(
            exc_list, sa_in_queue, sa_out_queue, args, sbs)
        with pytest.raises(IOError):
            bcw.putblobdata(lpath, 'container', 'blob', 'blockid',
                            0, 4, flock, None)
开发者ID:chanhou,项目名称:azure-batch-samples,代码行数:57,代码来源:test_blobxfer.py


示例5: test_get

    def test_get(self):
        """``download.get`` yields an ``HtmlDocument`` for a mocked URL, else a ``DownloadError``."""
        fixture = get_html_doc('search_engines', 'google.com')
        request = download.Request(fixture.url.url)

        # URL registered: the mocked body comes back in an HtmlDocument.
        with requests_mock.mock() as mocker:
            mocker.get(request.url, content=fixture.body)
            fetched = download.get(request)
            self.assertIsInstance(fetched, download.HtmlDocument)
            self.assertEqual(fetched.body, fixture.body)

        # Nothing registered: the result is a DownloadError instance.
        with requests_mock.mock():
            failed = download.get(request)
            self.assertIsInstance(failed, download.DownloadError)
开发者ID:jimmyppi,项目名称:searchcmd,代码行数:12,代码来源:test_download.py


示例6: test_cache

 def test_cache(self):
     """Run a query with ``--no-cache``, then again without it and with no URLs mocked."""
     page = get_html_doc('cmdextract', 'stackoverflow.com')
     search_terms = ['du']
     no_cache_flag = ['--no-cache']

     with requests_mock.mock() as mocker:
         self._setup_request_mock(mocker, search_terms, [page])
         main(search_terms + no_cache_flag)
     self._check_output(contains='du')
     self._truncate_stdout()

     # Second run: no responses are registered on the mock.
     with requests_mock.mock():
         main(search_terms)
     self._check_output(contains='du')
开发者ID:jimmyppi,项目名称:searchcmd,代码行数:12,代码来源:test_searchcmd.py


示例7: test_verbose

 def test_verbose(self):
     """Output produced with ``-v`` must be longer than the default output."""
     page = get_html_doc('cmdextract', 'stackoverflow.com')
     search_terms = ['du']
     verbose_flag = ['-v']

     with requests_mock.mock() as mocker:
         self._setup_request_mock(mocker, search_terms, [page])
         main(search_terms)
     quiet_output = self._get_current_stdout()
     self._truncate_stdout()

     # Second run: same query plus -v, with no responses registered.
     with requests_mock.mock():
         main(search_terms + verbose_flag)
     verbose_output = self._get_current_stdout()
     self.assertGreater(len(verbose_output), len(quiet_output))
开发者ID:jimmyppi,项目名称:searchcmd,代码行数:12,代码来源:test_searchcmd.py


示例8: test_max_hits

 def test_max_hits(self):
     """``--max-hits 1`` must print fewer lines than the default run."""
     page = get_html_doc('cmdextract', 'stackoverflow.com')
     search_terms = ['du']
     limit_flag = ['--max-hits', '1']

     with requests_mock.mock() as mocker:
         self._setup_request_mock(mocker, search_terms, [page])
         main(search_terms)
     default_line_count = len(self._get_current_stdout().split('\n'))
     self._truncate_stdout()

     # Second run: same query limited to one hit, with no responses registered.
     with requests_mock.mock():
         main(search_terms + limit_flag)
     limited_line_count = len(self._get_current_stdout().split('\n'))
     self.assertGreater(default_line_count, limited_line_count)
开发者ID:jimmyppi,项目名称:searchcmd,代码行数:13,代码来源:test_searchcmd.py


示例9: test_fetch_auth_key

 def test_fetch_auth_key(self):
     """``fetch_auth_key`` raises ``AuthError`` on failure and returns the key on success."""
     client = PhishNetAPI(api_key='foo')
     api_url = 'https://api.phish.net/api.json'

     # GET answers "success": "0" and only a username is passed -> AuthError.
     with requests_mock.mock() as mocker:
         mocker._adapter.register_uri(
             'GET', api_url,
             [{'text': '{"success": "0"}', 'status_code': 200}])
         with self.assertRaises(AuthError):
             client.fetch_auth_key('wilson')

     # GET still answers "0", but POST returns an authkey, which must be handed back.
     with requests_mock.mock() as mocker:
         mocker._adapter.register_uri(
             'GET', api_url,
             [{'text': '{"success": "0"}', 'status_code': 200}])
         mocker._adapter.register_uri(
             'POST', api_url,
             [{'text': '{"success": "1", "authkey": "232342342342"}', 'status_code': 200}])
         auth_key = client.fetch_auth_key('wilson', 'password')
         self.assertEqual('232342342342', auth_key)
开发者ID:jameserrico,项目名称:phishnetpy,代码行数:15,代码来源:test_phisnet_api.py


示例10: test_raises_exception_when_gateway_response_indicates_error

    def test_raises_exception_when_gateway_response_indicates_error(self):
        """A 500 reply from the mocked gateway must surface as ``GatewayError``."""
        with requests_mock.mock() as m:
            m.post("https://www.dummy-address.com/sendMessages", status_code=500)

            client = self.create_client()
            with self.assertRaises(itcsmsgwclient.GatewayError):
                # The call is expected to raise, so its return value is not captured.
                client.send([])
开发者ID:Intelecom,项目名称:smsgw-client-python,代码行数:7,代码来源:itcsmsgwclienttest.py


示例11: test_get_subcontainers_info_returns_list_of_container_info_objects

 def test_get_subcontainers_info_returns_list_of_container_info_objects(self):
     """``get_subcontainers_info`` must return a list made only of ``ContainerInfo`` objects."""
     endpoint = '{}api/v{}/subcontainers/test'.format(self.URL, self.API)
     with requests_mock.mock() as mocker:
         # The endpoint answers with a JSON array holding two empty objects.
         mocker.get(endpoint, text='[{}, {}]', status_code=200)
         result = self.c.get_subcontainers_info('test')
         self.assertIsInstance(result, list)
         only_container_infos = all(isinstance(item, ContainerInfo) for item in result)
         self.assertTrue(only_container_infos)
开发者ID:cgmcintyr,项目名称:pycadvisor,代码行数:7,代码来源:test_cadvisor.py


示例12: test_download_all

def test_download_all(tmpdir):
    """Download a small product set, then force one download to fail its checksum.

    The first pass downloads every product and checks that each file exists
    with the reported size.  One file is then removed and its OData record is
    served with a bogus checksum, so ``download_all`` with ``checksum=True``
    and ``max_attempts=1`` must report ``None`` for that path.

    Args:
        tmpdir: pytest temporary-directory fixture used as download target.
    """
    api = SentinelAPI(**_api_auth)
    # From https://scihub.copernicus.eu/apihub/odata/v1/Products?$top=5&$orderby=ContentLength
    filenames = ["S1A_WV_OCN__2SSH_20150603T092625_20150603T093332_006207_008194_521E",
                 "S1A_WV_OCN__2SSV_20150526T211029_20150526T211737_006097_007E78_134A",
                 "S1A_WV_OCN__2SSV_20150526T081641_20150526T082418_006090_007E3E_104C"]

    api.load_query(" OR ".join(filenames))
    assert len(api.get_products()) == len(filenames)

    # Download normally
    result = api.download_all(str(tmpdir))
    assert len(result) == len(filenames)
    for path, product_info in result.items():
        pypath = py.path.local(path)
        assert pypath.purebasename in filenames
        assert pypath.check(exists=1, file=1)
        assert pypath.size() == product_info["size"]

    # Force one download to fail
    path, product_info = list(result.items())[0]
    py.path.local(path).remove()
    with requests_mock.mock(real_http=True) as rqst:
        url = "https://scihub.copernicus.eu/apihub/odata/v1/Products('%s')/?$format=json" % product_info["id"]
        # Fetched before the URL is registered on the mock; the record is then
        # re-served with a checksum that cannot match the downloaded file.
        # Named ``odata`` so the conventional ``json`` module name is not shadowed.
        odata = api.session.get(url).json()
        odata["d"]["Checksum"]["Value"] = "00000000000000000000000000000000"
        rqst.get(url, json=odata)
        result = api.download_all(str(tmpdir), max_attempts=1, checksum=True)
        assert len(result) == len(filenames)
        assert result[path] is None
开发者ID:Fernerkundung,项目名称:sentinelsat,代码行数:30,代码来源:test_mod.py


示例13: test_server_use_server_version_flag

 def test_server_use_server_version_flag(self):
     """With ``use_server_version=True`` the server version is read as '2.5' from the mocked reply."""
     with open(SERVER_INFO_25_XML, 'rb') as xml_file:
         raw_xml = xml_file.read()
     server_info_xml = raw_xml.decode('utf-8')

     with requests_mock.mock() as mocker:
         mocker.get('http://test/api/2.4/serverInfo', text=server_info_xml)
         tableau_server = TSC.Server('http://test', use_server_version=True)
         self.assertEqual(tableau_server.version, '2.5')
开发者ID:davoscollective,项目名称:server-client-python,代码行数:7,代码来源:test_server_info.py


示例14: test_call_wrapper_zero_results

    def test_call_wrapper_zero_results(self):
        """``_call_wrapper`` must return an empty dict for the ``zero_results`` reply."""
        target_url = self.url % 'zero'
        with requests_mock.mock() as mocker:
            mocker.get(target_url, text=zero_results)

            observed = _call_wrapper(target_url)
            self.assertEqual(observed, {})
开发者ID:biocore,项目名称:labadmin,代码行数:7,代码来源:test_geocoder.py


示例15: query_test

 def query_test(query_pattern, jdata, qlen, search_results):
   """Run ``find_nodes`` for ``query_pattern`` against a mocked endpoint and compare paths.

   ``jdata`` is the JSON body the mocked endpoint returns.  The paths of the
   nodes found (via ``get_path``) must equal ``get_start(qlen)`` applied to
   each entry of ``search_results``.
   """
   with requests_mock.mock() as m:
     query = FindQuery(query_pattern, 1, 2)
     m.get(endpoint, json=jdata, status_code=200)
     metrics = self.finder.find_nodes(query)
     # Build real lists: on Python 3 ``map`` returns an iterator with no
     # length, which ``assertSequenceEqual`` rejects as a non-sequence.
     actual_paths = [get_path(metric) for metric in metrics]
     start_of = get_start(qlen)
     expected_paths = [start_of(result) for result in search_results]
     self.assertSequenceEqual(actual_paths, expected_paths)
开发者ID:bmumdluri,项目名称:blueflood,代码行数:7,代码来源:tests.py


示例16: test_api_authorize

 def test_api_authorize(self):
     """``api_authorize`` must raise ``AuthError`` when the POST reply reports ``"success": "0"``."""
     client = PhishNetAPI(api_key='foo')
     rejected_reply = {'text': '{"success": "0"}', 'status_code': 200}
     with requests_mock.mock() as mocker:
         mocker._adapter.register_uri(
             'POST', 'https://api.phish.net/api.json', [rejected_reply])
         with self.assertRaises(AuthError):
             client.api_authorize('wilson', 'password')
开发者ID:jameserrico,项目名称:phishnetpy,代码行数:7,代码来源:test_phisnet_api.py


示例17: test_freezer_caching

    def test_freezer_caching(self):
        expected1 = '''\
# File managed by freeze command from buildout_helpers
# Changes will be overwritten
# ETAG: example
# ORIGIN: http://example.com/buildout.cfg
[buildout]'''
        cfg = self.given_a_file_in_test_dir('buildout.cfg', '''\
[buildout]
extends= http://example.com/buildout.cfg
''')
        with requests_mock.mock() as m:
            m.get('http://example.com/buildout.cfg', text='''[buildout]''',
                  headers={'Etag': 'example'})
            freeze(Config(cfg))
            m.get('http://example.com/buildout.cfg', text='''''',
                  status_code=304)
            freeze(Config(cfg))
            last_call = m.request_history[-1]
            self.assertEqual('example',
                             last_call._request.headers['If-None-Match'])
        abs_dir, _ = os.path.split(cfg)
        new_file_contents = open(os.path.join(abs_dir,
                                              'external_buildouts',
                                              'example.com_buildout.cfg'),
                                 'r').read()
        self.assertEqual(new_file_contents, expected1)
开发者ID:collective,项目名称:buildout_helpers,代码行数:27,代码来源:test_freeze.py


示例18: test_import_netcdf_dataset_incomplete

def test_import_netcdf_dataset_incomplete(import_job_data, tmp_file_data, dataset_import_data):
    """Importing must raise ``DatasetImportError`` when the job points at an import overview.

    The job message carries ``next_uri`` '/datasets/import/a1b2c3/overview/';
    the client is expected to raise after exactly six mocked requests.
    """
    job_payload = copy.copy(import_job_data)
    job_payload['message'] = json.dumps({'next_uri': '/datasets/import/a1b2c3/overview/'})

    with requests_mock.mock() as mocker:
        mocker.post('https://databasin.org/uploads/upload-temporary-file/', text=json.dumps({'uuid': 'abcd'}))
        mocker.get('https://databasin.org/api/v1/uploads/temporary-files/abcd/', text=json.dumps(tmp_file_data))
        mocker.post('https://databasin.org/api/v1/jobs/', headers={'Location': 'https://databasin.org/api/v1/jobs/1234/'})
        mocker.get('https://databasin.org/api/v1/jobs/1234/', text=json.dumps(job_payload))
        mocker.get('https://databasin.org/api/v1/dataset_imports/a1b2c3/', text=json.dumps(dataset_import_data))
        mocker.delete('https://databasin.org/api/v1/dataset_imports/a1b2c3/')

        # In-memory zip archive returned by the patched open().
        archive = six.BytesIO()
        with zipfile.ZipFile(archive, 'w') as bundle:
            bundle.writestr('test.nc', '')
            bundle.writestr('style.json', '')
        archive.seek(0)

        with mock.patch.object(builtins, 'open', mock.Mock(return_value=archive)):
            client = Client()
            client._session.cookies['csrftoken'] = 'abcd'

            with pytest.raises(DatasetImportError):
                client.import_netcdf_dataset('test.zip')

            assert mocker.call_count == 6
开发者ID:consbio,项目名称:python-databasin,代码行数:26,代码来源:test_client.py


示例19: test_get_all_docker_containers_info_returns_list_of_container_info_objects

 def test_get_all_docker_containers_info_returns_list_of_container_info_objects(self):
     """``get_all_docker_containers_info`` must return a list made only of ``ContainerInfo`` objects."""
     endpoint = '{}api/v{}/docker/'.format(self.URL, self.API)
     with requests_mock.mock() as mocker:
         # The endpoint answers with a JSON array holding two empty objects.
         mocker.get(endpoint, text='[{}, {}]', status_code=200)
         containers = self.c.get_all_docker_containers_info()
         self.assertIsInstance(containers, list)
         only_container_infos = all(isinstance(item, ContainerInfo) for item in containers)
         self.assertTrue(only_container_infos)
开发者ID:cgmcintyr,项目名称:pycadvisor,代码行数:7,代码来源:test_cadvisor.py


示例20: test_import_netcdf_dataset_with_zip

def test_import_netcdf_dataset_with_zip(import_job_data, dataset_data, tmp_file_data):
    """Importing a zip archive must issue five requests and return dataset 'a1b2c3'.

    Also checks that the file is opened as ``'a+b'`` and that the third
    recorded request carries the expected job name and arguments.
    """
    with requests_mock.mock() as mocker:
        mocker.post('https://databasin.org/uploads/upload-temporary-file/', text=json.dumps({'uuid': 'abcd'}))
        mocker.get('https://databasin.org/api/v1/uploads/temporary-files/abcd/', text=json.dumps(tmp_file_data))
        mocker.post('https://databasin.org/api/v1/jobs/', headers={'Location': 'https://databasin.org/api/v1/jobs/1234/'})
        mocker.get('https://databasin.org/api/v1/jobs/1234/', text=json.dumps(import_job_data))
        mocker.get('https://databasin.org/api/v1/datasets/a1b2c3/', text=json.dumps(dataset_data))

        # In-memory zip archive returned by the patched open().
        archive = six.BytesIO()
        with zipfile.ZipFile(archive, 'w') as bundle:
            bundle.writestr('test.nc', '')
            bundle.writestr('style.json', '')
        archive.seek(0)

        with mock.patch.object(builtins, 'open', mock.Mock(return_value=archive)) as open_mock:
            client = Client()
            client._session.cookies['csrftoken'] = 'abcd'
            dataset = client.import_netcdf_dataset('test.zip')

            open_mock.assert_called_once_with('test.zip', 'a+b')
            assert mocker.call_count == 5
            assert dataset.id == 'a1b2c3'

            # Index 2 of the request history is inspected for the job payload.
            job_request = json.loads(mocker.request_history[2].text)
            job_args = job_request['job_args']
            assert job_request['job_name'] == 'create_import_job'
            assert job_args['file'] == 'abcd'
            assert job_args['dataset_type'] == 'NetCDF_Native'
开发者ID:consbio,项目名称:python-databasin,代码行数:26,代码来源:test_client.py



注:本文中的requests_mock.mock函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python requests_respectful.RespectfulRequester类代码示例发布时间:2022-05-26
下一篇:
Python sessions.FuturesSession类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap