本文整理汇总了Python中requests_mock.mock函数的典型用法代码示例。如果您正苦于以下问题：Python mock函数的具体用法？Python mock怎么用？Python mock使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了mock函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_blobchunkworker_run
def test_blobchunkworker_run(tmpdir):
    """Drive BlobChunkWorker put/get/run calls against mocked blob URLs.

    Covers, in order:
      * page-blob mode with a 200 response to PUT -> IOError expected;
      * block-blob mode with a 201 PUT and a 200 GET -> no exception;
      * zero-length puts and all-zero page payloads;
      * ``run()`` consuming one queued item while GET answers 201, which
        must leave at least one entry in the shared exception list.
    """
    blob_url = "mock://blobepcontainer/blob?saskey"

    local_file = str(tmpdir.join("test.tmp"))
    with open(local_file, "wt") as handle:
        handle.write(str(uuid.uuid4()))

    options = MagicMock()
    options.pageblob = True
    options.autovhd = False
    options.timeout = None

    # NOTE: this session/adapter pair is not referenced again in this test.
    session = requests.Session()
    adapter = requests_mock.Adapter()
    session.mount("mock", adapter)

    errors = []
    file_lock = threading.Lock()
    work_in = queue.Queue()
    work_out = queue.Queue()

    def new_worker(last_arg):
        # Fresh service + worker pair wired to this test's queues/options.
        service = blobxfer.SasBlobService("mock://blobep", "saskey", None)
        return blobxfer.BlobChunkWorker(
            errors, work_in, work_out, options, service, last_arg)

    def put_chunk(worker, nbytes):
        # Every put in this test differs only in the byte count.
        worker.putblobdata(
            local_file, "container", "blob", "blockid", 0, nbytes,
            file_lock, None)

    # Page-blob mode, PUT answered with 200.
    with requests_mock.mock() as mocker:
        mocker.put(blob_url, status_code=200)
        worker = new_worker(True)
        with pytest.raises(IOError):
            put_chunk(worker, 4)

    # Block-blob mode, PUT answered with 201.
    options.pageblob = False
    with requests_mock.mock() as mocker:
        mocker.put(blob_url, status_code=201)
        worker = new_worker(True)
        try:
            put_chunk(worker, 4)
        except Exception:
            pytest.fail("unexpected Exception raised")

        mocker.get(blob_url, status_code=200)
        try:
            worker.getblobrange(
                local_file, "container", "blob", 0, 4, file_lock, None)
        except Exception:
            pytest.fail("unexpected Exception raised")

        # test zero-length putblob
        put_chunk(worker, 0)
        worker._pageblob = True
        put_chunk(worker, 0)

        # test empty page
        for zero_len in (4 * 1024 * 1024, 4 * 1024):
            with open(local_file, "wb") as handle:
                handle.write(b"\0" * zero_len)
            put_chunk(worker, zero_len)

    # Queue one work item and let run() process it with GET answering 201.
    work_in.put(
        (local_file, "container", "blob", "blockid", 0, 4, file_lock, None))
    with requests_mock.mock() as mocker:
        worker = new_worker(False)
        mocker.get(blob_url, status_code=201)
        worker.run()
        assert len(errors) > 0
开发者ID:weicjames,项目名称:azure-batch-samples,代码行数:60,代码来源:test_blobxfer.py
示例2: test_scihub_unresponsive
def test_scihub_unresponsive():
    """Connect and read timeouts must propagate out of every API entry point.

    For each timeout exception type, every request is mocked to raise it and
    ``query``, ``get_product_odata``, ``download`` and ``download_all`` are
    each expected to re-raise that same exception.
    """
    product_id = '8df46c9e-a20c-43db-a19a-4240c2ed3b8b'
    connect_s, read_s = 6, 6.6
    api = SentinelAPI("mock_user", "mock_password", timeout=(connect_s, read_s))

    timeout_types = (
        requests.exceptions.ConnectTimeout,
        requests.exceptions.ReadTimeout,
    )
    for timeout_exc in timeout_types:
        with requests_mock.mock() as rqst:
            # Any method, any URL: raise the timeout instead of responding.
            rqst.request(requests_mock.ANY, requests_mock.ANY, exc=timeout_exc)

            with pytest.raises(timeout_exc):
                api.query(**_small_query)

            with pytest.raises(timeout_exc):
                api.get_product_odata(product_id)

            with pytest.raises(timeout_exc):
                api.download(product_id)

            with pytest.raises(timeout_exc):
                api.download_all([product_id])
