本文整理汇总了Python中webserver.install_http_handler函数的典型用法代码示例。如果您正苦于以下问题：Python install_http_handler函数的具体用法？Python install_http_handler怎么用？Python install_http_handler使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了install_http_handler函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_visoss_8
def test_visoss_8():
    """Check ReadDir() and file/directory stat results on a /vsioss/ bucket.

    One bucket listing response is registered with the test web server. The
    two stat calls that follow run with handlers that have no responses
    registered.
    """
    if gdaltest.webserver_port == 0:
        pytest.skip()

    bucket_listing = """<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult>
<Prefix></Prefix>
<Contents>
<Key>test</Key>
<LastModified>1970-01-01T00:00:01.000Z</LastModified>
<Size>40</Size>
</Contents>
<CommonPrefixes>
<Prefix>test/</Prefix>
</CommonPrefixes>
</ListBucketResult>
"""
    listing_handler = webserver.SequentialHandler()
    listing_handler.add('GET', '/visoss_8/?delimiter=%2F', 200,
                        {'Content-type': 'application/xml'},
                        bucket_listing)
    with webserver.install_http_handler(listing_handler):
        entries = gdal.ReadDir('/vsioss/visoss_8', 0)
    assert entries == ['test', 'test/']

    # 'test' was listed under <Contents>: it must not be reported as a directory.
    with webserver.install_http_handler(webserver.SequentialHandler()):
        file_stat = gdal.VSIStatL('/vsioss/visoss_8/test')
        assert not stat.S_ISDIR(file_stat.mode)

    # 'test/' was listed under <CommonPrefixes>: it must be a directory.
    with webserver.install_http_handler(webserver.SequentialHandler()):
        dir_stat = gdal.VSIStatL('/vsioss/visoss_8/test/')
        assert stat.S_ISDIR(dir_stat.mode)
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:33,代码来源:vsioss.py
示例2: test_vsiwebhdfs_stat
def test_vsiwebhdfs_stat():
    """Check VSIStatL() on a WebHDFS file: first hit, repeated call, missing file.

    The test is skipped when no local test web server is available.
    """
    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/webhdfs/v1/foo/bar?op=GETFILESTATUS', 200,
                {}, '{"FileStatus":{"type":"FILE","length":1000000}}')
    with webserver.install_http_handler(handler):
        stat_res = gdal.VSIStatL(gdaltest.webhdfs_base_connection + '/foo/bar')
    # Fail with an explicit diagnostic instead of a bare pytest.fail().
    assert stat_res is not None, 'VSIStatL() returned None for /foo/bar'
    assert stat_res.size == 1000000, \
        'unexpected size: %s' % str(stat_res.size)

    # Test caching: the same stat is repeated with no handler installed.
    stat_res = gdal.VSIStatL(gdaltest.webhdfs_base_connection + '/foo/bar')
    # Guard against None so a regression reports a clean assertion failure
    # rather than an AttributeError.
    assert stat_res is not None, 'repeated VSIStatL() returned None'
    assert stat_res.size == 1000000

    # Test missing file
    handler = webserver.SequentialHandler()
    handler.add('GET', '/webhdfs/v1/unexisting?op=GETFILESTATUS', 404, {},
                '{"RemoteException":{"exception":"FileNotFoundException","javaClassName":"java.io.FileNotFoundException","message":"File does not exist: /unexisting"}}')
    with webserver.install_http_handler(handler):
        stat_res = gdal.VSIStatL(
            gdaltest.webhdfs_base_connection + '/unexisting')
    assert stat_res is None
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:31,代码来源:vsiwebhdfs.py
示例3: test_ogr_wfs3_get_feature_count
def test_ogr_wfs3_get_feature_count():
    """Check GetFeatureCount() on a WFS3 layer served by the test web server.

    The first count is made with the OpenAPI document and a
    ``resultType=hits`` response registered; the second count is made with
    only the ``resultType=hits`` response registered.
    """
    if gdaltest.wfs3_drv is None:
        pytest.skip()
    if gdaltest.webserver_port == 0:
        pytest.skip()

    def _expect_hits_request(target):
        # Register the response that carries the matched-feature count.
        target.add('GET', '/wfs3/collections/foo/items?resultType=hits', 200,
                   {'Content-Type': 'application/json'},
                   '{ "numberMatched": 1234 }')

    collections_doc = """{ "collections" : [ {
"name": "foo",
"extent": {
"spatial": [ -10, 40, 15, 50 ]
}
}] }"""

    # Fake openapi response
    openapi_doc = """{
"openapi": "3.0.0",
"paths" : {
"/collections/foo/items": {
"get": {
"parameters": [
{
"$ref" : "#/components/parameters/resultType"
}
]
}
}
},
"components": {
"parameters": {
"resultType": {
"name": "resultType",
"in": "query",
"schema" : {
"type" : "string",
"enum" : [ "hits", "results" ]
}
}
}
}
}"""

    open_handler = webserver.SequentialHandler()
    open_handler.add('GET', '/wfs3/collections', 200,
                     {'Content-Type': 'application/json'},
                     collections_doc)
    with webserver.install_http_handler(open_handler):
        ds = ogr.Open('WFS3:http://localhost:%d/wfs3' % gdaltest.webserver_port)
    lyr = ds.GetLayer(0)

    first_count_handler = webserver.SequentialHandler()
    first_count_handler.add('GET', '/wfs3/api', 200,
                            {'Content-Type': 'application/json'},
                            openapi_doc)
    _expect_hits_request(first_count_handler)
    with webserver.install_http_handler(first_count_handler):
        assert lyr.GetFeatureCount() == 1234

    second_count_handler = webserver.SequentialHandler()
    _expect_hits_request(second_count_handler)
    with webserver.install_http_handler(second_count_handler):
        assert lyr.GetFeatureCount() == 1234
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:60,代码来源:ogr_wfs3.py
示例4: vsiaz_fake_unlink
def vsiaz_fake_unlink():
    """Exercise gdal.Unlink() on /vsiaz/ for an accepted and a rejected DELETE.

    Returns 'skip' when no test web server is available, 'fail' when
    Unlink() does not return the expected code, and 'success' otherwise.
    """
    if gdaltest.webserver_port == 0:
        return 'skip'

    blob_url = '/azure/blob/myaccount/az_bucket_test_unlink/myfile'
    vsi_name = '/vsiaz/az_bucket_test_unlink/myfile'

    # Success: the DELETE is answered with 202
    accepting = webserver.SequentialHandler()
    accepting.add('HEAD', blob_url, 200, {'Content-Length': '1'})
    accepting.add('DELETE', blob_url, 202, {'Connection': 'close'})
    with webserver.install_http_handler(accepting):
        status = gdal.Unlink(vsi_name)
    if status != 0:
        gdaltest.post_reason('fail')
        return 'fail'

    # Failure: the DELETE is answered with 400
    rejecting = webserver.SequentialHandler()
    rejecting.add('HEAD', blob_url, 200, {'Content-Length': '1'})
    rejecting.add('DELETE', blob_url, 400, {'Connection': 'close'})
    with webserver.install_http_handler(rejecting), gdaltest.error_handler():
        status = gdal.Unlink(vsi_name)
    if status != -1:
        gdaltest.post_reason('fail')
        return 'fail'

    return 'success'