开发者ID:valgur,项目名称:sentinelsat,代码行数:34,代码来源:test_mod.py
示例3: test_gen_responses
def test_gen_responses(self):
    """Check ``gen_responses`` for a 401 reply, one group, and several groups.

    The multi-group case answers successive POSTs with successive slices of
    the expected data. The pending replies live in a list local to this test
    (consumed by the JSON callback) rather than in a module-level global, so
    no state leaks out of the test.
    """
    step = 3000
    start = 1426120000
    end = 1426147000
    groups1 = [[self.metric1, self.metric2]]
    payload = self.bfc.gen_payload(start, end, 'FULL')
    endpoint = self.bfc.get_multi_endpoint(
        self.finder.bf_query_endpoint, self.finder.tenant)

    # test 401 error
    with requests_mock.mock() as m:
        m.post(endpoint, json={}, status_code=401)
        responses = self.bfc.gen_responses(groups1, payload)
        self.assertEqual(responses, [])

    # test single group
    _, responses = self.make_data(start, step)
    with requests_mock.mock() as m:
        m.post(endpoint, json={'metrics': responses}, status_code=200)
        new_responses = self.bfc.gen_responses(groups1, payload)
        self.assertEqual(responses, new_responses)

    # test multiple groups
    groups2 = [[self.metric1], [self.metric2]]
    with requests_mock.mock() as m:
        pending_replies = [
            {'metrics': responses[0:1]},
            {'metrics': responses[1:]},
        ]

        def json_callback(request, context):
            # Hand out the queued replies one per request, in order;
            # raises IndexError if more requests arrive than were queued.
            return pending_replies.pop(0)

        m.post(endpoint, json=json_callback, status_code=200)
        new_responses = self.bfc.gen_responses(groups2, payload)
        self.assertEqual(responses, new_responses)
开发者ID:bmumdluri,项目名称:blueflood,代码行数:34,代码来源:tests.py
示例4: test_blobchunkworker_run
def test_blobchunkworker_run(tmpdir):
    """Run BlobChunkWorker against mocked blob URLs in block and page mode.

    First pass (``pageblob`` False): a 201 PUT and a 200 GET must not raise;
    then GET is switched to 201 and ``run()`` must record at least one
    exception. Second pass (``pageblob`` True): a 200 PUT must raise IOError.
    """
    blob_url = 'mock://blobepcontainer/blob?saskey'

    local_file = str(tmpdir.join('test.tmp'))
    with open(local_file, 'wt') as handle:
        handle.write(str(uuid.uuid4()))

    file_lock = threading.Lock()

    def queued_items():
        # The two work items placed on the input queue for each pass; they
        # differ only in the leading boolean.
        return [
            (flag, local_file, 'blobep', 'saskey', 'container',
             'blob', 'blockid', 0, 4, file_lock, None)
            for flag in (True, False)
        ]

    errors = []
    work_in = queue.Queue()
    work_out = queue.Queue()
    for item in queued_items():
        work_in.put(item)

    options = MagicMock()
    options.pageblob = False
    options.autovhd = False
    options.timeout = None

    # NOTE: this session/adapter pair is not referenced again in this test.
    session = requests.Session()
    adapter = requests_mock.Adapter()
    session.mount('mock', adapter)

    with requests_mock.mock() as mocker:
        mocker.put(blob_url, status_code=201)
        service = blobxfer.SasBlobService('mock://blobep', 'saskey', None)
        worker = blobxfer.BlobChunkWorker(
            errors, work_in, work_out, options, service)
        try:
            worker.putblobdata(
                local_file, 'container', 'blob', 'blockid',
                0, 4, file_lock, None)
        except Exception:
            pytest.fail('unexpected Exception raised')

        mocker.get(blob_url, status_code=200)
        try:
            worker.getblobrange(
                local_file, 'container', 'blob', 0, 4, file_lock, None)
        except Exception:
            pytest.fail('unexpected Exception raised')

        # Re-register GET with 201, then let the worker drain its queue.
        mocker.get(blob_url, status_code=201)
        worker.run()
        assert len(errors) > 0

    # Second pass: fresh error list and queues, page-blob mode.
    errors = []
    work_in = queue.Queue()
    work_out = queue.Queue()
    for item in queued_items():
        work_in.put(item)
    options.pageblob = True

    with requests_mock.mock() as mocker:
        mocker.put(blob_url, status_code=200)
        service = blobxfer.SasBlobService('mock://blobep', 'saskey', None)
        worker = blobxfer.BlobChunkWorker(
            errors, work_in, work_out, options, service)
        with pytest.raises(IOError):
            worker.putblobdata(
                local_file, 'container', 'blob', 'blockid',
                0, 4, file_lock, None)