开发者ID:hdfeos,项目名称:gdal,代码行数:27,代码来源:vsiaz.py
示例5: test_vsicurl_test_retry
def test_vsicurl_test_retry():
    """Check /vsicurl/ reads with and without the retry options.

    Without retry options a 502 on the GET must yield no data. With
    ``max_retry=2`` the 502 and 429 responses are followed by a 200, and the
    read must return the payload while the last error message mentions 429.
    """
    if gdaltest.webserver_port == 0:
        pytest.skip()

    # No retry requested: the 502 is final.
    no_retry = webserver.SequentialHandler()
    no_retry.add('GET', '/test_retry/', 404)
    no_retry.add('HEAD', '/test_retry/test.txt', 200, {'Content-Length': '3'})
    no_retry.add('GET', '/test_retry/test.txt', 502)
    with webserver.install_http_handler(no_retry):
        fp = gdal.VSIFOpenL('/vsicurl/http://localhost:%d/test_retry/test.txt' % gdaltest.webserver_port, 'rb')
        if fp:
            nread = len(gdal.VSIFReadL(1, 1, fp))
            gdal.VSIFCloseL(fp)
        else:
            nread = 0
        assert nread == 0

    gdal.VSICurlClearCache()

    # Retry requested through the /vsicurl? option syntax.
    with_retry = webserver.SequentialHandler()
    with_retry.add('GET', '/test_retry/', 404)
    with_retry.add('HEAD', '/test_retry/test.txt', 200, {'Content-Length': '3'})
    for status in (502, 429):
        with_retry.add('GET', '/test_retry/test.txt', status)
    with_retry.add('GET', '/test_retry/test.txt', 200, {}, 'foo')
    with webserver.install_http_handler(with_retry):
        fp = gdal.VSIFOpenL('/vsicurl?max_retry=2&retry_delay=0.01&url=http://localhost:%d/test_retry/test.txt' % gdaltest.webserver_port, 'rb')
        assert fp is not None
        gdal.ErrorReset()
        with gdaltest.error_handler():
            payload = gdal.VSIFReadL(1, 3, fp).decode('ascii')
        last_error = gdal.GetLastErrorMsg()
        gdal.VSIFCloseL(fp)
        assert payload == 'foo'
        assert '429' in last_error
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:35,代码来源:vsicurl.py
示例6: test_vsiswift_fake_unlink
def test_vsiswift_fake_unlink():
    """Check gdal.Unlink() on /vsiswift/ for an accepted and a rejected DELETE."""
    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    object_url = '/v1/AUTH_something/foo/bar'
    vsi_name = '/vsiswift/foo/bar'

    # Success: the DELETE is answered with 202
    accepting = webserver.SequentialHandler()
    accepting.add('GET', object_url, 206,
                  {'Content-Range': 'bytes 0-0/1'}, 'x')
    accepting.add('DELETE', object_url, 202, {'Connection': 'close'})
    with webserver.install_http_handler(accepting):
        status = gdal.Unlink(vsi_name)
    assert status == 0

    # Failure: the DELETE is answered with 400
    rejecting = webserver.SequentialHandler()
    rejecting.add('GET', object_url, 206,
                  {'Content-Range': 'bytes 0-0/1'}, 'x')
    rejecting.add('DELETE', object_url, 400, {'Connection': 'close'})
    with webserver.install_http_handler(rejecting), gdaltest.error_handler():
        status = gdal.Unlink(vsi_name)
    assert status == -1
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:25,代码来源:vsiswift.py
示例7: vsiwebhdfs_unlink
def vsiwebhdfs_unlink():
    """Exercise gdal.Unlink() against WebHDFS DELETE responses.

    Covers a successful delete, a successful delete with user name and
    delegation token set, a ``{"boolean":false}`` answer and a 404 answer.
    Returns 'skip', 'fail' or 'success'.
    """
    if gdaltest.webserver_port == 0:
        return 'skip'

    gdal.VSICurlClearCache()

    target = gdaltest.webhdfs_base_connection + '/foo/bar'

    # Success
    handler = webserver.SequentialHandler()
    handler.add('DELETE', '/webhdfs/v1/foo/bar?op=DELETE', 200,
                {}, '{"boolean":true}')
    with webserver.install_http_handler(handler):
        status = gdal.Unlink(target)
    if status != 0:
        gdaltest.post_reason('fail')
        return 'fail'

    gdal.VSICurlClearCache()

    # With permissions
    gdal.VSICurlClearCache()
    handler = webserver.SequentialHandler()
    handler.add('DELETE', '/webhdfs/v1/foo/bar?op=DELETE&user.name=root&delegation=token', 200,
                {}, '{"boolean":true}')
    credentials = {'WEBHDFS_USERNAME': 'root',
                   'WEBHDFS_DELEGATION': 'token'}
    with gdaltest.config_options(credentials), \
            webserver.install_http_handler(handler):
        status = gdal.Unlink(target)
    if status != 0:
        gdaltest.post_reason('fail')
        return 'fail'

    # Failure: the server answers 200 but reports boolean false
    handler = webserver.SequentialHandler()
    handler.add('DELETE', '/webhdfs/v1/foo/bar?op=DELETE', 200,
                {}, '{"boolean":false}')
    with webserver.install_http_handler(handler), gdaltest.error_handler():
        status = gdal.Unlink(target)
    if status != -1:
        gdaltest.post_reason('fail')
        return 'fail'

    gdal.VSICurlClearCache()

    # Failure: the server answers 404
    handler = webserver.SequentialHandler()
    handler.add('DELETE', '/webhdfs/v1/foo/bar?op=DELETE', 404,
                {})
    with webserver.install_http_handler(handler), gdaltest.error_handler():
        status = gdal.Unlink(target)
    if status != -1:
        gdaltest.post_reason('fail')
        return 'fail'

    return 'success'