开发者ID:chanhou,项目名称:azure-batch-samples,代码行数:57,代码来源:test_blobxfer.py
示例5: test_get
def test_get(self):
    """``download.get`` yields an HtmlDocument on a hit, a DownloadError otherwise."""
    fixture = get_html_doc('search_engines', 'google.com')
    request = download.Request(fixture.url.url)

    # URL registered: the body served by the mock must come back unchanged.
    with requests_mock.mock() as mocker:
        mocker.get(request.url, content=fixture.body)
        hit = download.get(request)
        self.assertIsInstance(hit, download.HtmlDocument)
        self.assertEqual(hit.body, fixture.body)

    # Nothing registered on this mock: the call must report a DownloadError.
    with requests_mock.mock():
        miss = download.get(request)
        self.assertIsInstance(miss, download.DownloadError)
开发者ID:jimmyppi,项目名称:searchcmd,代码行数:12,代码来源:test_download.py
示例6: test_cache
def test_cache(self):
    """Run ``main`` with ``--no-cache`` and mocked pages, then again with no URIs mocked.

    Both runs must print output containing ``du``; the second run has no
    URIs registered on its mock.
    """
    stackoverflow_doc = get_html_doc('cmdextract', 'stackoverflow.com')
    query = ['du']
    no_cache_flag = ['--no-cache']

    with requests_mock.mock() as mocker:
        self._setup_request_mock(mocker, query, [stackoverflow_doc])
        main(query + no_cache_flag)
        self._check_output(contains='du')
        self._truncate_stdout()

    with requests_mock.mock():
        main(query)
        self._check_output(contains='du')
开发者ID:jimmyppi,项目名称:searchcmd,代码行数:12,代码来源:test_searchcmd.py
示例7: test_verbose
def test_verbose(self):
    """Output produced with ``-v`` must be longer than the plain output."""
    stackoverflow_doc = get_html_doc('cmdextract', 'stackoverflow.com')
    query = ['du']
    verbose_flag = ['-v']

    # Baseline: plain run against the mocked search results.
    with requests_mock.mock() as mocker:
        self._setup_request_mock(mocker, query, [stackoverflow_doc])
        main(query)
        non_verbose = self._get_current_stdout()
        self._truncate_stdout()

    # Verbose run; no URIs are registered on this second mock.
    with requests_mock.mock():
        main(query + verbose_flag)
        verbose_len = len(self._get_current_stdout())
        self.assertGreater(verbose_len, len(non_verbose))
开发者ID:jimmyppi,项目名称:searchcmd,代码行数:12,代码来源:test_searchcmd.py
示例8: test_max_hits
def test_max_hits(self):
    """``--max-hits 1`` must print fewer lines than the unrestricted run."""
    stackoverflow_doc = get_html_doc('cmdextract', 'stackoverflow.com')
    query = ['du']
    limit_flag = ['--max-hits', '1']

    # Baseline: unrestricted run against the mocked search results.
    with requests_mock.mock() as mocker:
        self._setup_request_mock(mocker, query, [stackoverflow_doc])
        main(query)
        nr_lines = len(self._get_current_stdout().split('\n'))
        self._truncate_stdout()

    # Limited run; no URIs are registered on this second mock.
    with requests_mock.mock():
        main(query + limit_flag)
        limited_lines = len(self._get_current_stdout().split('\n'))
        self.assertGreater(nr_lines, limited_lines)
开发者ID:jimmyppi,项目名称:searchcmd,代码行数:13,代码来源:test_searchcmd.py
示例9: test_fetch_auth_key
def test_fetch_auth_key(self):
    """``fetch_auth_key`` raises AuthError without a password and returns the key with one.

    URIs are registered through the mocker's public ``register_uri`` method
    instead of reaching into its private ``_adapter`` attribute.
    """
    api_url = 'https://api.phish.net/api.json'
    fak = PhishNetAPI(api_key='foo')

    # Only a GET that reports failure is registered; no password is passed.
    with requests_mock.mock() as m:
        m.register_uri('GET', api_url, [
            {'text': '{"success": "0"}', 'status_code': 200},
        ])
        self.assertRaises(AuthError, fak.fetch_auth_key, 'wilson')

    # GET still reports failure, but POST succeeds and carries the auth key.
    with requests_mock.mock() as m:
        m.register_uri('GET', api_url, [
            {'text': '{"success": "0"}', 'status_code': 200},
        ])
        m.register_uri('POST', api_url, [
            {'text': '{"success": "1", "authkey": "232342342342"}',
             'status_code': 200},
        ])
        self.assertEqual(
            '232342342342', fak.fetch_auth_key('wilson', 'password'))