开发者ID:hdfeos,项目名称:gdal,代码行数:60,代码来源:vsiwebhdfs.py
示例8: vsiswift_fake_unlink
def vsiswift_fake_unlink():
    """Exercise gdal.Unlink() on /vsiswift/ for an accepted and a rejected DELETE.

    Returns 'skip' when no test web server is available, 'fail' when
    Unlink() does not return the expected code, and 'success' otherwise.
    """
    if gdaltest.webserver_port == 0:
        return 'skip'

    gdal.VSICurlClearCache()

    object_url = '/v1/AUTH_something/foo/bar'
    vsi_name = '/vsiswift/foo/bar'

    # Success: the DELETE is answered with 202
    accepting = webserver.SequentialHandler()
    accepting.add('GET', object_url, 206,
                  {'Content-Range': 'bytes 0-0/1'}, 'x')
    accepting.add('DELETE', object_url, 202, {'Connection': 'close'})
    with webserver.install_http_handler(accepting):
        status = gdal.Unlink(vsi_name)
    if status != 0:
        gdaltest.post_reason('fail')
        return 'fail'

    # Failure: the DELETE is answered with 400
    rejecting = webserver.SequentialHandler()
    rejecting.add('GET', object_url, 206,
                  {'Content-Range': 'bytes 0-0/1'}, 'x')
    rejecting.add('DELETE', object_url, 400, {'Connection': 'close'})
    with webserver.install_http_handler(rejecting), gdaltest.error_handler():
        status = gdal.Unlink(vsi_name)
    if status != -1:
        gdaltest.post_reason('fail')
        return 'fail'

    return 'success'
开发者ID:sgillies,项目名称:gdal,代码行数:31,代码来源:vsiswift.py
示例9: test_vsicurl_test_parse_html_filelist_apache
def test_vsicurl_test_parse_html_filelist_apache():
    """Check that /vsicurl/ extracts file names from an Apache-style index page.

    After the listing, a stat on a listed file is made with no handler
    installed, and a stat on an unlisted name is answered by a HEAD 404.
    """
    if gdaltest.webserver_port == 0:
        pytest.skip()

    apache_index = """<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<html>
<head>
<title>Index of /mydir</title>
</head>
<body>
<h1>Index of /mydir</h1>
<table><tr><th><img src="/icons/blank.gif" alt="[ICO]"></th><th><a href="?C=N;O=D">Name</a></th><th><a href="?C=M;O=A">Last modified</a></th><th><a href="?C=S;O=A">Size</a></th><th><a href="?C=D;O=A">Description</a></th></tr><tr><th colspan="5"><hr></th></tr>
<tr><td valign="top"><img src="/icons/back.gif" alt="[DIR]"></td><td><a href="/gdal/data/">Parent Directory</a></td><td> </td><td align="right"> - </td><td> </td></tr>
<tr><td valign="top"><img src="/icons/image2.gif" alt="[IMG]"></td><td><a href="foo.tif">foo.tif</a></td><td align="right">17-May-2010 12:26 </td><td align="right"> 90K</td><td> </td></tr>
<tr><td valign="top"><img src="/icons/image2.gif" alt="[IMG]"></td><td><a href="foo%20with%20space.tif">foo with space.tif</a></td><td align="right">15-Jan-2007 11:02 </td><td align="right">736 </td><td> </td></tr>
<tr><th colspan="5"><hr></th></tr>
</table>
</body></html>"""

    index_handler = webserver.SequentialHandler()
    index_handler.add('GET', '/mydir/', 200, {}, apache_index)
    with webserver.install_http_handler(index_handler):
        names = gdal.ReadDir('/vsicurl/http://localhost:%d/mydir' % gdaltest.webserver_port)
    assert names == ['foo.tif', 'foo%20with%20space.tif']

    # A file that appeared in the listing must be reported as existing.
    listed = gdal.VSIStatL('/vsicurl/http://localhost:%d/mydir/foo%%20with%%20space.tif' % gdaltest.webserver_port, gdal.VSI_STAT_EXISTS_FLAG)
    assert listed is not None

    # A name absent from the listing is probed with HEAD and must not exist.
    missing_handler = webserver.SequentialHandler()
    missing_handler.add('HEAD', '/mydir/i_dont_exist', 404, {})
    with webserver.install_http_handler(missing_handler):
        missing = gdal.VSIStatL('/vsicurl/http://localhost:%d/mydir/i_dont_exist' % gdaltest.webserver_port, gdal.VSI_STAT_EXISTS_FLAG)
        assert missing is None
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:30,代码来源:vsicurl.py
示例10: ogr_wfs3_empty_layer
def ogr_wfs3_empty_layer():
    """Open a WFS3 dataset whose only collection returns no features.

    Returns 'skip' when the driver or the test web server is unavailable,
    'fail' when the dataset, layer count, layer name or field count is not
    as expected, and 'success' otherwise.
    """
    if gdaltest.wfs3_drv is None:
        return 'skip'
    if gdaltest.webserver_port == 0:
        return 'skip'

    collections_handler = webserver.SequentialHandler()
    collections_handler.add('GET', '/wfs3/collections', 200,
                            {'Content-Type': 'application/json'},
                            '{ "collections" : [ { "name": "foo" }] }')
    with webserver.install_http_handler(collections_handler):
        ds = ogr.Open('WFS3:http://localhost:%d/wfs3' % gdaltest.webserver_port)

    if ds is None:
        gdaltest.post_reason('fail')
        return 'fail'
    if ds.GetLayerCount() != 1:
        gdaltest.post_reason('fail')
        return 'fail'

    layer = ds.GetLayer(0)
    if layer.GetName() != 'foo':
        gdaltest.post_reason('fail')
        return 'fail'

    # The layer definition is requested while the items response is registered.
    items_handler = webserver.SequentialHandler()
    items_handler.add('GET', '/wfs3/collections/foo/items?limit=10', 200,
                      {'Content-Type': 'application/geo+json'},
                      '{ "type": "FeatureCollection", "features": [] }')
    with webserver.install_http_handler(items_handler):
        field_count = layer.GetLayerDefn().GetFieldCount()
        if field_count != 0:
            gdaltest.post_reason('fail')
            return 'fail'

    return 'success'