开发者ID:jameserrico,项目名称:phishnetpy,代码行数:15,代码来源:test_phisnet_api.py
示例10: test_raises_exception_when_gateway_response_indicates_error
def test_raises_exception_when_gateway_response_indicates_error(self):
    """A 500 reply from the gateway must surface as ``GatewayError``."""
    with requests_mock.mock() as m:
        m.post("https://www.dummy-address.com/sendMessages", status_code=500)
        client = self.create_client()
        with self.assertRaises(itcsmsgwclient.GatewayError):
            # The return value is irrelevant: the call is expected to raise.
            client.send([])
开发者ID:Intelecom,项目名称:smsgw-client-python,代码行数:7,代码来源:itcsmsgwclienttest.py
示例11: test_get_subcontainers_info_returns_list_of_container_info_objects
def test_get_subcontainers_info_returns_list_of_container_info_objects(self):
    """A two-element JSON array reply must come back as a list of ContainerInfo."""
    endpoint = '{}api/v{}/subcontainers/test'.format(self.URL, self.API)
    with requests_mock.mock() as mocker:
        mocker.get(endpoint, text='[{}, {}]', status_code=200)
        result = self.c.get_subcontainers_info('test')
        self.assertIsInstance(result, list)
        every_item_wrapped = all(
            isinstance(entry, ContainerInfo) for entry in result)
        self.assertTrue(every_item_wrapped)
开发者ID:cgmcintyr,项目名称:pycadvisor,代码行数:7,代码来源:test_cadvisor.py
示例12: test_download_all
def test_download_all(tmpdir):
    """Download three products, then force one checksum failure.

    After a normal ``download_all``, one downloaded file is removed and its
    OData record is served from a mock with an all-zero checksum;
    ``download_all(..., checksum=True)`` must then map that path to ``None``.
    """
    api = SentinelAPI(**_api_auth)
    # From https://scihub.copernicus.eu/apihub/odata/v1/Products?$top=5&$orderby=ContentLength
    filenames = [
        "S1A_WV_OCN__2SSH_20150603T092625_20150603T093332_006207_008194_521E",
        "S1A_WV_OCN__2SSV_20150526T211029_20150526T211737_006097_007E78_134A",
        "S1A_WV_OCN__2SSV_20150526T081641_20150526T082418_006090_007E3E_104C",
    ]
    expected_count = len(filenames)
    target_dir = str(tmpdir)

    api.load_query(" OR ".join(filenames))
    assert len(api.get_products()) == expected_count

    # Download normally
    result = api.download_all(target_dir)
    assert len(result) == expected_count
    for path, product_info in result.items():
        pypath = py.path.local(path)
        assert pypath.purebasename in filenames
        assert pypath.check(exists=1, file=1)
        assert pypath.size() == product_info["size"]

    # Force one download to fail
    path, product_info = list(result.items())[0]
    py.path.local(path).remove()
    with requests_mock.mock(real_http=True) as rqst:
        url = "https://scihub.copernicus.eu/apihub/odata/v1/Products('%s')/?$format=json" % product_info["id"]
        # Fetch the record, corrupt its checksum, then register the altered
        # copy on the mock for that URL.
        odata = api.session.get(url).json()
        odata["d"]["Checksum"]["Value"] = "00000000000000000000000000000000"
        rqst.get(url, json=odata)
        result = api.download_all(target_dir, max_attempts=1, checksum=True)
        assert len(result) == expected_count
        assert result[path] is None
开发者ID:Fernerkundung,项目名称:sentinelsat,代码行数:30,代码来源:test_mod.py
示例13: test_server_use_server_version_flag
def test_server_use_server_version_flag(self):
    """With ``use_server_version=True`` the server reports version '2.5'."""
    with open(SERVER_INFO_25_XML, 'rb') as fixture:
        raw_xml = fixture.read()
    server_info_xml = raw_xml.decode('utf-8')

    with requests_mock.mock() as mocker:
        # The fixture XML is served from the 2.4 serverInfo endpoint.
        mocker.get('http://test/api/2.4/serverInfo', text=server_info_xml)
        server = TSC.Server('http://test', use_server_version=True)
        self.assertEqual(server.version, '2.5')