开发者ID:normanb,项目名称:gdal,代码行数:34,代码来源:ogr_wfs3.py
示例11: test_vsiwebhdfs_readdir
def test_vsiwebhdfs_readdir():
    """Check ReadDir() on WebHDFS, the stat results that follow, and a 404."""
    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    base = gdaltest.webhdfs_base_connection

    listing_handler = webserver.SequentialHandler()
    listing_handler.add('GET', '/webhdfs/v1/foo/?op=LISTSTATUS', 200,
                        {}, '{"FileStatuses":{"FileStatus":[{"type":"FILE","modificationTime":1000,"pathSuffix":"bar.baz","length":123456},{"type":"DIRECTORY","pathSuffix":"mysubdir","length":0}]}}')
    with webserver.install_http_handler(listing_handler):
        names = gdal.ReadDir(base + '/foo')
    assert names == ['bar.baz', 'mysubdir']

    # Size and mtime must match the listing entry (no handler is installed).
    file_stat = gdal.VSIStatL(base + '/foo/bar.baz')
    assert file_stat.size == 123456
    assert file_stat.mtime == 1

    # ReadDir on something known to be a file shouldn't cause network access
    names = gdal.ReadDir(base + '/foo/bar.baz')
    assert names is None

    # Test error on ReadDir()
    # NOTE(review): the registered path '/webhdfs/v1foo/...' and the requested
    # name (base + 'foo/...') both omit the separating slash - confirm this is
    # intended.
    error_handler = webserver.SequentialHandler()
    error_handler.add('GET', '/webhdfs/v1foo/error_test/?op=LISTSTATUS', 404)
    with webserver.install_http_handler(error_handler):
        names = gdal.ReadDir(base + 'foo/error_test/')
    assert names is None
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:29,代码来源:vsiwebhdfs.py
示例12: vsiswift_fake_auth_storage_url_and_auth_token
def vsiswift_fake_auth_storage_url_and_auth_token():
    """Exercise /vsiswift/ reads configured through storage URL and auth token.

    The SWIFT_* configuration options are set for the process and are not
    restored by this function. A first read is made against a 501 response;
    a second read is served by a custom responder that checks the
    ``x-auth-token`` request header.

    Returns 'skip' when no test web server is available, 'fail' when a file
    cannot be opened or the data read is not 'foo', and 'success' otherwise.
    """
    if gdaltest.webserver_port == 0:
        return 'skip'

    gdal.VSICurlClearCache()

    gdal.SetConfigOption('SWIFT_AUTH_V1_URL', '')
    gdal.SetConfigOption('SWIFT_USER', '')
    gdal.SetConfigOption('SWIFT_KEY', '')
    gdal.SetConfigOption('SWIFT_STORAGE_URL', 'http://127.0.0.1:%d/v1/AUTH_something' % gdaltest.webserver_port)
    gdal.SetConfigOption('SWIFT_AUTH_TOKEN', 'my_auth_token')

    # Failure
    handler = webserver.SequentialHandler()
    handler.add('GET', '/v1/AUTH_something/foo/bar', 501)
    with webserver.install_http_handler(handler):
        f = open_for_read('/vsiswift/foo/bar')
        if f is None:
            gdaltest.post_reason('fail')
            return 'fail'
        gdal.VSIFReadL(1, 4, f).decode('ascii')
        gdal.VSIFCloseL(f)

    gdal.VSICurlClearCache()

    # Success
    def method(request):
        request.protocol_version = 'HTTP/1.1'
        h = request.headers
        if 'x-auth-token' not in h or \
           h['x-auth-token'] != 'my_auth_token':
            sys.stderr.write('Bad headers: %s\n' % str(h))
            request.send_response(403)
            # Terminate the header section so the error response is complete;
            # without end_headers() the reply is never finished.
            request.send_header('Content-Length', 0)
            request.end_headers()
            return

        request.send_response(200)
        request.send_header('Content-type', 'text/plain')
        request.send_header('Content-Length', 3)
        request.send_header('Connection', 'close')
        request.end_headers()
        request.wfile.write("""foo""".encode('ascii'))

    handler = webserver.SequentialHandler()
    handler.add('GET', '/v1/AUTH_something/foo/bar', custom_method=method)
    with webserver.install_http_handler(handler):
        f = open_for_read('/vsiswift/foo/bar')
        if f is None:
            gdaltest.post_reason('fail')
            return 'fail'
        data = gdal.VSIFReadL(1, 4, f).decode('ascii')
        gdal.VSIFCloseL(f)

    if data != 'foo':
        gdaltest.post_reason('fail')
        print(data)
        return 'fail'

    return 'success'
开发者ID:sgillies,项目名称:gdal,代码行数:58,代码来源:vsiswift.py
示例13: test_visoss_7
def test_visoss_7():
    """Check Mkdir() and Rmdir() on /vsioss/ against canned server responses.

    Covers creating a directory, creating it again, deleting it, deleting it
    again, and deleting a directory whose listing still contains an object.
    """
    if gdaltest.webserver_port == 0:
        pytest.skip()

    vsi_dir = '/vsioss/oss_bucket_test_mkdir/dir'
    dir_url = '/oss_bucket_test_mkdir/dir/'
    dir_listing_url = '/oss_bucket_test_mkdir/?delimiter=%2F&max-keys=100&prefix=dir%2F'

    # Create a directory that does not exist yet
    handler = webserver.SequentialHandler()
    handler.add('GET', dir_url, 404, {'Connection': 'close'})
    handler.add('GET', dir_listing_url, 404, {'Connection': 'close'})
    handler.add('PUT', dir_url, 200)
    with webserver.install_http_handler(handler):
        status = gdal.Mkdir(vsi_dir, 0)
    assert status == 0

    # Try creating already existing directory
    handler = webserver.SequentialHandler()
    handler.add('GET', dir_url, 416)
    with webserver.install_http_handler(handler):
        status = gdal.Mkdir(vsi_dir, 0)
    assert status != 0

    # Delete the directory
    handler = webserver.SequentialHandler()
    handler.add('DELETE', dir_url, 204)
    with webserver.install_http_handler(handler):
        status = gdal.Rmdir(vsi_dir)
    assert status == 0

    # Try deleting already deleted directory
    handler = webserver.SequentialHandler()
    handler.add('GET', dir_url, 404)
    handler.add('GET', dir_listing_url, 404, {'Connection': 'close'})
    with webserver.install_http_handler(handler):
        status = gdal.Rmdir(vsi_dir)
    assert status != 0

    # Try deleting non-empty directory
    nonempty_listing = """<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult>
<Prefix>dir_nonempty/</Prefix>
<Contents>
<Key>dir_nonempty/test.txt</Key>
<LastModified>1970-01-01T00:00:01.000Z</LastModified>
<Size>40</Size>
</Contents>
</ListBucketResult>
"""
    handler = webserver.SequentialHandler()
    handler.add('GET', '/oss_bucket_test_mkdir/dir_nonempty/', 416)
    handler.add('GET', '/oss_bucket_test_mkdir/?delimiter=%2F&max-keys=100&prefix=dir_nonempty%2F', 200,
                {'Content-type': 'application/xml'},
                nonempty_listing)
    with webserver.install_http_handler(handler):
        status = gdal.Rmdir('/vsioss/oss_bucket_test_mkdir/dir_nonempty')
    assert status != 0
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:52,代码来源:vsioss.py
示例14: test_vsiwebhdfs_open
def test_vsiwebhdfs_open():
    """Check reads on a WebHDFS file: direct answer, redirect, and 404."""
    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    vsi_name = gdaltest.webhdfs_base_connection + '/foo/bar'

    # Download without redirect (not nominal)
    direct = webserver.SequentialHandler()
    direct.add('GET', '/webhdfs/v1/foo/bar?op=OPEN&offset=9999990784&length=16384', 200,
               {}, '0123456789data')
    with webserver.install_http_handler(direct):
        fp = open_for_read(vsi_name)
        assert fp is not None
        gdal.VSIFSeekL(fp, 9999990784 + 10, 0)
        chunk = gdal.VSIFReadL(1, 4, fp).decode('ascii')
        assert chunk == 'data'
        gdal.VSIFCloseL(fp)

    # Download with redirect (nominal) and permissions
    gdal.VSICurlClearCache()
    redirecting = webserver.SequentialHandler()
    redirecting.add('GET', '/webhdfs/v1/foo/bar?op=OPEN&offset=0&length=16384&user.name=root&delegation=token', 307,
                    {'Location': gdaltest.webhdfs_redirected_url + '/webhdfs/v1/foo/bar?op=OPEN&offset=0&length=16384'})
    redirecting.add('GET', '/redirected/webhdfs/v1/foo/bar?op=OPEN&offset=0&length=16384', 200,
                    {}, 'yeah')
    options = {'WEBHDFS_USERNAME': 'root',
               'WEBHDFS_DELEGATION': 'token',
               'WEBHDFS_DATANODE_HOST': 'localhost'}
    with gdaltest.config_options(options), \
            webserver.install_http_handler(redirecting):
        fp = open_for_read(vsi_name)
        assert fp is not None
        chunk = gdal.VSIFReadL(1, 4, fp).decode('ascii')
        assert chunk == 'yeah'
        gdal.VSIFCloseL(fp)

    # Test error
    gdal.VSICurlClearCache()

    fp = open_for_read(vsi_name)
    assert fp is not None

    failing = webserver.SequentialHandler()
    failing.add('GET', '/webhdfs/v1/foo/bar?op=OPEN&offset=0&length=16384', 404)
    with webserver.install_http_handler(failing):
        assert len(gdal.VSIFReadL(1, 4, fp)) == 0

    # Retry: should not cause network access
    assert len(gdal.VSIFReadL(1, 4, fp)) == 0

    gdal.VSIFCloseL(fp)
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:52,代码来源:vsiwebhdfs.py
示例15: vsicurl_test_retry
def vsicurl_test_retry():
    """Exercise /vsicurl/ reads with and without the retry options.

    Without retry options a 502 on the GET must yield no data. With
    ``max_retry=2`` the 502 and 429 responses are followed by a 200; the read
    must return 'foo' and the last error message must mention 429.

    Returns 'skip', 'fail' or 'success'.
    """
    if gdaltest.webserver_port == 0:
        return 'skip'

    # No retry requested: the 502 is final.
    no_retry = webserver.SequentialHandler()
    no_retry.add('GET', '/test_retry/', 404)
    no_retry.add('HEAD', '/test_retry/test.txt', 200, {'Content-Length': '3'})
    no_retry.add('GET', '/test_retry/test.txt', 502)
    with webserver.install_http_handler(no_retry):
        fp = gdal.VSIFOpenL('/vsicurl/http://localhost:%d/test_retry/test.txt' % gdaltest.webserver_port, 'rb')
        if fp:
            nread = len(gdal.VSIFReadL(1, 1, fp))
            gdal.VSIFCloseL(fp)
        else:
            nread = 0
        if nread != 0:
            gdaltest.post_reason('fail')
            print(nread)
            return 'fail'

    gdal.VSICurlClearCache()

    # Retry requested through the /vsicurl? option syntax.
    with_retry = webserver.SequentialHandler()
    with_retry.add('GET', '/test_retry/', 404)
    with_retry.add('HEAD', '/test_retry/test.txt', 200, {'Content-Length': '3'})
    for status in (502, 429):
        with_retry.add('GET', '/test_retry/test.txt', status)
    with_retry.add('GET', '/test_retry/test.txt', 200, {}, 'foo')
    with webserver.install_http_handler(with_retry):
        fp = gdal.VSIFOpenL('/vsicurl?max_retry=2&retry_delay=0.01&url=http://localhost:%d/test_retry/test.txt' % gdaltest.webserver_port, 'rb')
        if fp is None:
            gdaltest.post_reason('fail')
            return 'fail'
        gdal.ErrorReset()
        with gdaltest.error_handler():
            payload = gdal.VSIFReadL(1, 3, fp).decode('ascii')
        last_error = gdal.GetLastErrorMsg()
        gdal.VSIFCloseL(fp)
        if payload != 'foo':
            gdaltest.post_reason('fail')
            print(payload)
            return 'fail'
        if '429' not in last_error:
            gdaltest.post_reason('fail')
            print(last_error)
            return 'fail'

    return 'success'