开发者ID:davoscollective,项目名称:server-client-python,代码行数:7,代码来源:test_server_info.py
示例14: test_call_wrapper_zero_results
def test_call_wrapper_zero_results(self):
    """``_call_wrapper`` returns an empty dict for the ``zero_results`` reply."""
    target = self.url % 'zero'
    with requests_mock.mock() as mocker:
        mocker.get(target, text=zero_results)
        observed = _call_wrapper(target)
        self.assertEqual(observed, {})
开发者ID:biocore,项目名称:labadmin,代码行数:7,代码来源:test_geocoder.py
示例15: query_test
def query_test(query_pattern, jdata, qlen, search_results):
    """Run one find query against a mocked endpoint and compare node paths.

    ``jdata`` is served as the JSON reply for ``endpoint``; the paths of the
    nodes returned by ``find_nodes`` must equal ``get_start(qlen)`` applied
    to each entry of ``search_results``.

    Both sides are materialised as lists before the comparison: a lazy
    ``map`` object is not a sequence, so ``assertSequenceEqual`` cannot
    compare it on Python 3.
    """
    with requests_mock.mock() as m:
        query = FindQuery(query_pattern, 1, 2)
        m.get(endpoint, json=jdata, status_code=200)
        metrics = self.finder.find_nodes(query)
        actual_paths = [get_path(node) for node in list(metrics)]
        expected_paths = list(map(get_start(qlen), search_results))
        self.assertSequenceEqual(actual_paths, expected_paths)
开发者ID:bmumdluri,项目名称:blueflood,代码行数:7,代码来源:tests.py
示例16: test_api_authorize
def test_api_authorize(self):
    """``api_authorize`` raises AuthError when the POST reply reports failure.

    The URI is registered through the mocker's public ``register_uri``
    method instead of reaching into its private ``_adapter`` attribute.
    """
    aa_test = PhishNetAPI(api_key='foo')
    with requests_mock.mock() as m:
        m.register_uri('POST', 'https://api.phish.net/api.json', [
            {'text': '{"success": "0"}', 'status_code': 200},
        ])
        self.assertRaises(
            AuthError, aa_test.api_authorize, 'wilson', 'password')
开发者ID:jameserrico,项目名称:phishnetpy,代码行数:7,代码来源:test_phisnet_api.py
示例17: test_freezer_caching
def test_freezer_caching(self):
    """Freeze twice; the second request must carry the stored ETag.

    The first freeze gets a 200 reply with ``Etag: example``. The second
    gets a 304, and its request must send ``If-None-Match: example``. The
    frozen file must still hold the content from the first freeze.

    The frozen file is read inside a ``with`` block so the handle is closed
    deterministically.
    """
    expected1 = '''\
# File managed by freeze command from buildout_helpers
# Changes will be overwritten
# ETAG: example
# ORIGIN: http://example.com/buildout.cfg
[buildout]'''
    cfg = self.given_a_file_in_test_dir('buildout.cfg', '''\
[buildout]
extends= http://example.com/buildout.cfg
''')
    with requests_mock.mock() as m:
        m.get('http://example.com/buildout.cfg', text='''[buildout]''',
              headers={'Etag': 'example'})
        freeze(Config(cfg))

        # Second round: the mocked reply is now an empty 304.
        m.get('http://example.com/buildout.cfg', text='''''',
              status_code=304)
        freeze(Config(cfg))
        last_call = m.request_history[-1]
        self.assertEqual('example',
                         last_call._request.headers['If-None-Match'])

    abs_dir, _ = os.path.split(cfg)
    frozen_path = os.path.join(abs_dir,
                               'external_buildouts',
                               'example.com_buildout.cfg')
    with open(frozen_path, 'r') as frozen_file:
        new_file_contents = frozen_file.read()
    self.assertEqual(new_file_contents, expected1)