开发者ID:sgillies,项目名称:gdal,代码行数:48,代码来源:vsicurl.py
示例16: test_vsiaz_write_blockblob_retry
def test_vsiaz_write_blockblob_retry():
    """Check that a /vsiaz/ write is retried after a 502 on the PUT.

    The first PUT is answered with 502; the second is handled by a custom
    responder that reads a 3-byte body and answers 201.
    """
    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    def method(request):
        # Custom responder for the retried PUT.
        request.protocol_version = 'HTTP/1.1'
        request.wfile.write('HTTP/1.1 100 Continue\r\n\r\n'.encode('ascii'))
        body = request.rfile.read(3).decode('ascii')
        if len(body) == 3:
            request.send_response(201)
        else:
            sys.stderr.write('Bad headers: %s\n' % str(request.headers))
            request.send_response(403)
        request.send_header('Content-Length', 0)
        request.end_headers()

    # Test creation of BlockBlob
    fp = gdal.VSIFOpenL('/vsiaz/test_copy/file.bin', 'wb')
    assert fp is not None

    blob_url = '/azure/blob/myaccount/test_copy/file.bin'
    retry_options = {'GDAL_HTTP_MAX_RETRY': '2',
                     'GDAL_HTTP_RETRY_DELAY': '0.01'}
    with gdaltest.config_options(retry_options):
        handler = webserver.SequentialHandler()
        handler.add('PUT', blob_url, 502)
        handler.add('PUT', blob_url, custom_method=method)
        with webserver.install_http_handler(handler):
            written = gdal.VSIFWriteL('foo', 1, 3, fp)
            assert written == 3
            # The file is closed while the handler is still installed.
            gdal.VSIFCloseL(fp)
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:35,代码来源:vsiaz.py
示例17: test_vsiaz_write_appendblob_retry
def test_vsiaz_write_appendblob_retry():
    """Check that /vsiaz/ append-blob writes are retried after 502 responses.

    With a 10-byte chunk size, a 16-byte write is served by one blob PUT and
    two ``comp=appendblock`` PUTs, each preceded by a 502 answer.
    """
    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    blob_url = '/azure/blob/myaccount/test_copy/file.bin'
    append_url = blob_url + '?comp=appendblock'
    # Every request is first refused with 502, then accepted with 201.
    expected_puts = [
        (blob_url, 502), (blob_url, 201),
        (append_url, 502), (append_url, 201),
        (append_url, 502), (append_url, 201),
    ]

    options = {'GDAL_HTTP_MAX_RETRY': '2',
               'GDAL_HTTP_RETRY_DELAY': '0.01',
               'VSIAZ_CHUNK_SIZE_BYTES': '10'}
    with gdaltest.config_options(options):
        fp = gdal.VSIFOpenL('/vsiaz/test_copy/file.bin', 'wb')
        assert fp is not None

        handler = webserver.SequentialHandler()
        for url, status in expected_puts:
            handler.add('PUT', url, status)
        with webserver.install_http_handler(handler):
            written = gdal.VSIFWriteL('0123456789abcdef', 1, 16, fp)
            assert written == 16
            gdal.VSIFCloseL(fp)
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:25,代码来源:vsiaz.py
示例18: vsiwebhdfs_readdir
def vsiwebhdfs_readdir():
    """Exercise ReadDir() on WebHDFS, the stat results that follow, and a 404.

    Returns 'skip' when no test web server is available, 'fail' on the first
    unexpected result, and 'success' otherwise.
    """
    if gdaltest.webserver_port == 0:
        return 'skip'

    gdal.VSICurlClearCache()

    base = gdaltest.webhdfs_base_connection

    listing_handler = webserver.SequentialHandler()
    listing_handler.add('GET', '/webhdfs/v1/foo/?op=LISTSTATUS', 200,
                        {}, '{"FileStatuses":{"FileStatus":[{"type":"FILE","modificationTime":1000,"pathSuffix":"bar.baz","length":123456},{"type":"DIRECTORY","pathSuffix":"mysubdir","length":0}]}}')
    with webserver.install_http_handler(listing_handler):
        names = gdal.ReadDir(base + '/foo')
    if names != ['bar.baz', 'mysubdir']:
        gdaltest.post_reason('fail')
        print(names)
        return 'fail'

    # Size and mtime must match the listing entry (no handler is installed).
    file_stat = gdal.VSIStatL(base + '/foo/bar.baz')
    if file_stat.size != 123456:
        gdaltest.post_reason('fail')
        print(file_stat.size)
        return 'fail'
    if file_stat.mtime != 1:
        gdaltest.post_reason('fail')
        return 'fail'

    # ReadDir on something known to be a file shouldn't cause network access
    names = gdal.ReadDir(base + '/foo/bar.baz')
    if names is not None:
        gdaltest.post_reason('fail')
        return 'fail'

    # Test error on ReadDir()
    # NOTE(review): the registered path '/webhdfs/v1foo/...' and the requested
    # name (base + 'foo/...') both omit the separating slash - confirm this is
    # intended.
    error_handler = webserver.SequentialHandler()
    error_handler.add('GET', '/webhdfs/v1foo/error_test/?op=LISTSTATUS', 404)
    with webserver.install_http_handler(error_handler):
        names = gdal.ReadDir(base + 'foo/error_test/')
    if names is not None:
        gdaltest.post_reason('fail')
        print(names)
        return 'fail'

    return 'success'
开发者ID:hdfeos,项目名称:gdal,代码行数:44,代码来源:vsiwebhdfs.py
示例19: vsiswift_stat
def vsiswift_stat():
    """Exercise VSIStatL() on /vsiswift/, /vsiswift_streaming/ and a container.

    Returns 'skip' when no test web server is available, 'fail' when a stat
    result is missing or unexpected, and 'success' otherwise.
    """
    if gdaltest.webserver_port == 0:
        return 'skip'

    gdal.VSICurlClearCache()

    # /vsiswift/: the size comes from the Content-Range of a ranged GET.
    ranged = webserver.SequentialHandler()
    ranged.add('GET', '/v1/AUTH_something/foo/bar', 206,
               {'Content-Range': 'bytes 0-0/1000000'}, 'x')
    with webserver.install_http_handler(ranged):
        info = gdal.VSIStatL('/vsiswift/foo/bar')
    if info is None or info.size != 1000000:
        gdaltest.post_reason('fail')
        print(info.size if info is not None else info)
        return 'fail'

    # /vsiswift_streaming/: the size comes from the Content-Length of a HEAD.
    head_only = webserver.SequentialHandler()
    head_only.add('HEAD', '/v1/AUTH_something/foo/bar', 200, {'Content-Length': '1000000'})
    with webserver.install_http_handler(head_only):
        info = gdal.VSIStatL('/vsiswift_streaming/foo/bar')
    if info is None or info.size != 1000000:
        gdaltest.post_reason('fail')
        print(info.size if info is not None else info)
        return 'fail'

    # Test stat on container
    container = webserver.SequentialHandler()
    # GET on the container URL returns something, but we must hack this back
    # to a directory
    container.add('GET', '/v1/AUTH_something/foo', 200, {}, "blabla")
    with webserver.install_http_handler(container):
        info = gdal.VSIStatL('/vsiswift/foo')
    if info is None or not stat.S_ISDIR(info.mode):
        gdaltest.post_reason('fail')
        return 'fail'

    return 'success'
开发者ID:sgillies,项目名称:gdal,代码行数:44,代码来源:vsiswift.py
示例20: vsiswift_fake_write
def vsiswift_fake_write():
    """Exercise a chunked upload of 40000 bytes to /vsiswift/.

    A custom responder checks the ``x-auth-token`` and ``Transfer-Encoding``
    request headers, decodes the chunked body and answers 200 only when
    exactly 40000 characters were received.

    Returns 'skip' when no test web server is available, 'fail' when the
    file cannot be opened or the write count is wrong, and 'success'
    otherwise.
    """
    if gdaltest.webserver_port == 0:
        return 'skip'

    gdal.VSICurlClearCache()

    # Test creation of a Swift object
    f = gdal.VSIFOpenL('/vsiswift/test_copy/file.bin', 'wb')
    if f is None:
        gdaltest.post_reason('fail')
        return 'fail'

    handler = webserver.SequentialHandler()

    def method(request):
        h = request.headers
        if 'x-auth-token' not in h or \
           h['x-auth-token'] != 'my_auth_token' or \
           'Transfer-Encoding' not in h or h['Transfer-Encoding'] != 'chunked':
            sys.stderr.write('Bad headers: %s\n' % str(h))
            request.send_response(403)
            # Terminate the header section so the error response is complete,
            # as the other 403 branch below already does.
            request.send_header('Content-Length', 0)
            request.end_headers()
            return

        request.protocol_version = 'HTTP/1.1'
        request.wfile.write('HTTP/1.1 100 Continue\r\n\r\n'.encode('ascii'))

        # Decode the chunked body: hex length line, payload, then CRLF.
        # A zero-length chunk terminates the body.
        chunks = []
        while True:
            numchars = int(request.rfile.readline().strip(), 16)
            chunks.append(request.rfile.read(numchars).decode('ascii'))
            request.rfile.read(2)
            if numchars == 0:
                break
        content = ''.join(chunks)

        if len(content) != 40000:
            sys.stderr.write('Bad headers: %s\n' % str(request.headers))
            request.send_response(403)
            request.send_header('Content-Length', 0)
            request.end_headers()
            return

        request.send_response(200)
        request.send_header('Content-Length', 0)
        request.end_headers()

    handler.add('PUT', '/v1/AUTH_something/test_copy/file.bin', custom_method=method)
    with webserver.install_http_handler(handler):
        ret = gdal.VSIFWriteL('x' * 35000, 1, 35000, f)
        ret += gdal.VSIFWriteL('x' * 5000, 1, 5000, f)
        if ret != 40000:
            gdaltest.post_reason('fail')
            print(ret)
            gdal.VSIFCloseL(f)
            return 'fail'
        # The file is closed while the handler is still installed.
        gdal.VSIFCloseL(f)

    return 'success'
开发者ID:sgillies,项目名称:gdal,代码行数:55,代码来源:vsiswift.py
注：本文中的webserver.install_http_handler函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论