开发者ID:collective,项目名称:buildout_helpers,代码行数:27,代码来源:test_freeze.py
示例18: test_import_netcdf_dataset_incomplete
def test_import_netcdf_dataset_incomplete(import_job_data, tmp_file_data, dataset_import_data):
    """A job whose message carries an import-overview ``next_uri`` must raise.

    ``import_netcdf_dataset`` is expected to raise ``DatasetImportError``
    and the mock must have seen exactly six requests.
    """
    # Work on a shallow copy so the fixture object itself is not modified.
    job = copy.copy(import_job_data)
    job['message'] = json.dumps({'next_uri': '/datasets/import/a1b2c3/overview/'})
    import_job_data = job

    with requests_mock.mock() as m:
        m.post('https://databasin.org/uploads/upload-temporary-file/', text=json.dumps({'uuid': 'abcd'}))
        m.get('https://databasin.org/api/v1/uploads/temporary-files/abcd/', text=json.dumps(tmp_file_data))
        m.post('https://databasin.org/api/v1/jobs/', headers={'Location': 'https://databasin.org/api/v1/jobs/1234/'})
        m.get('https://databasin.org/api/v1/jobs/1234/', text=json.dumps(import_job_data))
        m.get('https://databasin.org/api/v1/dataset_imports/a1b2c3/', text=json.dumps(dataset_import_data))
        m.delete('https://databasin.org/api/v1/dataset_imports/a1b2c3/')

        # In-memory zip archive that the patched builtin ``open`` returns.
        archive = six.BytesIO()
        with zipfile.ZipFile(archive, 'w') as zf:
            for member in ('test.nc', 'style.json'):
                zf.writestr(member, '')
        archive.seek(0)

        fake_open = mock.Mock(return_value=archive)
        with mock.patch.object(builtins, 'open', fake_open) as open_mock:
            client = Client()
            client._session.cookies['csrftoken'] = 'abcd'
            with pytest.raises(DatasetImportError):
                client.import_netcdf_dataset('test.zip')
            assert m.call_count == 6
开发者ID:consbio,项目名称:python-databasin,代码行数:26,代码来源:test_client.py
示例19: test_get_all_docker_containers_info_returns_list_of_container_info_objects
def test_get_all_docker_containers_info_returns_list_of_container_info_objects(self):
    """A two-element JSON array reply must come back as a list of ContainerInfo."""
    endpoint = '{}api/v{}/docker/'.format(self.URL, self.API)
    with requests_mock.mock() as mocker:
        mocker.get(endpoint, text='[{}, {}]', status_code=200)
        containers = self.c.get_all_docker_containers_info()
        self.assertIsInstance(containers, list)
        every_item_wrapped = all(
            isinstance(entry, ContainerInfo) for entry in containers)
        self.assertTrue(every_item_wrapped)
开发者ID:cgmcintyr,项目名称:pycadvisor,代码行数:7,代码来源:test_cadvisor.py
示例20: test_import_netcdf_dataset_with_zip
def test_import_netcdf_dataset_with_zip(import_job_data, dataset_data, tmp_file_data):
    """Import a zip archive and verify the calls made and the job payload.

    Checks that the file is opened once as ``('test.zip', 'a+b')``, that the
    mock saw exactly five requests, that the returned dataset id is
    ``a1b2c3``, and that the third request's body describes the import job.
    """
    with requests_mock.mock() as m:
        m.post('https://databasin.org/uploads/upload-temporary-file/', text=json.dumps({'uuid': 'abcd'}))
        m.get('https://databasin.org/api/v1/uploads/temporary-files/abcd/', text=json.dumps(tmp_file_data))
        m.post('https://databasin.org/api/v1/jobs/', headers={'Location': 'https://databasin.org/api/v1/jobs/1234/'})
        m.get('https://databasin.org/api/v1/jobs/1234/', text=json.dumps(import_job_data))
        m.get('https://databasin.org/api/v1/datasets/a1b2c3/', text=json.dumps(dataset_data))

        # In-memory zip archive that the patched builtin ``open`` returns.
        archive = six.BytesIO()
        with zipfile.ZipFile(archive, 'w') as zf:
            for member in ('test.nc', 'style.json'):
                zf.writestr(member, '')
        archive.seek(0)

        fake_open = mock.Mock(return_value=archive)
        with mock.patch.object(builtins, 'open', fake_open) as open_mock:
            client = Client()
            client._session.cookies['csrftoken'] = 'abcd'
            dataset = client.import_netcdf_dataset('test.zip')

            open_mock.assert_called_once_with('test.zip', 'a+b')
            assert m.call_count == 5
            assert dataset.id == 'a1b2c3'

            # The third recorded request is the one whose body is inspected.
            job_request = m.request_history[2]
            request_data = json.loads(job_request.text)
            assert request_data['job_name'] == 'create_import_job'
            job_args = request_data['job_args']
            assert job_args['file'] == 'abcd'
            assert job_args['dataset_type'] == 'NetCDF_Native'
开发者ID:consbio,项目名称:python-databasin,代码行数:26,代码来源:test_client.py
注：本文中的requests_mock.mock函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论