• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python webserver.install_http_handler函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中webserver.install_http_handler函数的典型用法代码示例。如果您正苦于以下问题:Python install_http_handler函数的具体用法?Python install_http_handler怎么用?Python install_http_handler使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了install_http_handler函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_visoss_8

def test_visoss_8():

    if gdaltest.webserver_port == 0:
        pytest.skip()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/visoss_8/?delimiter=%2F', 200,
                {'Content-type': 'application/xml'},
                """<?xml version="1.0" encoding="UTF-8"?>
                    <ListBucketResult>
                        <Prefix></Prefix>
                        <Contents>
                            <Key>test</Key>
                            <LastModified>1970-01-01T00:00:01.000Z</LastModified>
                            <Size>40</Size>
                        </Contents>
                        <CommonPrefixes>
                            <Prefix>test/</Prefix>
                        </CommonPrefixes>
                    </ListBucketResult>
                """)

    with webserver.install_http_handler(handler):
        listdir = gdal.ReadDir('/vsioss/visoss_8', 0)
    assert listdir == ['test', 'test/']

    handler = webserver.SequentialHandler()
    with webserver.install_http_handler(handler):
        assert not stat.S_ISDIR(gdal.VSIStatL('/vsioss/visoss_8/test').mode)

    handler = webserver.SequentialHandler()
    with webserver.install_http_handler(handler):
        assert stat.S_ISDIR(gdal.VSIStatL('/vsioss/visoss_8/test/').mode)
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:33,代码来源:vsioss.py


示例2: test_vsiwebhdfs_stat

def test_vsiwebhdfs_stat():

    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/webhdfs/v1/foo/bar?op=GETFILESTATUS', 200,
                {}, '{"FileStatus":{"type":"FILE","length":1000000}}')
    with webserver.install_http_handler(handler):
        stat_res = gdal.VSIStatL(gdaltest.webhdfs_base_connection + '/foo/bar')
    if stat_res is None or stat_res.size != 1000000:
        if stat_res is not None:
            print(stat_res.size)
        else:
            print(stat_res)
        pytest.fail()

    # Test caching
    stat_res = gdal.VSIStatL(gdaltest.webhdfs_base_connection + '/foo/bar')
    assert stat_res.size == 1000000

    # Test missing file
    handler = webserver.SequentialHandler()
    handler.add('GET', '/webhdfs/v1/unexisting?op=GETFILESTATUS', 404, {},
                '{"RemoteException":{"exception":"FileNotFoundException","javaClassName":"java.io.FileNotFoundException","message":"File does not exist: /unexisting"}}')
    with webserver.install_http_handler(handler):
        stat_res = gdal.VSIStatL(
            gdaltest.webhdfs_base_connection + '/unexisting')
    assert stat_res is None
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:31,代码来源:vsiwebhdfs.py


示例3: test_ogr_wfs3_get_feature_count

def test_ogr_wfs3_get_feature_count():
    if gdaltest.wfs3_drv is None:
        pytest.skip()

    if gdaltest.webserver_port == 0:
        pytest.skip()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/wfs3/collections', 200, {'Content-Type': 'application/json'},
                """{ "collections" : [ {
                    "name": "foo",
                    "extent": {
                        "spatial": [ -10, 40, 15, 50 ]
                    }
                 }] }""")
    with webserver.install_http_handler(handler):
        ds = ogr.Open('WFS3:http://localhost:%d/wfs3' % gdaltest.webserver_port)
    lyr = ds.GetLayer(0)

    handler = webserver.SequentialHandler()
    # Fake openapi response
    handler.add('GET', '/wfs3/api', 200, {'Content-Type': 'application/json'},
                """{
            "openapi": "3.0.0",
            "paths" : {
                "/collections/foo/items": {
                    "get": {
                        "parameters": [
                            {
                                "$ref" : "#/components/parameters/resultType"
                            }
                        ]
                    }
                }
            },
            "components": {
                "parameters": {
                    "resultType": {
                        "name": "resultType",
                        "in": "query",
                        "schema" : {
                            "type" : "string",
                            "enum" : [ "hits", "results" ]
                        }
                    }
                }
            }
        }""")
    handler.add('GET', '/wfs3/collections/foo/items?resultType=hits', 200,
                {'Content-Type': 'application/json'},
                '{ "numberMatched": 1234 }')
    with webserver.install_http_handler(handler):
        assert lyr.GetFeatureCount() == 1234

    handler = webserver.SequentialHandler()
    handler.add('GET', '/wfs3/collections/foo/items?resultType=hits', 200,
                {'Content-Type': 'application/json'},
                '{ "numberMatched": 1234 }')
    with webserver.install_http_handler(handler):
        assert lyr.GetFeatureCount() == 1234
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:60,代码来源:ogr_wfs3.py


示例4: vsiaz_fake_unlink

def vsiaz_fake_unlink():

    if gdaltest.webserver_port == 0:
        return 'skip'

    # Success
    handler = webserver.SequentialHandler()
    handler.add('HEAD', '/azure/blob/myaccount/az_bucket_test_unlink/myfile', 200, {'Content-Length': '1'})
    handler.add('DELETE', '/azure/blob/myaccount/az_bucket_test_unlink/myfile', 202, {'Connection': 'close'})
    with webserver.install_http_handler(handler):
        ret = gdal.Unlink('/vsiaz/az_bucket_test_unlink/myfile')
    if ret != 0:
        gdaltest.post_reason('fail')
        return 'fail'

    # Failure
    handler = webserver.SequentialHandler()
    handler.add('HEAD', '/azure/blob/myaccount/az_bucket_test_unlink/myfile', 200, {'Content-Length': '1'})
    handler.add('DELETE', '/azure/blob/myaccount/az_bucket_test_unlink/myfile', 400, {'Connection': 'close'})
    with webserver.install_http_handler(handler):
        with gdaltest.error_handler():
            ret = gdal.Unlink('/vsiaz/az_bucket_test_unlink/myfile')
    if ret != -1:
        gdaltest.post_reason('fail')
        return 'fail'

    return 'success'
开发者ID:hdfeos,项目名称:gdal,代码行数:27,代码来源:vsiaz.py


示例5: test_vsicurl_test_retry

def test_vsicurl_test_retry():

    if gdaltest.webserver_port == 0:
        pytest.skip()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/test_retry/', 404)
    handler.add('HEAD', '/test_retry/test.txt', 200, {'Content-Length': '3'})
    handler.add('GET', '/test_retry/test.txt', 502)
    with webserver.install_http_handler(handler):
        f = gdal.VSIFOpenL('/vsicurl/http://localhost:%d/test_retry/test.txt' % gdaltest.webserver_port, 'rb')
        data_len = 0
        if f:
            data_len = len(gdal.VSIFReadL(1, 1, f))
            gdal.VSIFCloseL(f)
        assert data_len == 0

    gdal.VSICurlClearCache()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/test_retry/', 404)
    handler.add('HEAD', '/test_retry/test.txt', 200, {'Content-Length': '3'})
    handler.add('GET', '/test_retry/test.txt', 502)
    handler.add('GET', '/test_retry/test.txt', 429)
    handler.add('GET', '/test_retry/test.txt', 200, {}, 'foo')
    with webserver.install_http_handler(handler):
        f = gdal.VSIFOpenL('/vsicurl?max_retry=2&retry_delay=0.01&url=http://localhost:%d/test_retry/test.txt' % gdaltest.webserver_port, 'rb')
        assert f is not None
        gdal.ErrorReset()
        with gdaltest.error_handler():
            data = gdal.VSIFReadL(1, 3, f).decode('ascii')
        error_msg = gdal.GetLastErrorMsg()
        gdal.VSIFCloseL(f)
        assert data == 'foo'
        assert '429' in error_msg
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:35,代码来源:vsicurl.py


示例6: test_vsiswift_fake_unlink

def test_vsiswift_fake_unlink():

    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    # Success
    handler = webserver.SequentialHandler()
    handler.add('GET', '/v1/AUTH_something/foo/bar', 206,
                {'Content-Range': 'bytes 0-0/1'}, 'x')
    handler.add('DELETE', '/v1/AUTH_something/foo/bar', 202, {'Connection': 'close'})
    with webserver.install_http_handler(handler):
        ret = gdal.Unlink('/vsiswift/foo/bar')
    assert ret == 0

    # Failure
    handler = webserver.SequentialHandler()
    handler.add('GET', '/v1/AUTH_something/foo/bar', 206,
                {'Content-Range': 'bytes 0-0/1'}, 'x')
    handler.add('DELETE', '/v1/AUTH_something/foo/bar', 400, {'Connection': 'close'})
    with webserver.install_http_handler(handler):
        with gdaltest.error_handler():
            ret = gdal.Unlink('/vsiswift/foo/bar')
    assert ret == -1
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:25,代码来源:vsiswift.py


示例7: vsiwebhdfs_unlink

def vsiwebhdfs_unlink():

    if gdaltest.webserver_port == 0:
        return 'skip'

    gdal.VSICurlClearCache()

    # Success
    handler = webserver.SequentialHandler()
    handler.add('DELETE', '/webhdfs/v1/foo/bar?op=DELETE', 200,
                {}, '{"boolean":true}')
    with webserver.install_http_handler(handler):
        ret = gdal.Unlink(gdaltest.webhdfs_base_connection + '/foo/bar')
    if ret != 0:
        gdaltest.post_reason('fail')
        return 'fail'

    gdal.VSICurlClearCache()


    # With permissions

    gdal.VSICurlClearCache()

    handler = webserver.SequentialHandler()
    handler.add('DELETE', '/webhdfs/v1/foo/bar?op=DELETE&user.name=root&delegation=token', 200,
                {}, '{"boolean":true}')
    with gdaltest.config_options({'WEBHDFS_USERNAME': 'root',
                                  'WEBHDFS_DELEGATION': 'token'}):
        with webserver.install_http_handler(handler):
            ret = gdal.Unlink(gdaltest.webhdfs_base_connection + '/foo/bar')
        if ret != 0:
            gdaltest.post_reason('fail')
            return 'fail'

    # Failure
    handler = webserver.SequentialHandler()
    handler.add('DELETE', '/webhdfs/v1/foo/bar?op=DELETE', 200,
                {}, '{"boolean":false}')
    with webserver.install_http_handler(handler):
        with gdaltest.error_handler():
            ret = gdal.Unlink(gdaltest.webhdfs_base_connection + '/foo/bar')
    if ret != -1:
        gdaltest.post_reason('fail')
        return 'fail'

    gdal.VSICurlClearCache()

    # Failure
    handler = webserver.SequentialHandler()
    handler.add('DELETE', '/webhdfs/v1/foo/bar?op=DELETE', 404,
                {})
    with webserver.install_http_handler(handler):
        with gdaltest.error_handler():
            ret = gdal.Unlink(gdaltest.webhdfs_base_connection + '/foo/bar')
    if ret != -1:
        gdaltest.post_reason('fail')
        return 'fail'

    return 'success'
开发者ID:hdfeos,项目名称:gdal,代码行数:60,代码来源:vsiwebhdfs.py


示例8: vsiswift_fake_unlink

def vsiswift_fake_unlink():

    if gdaltest.webserver_port == 0:
        return 'skip'

    gdal.VSICurlClearCache()

    # Success
    handler = webserver.SequentialHandler()
    handler.add('GET', '/v1/AUTH_something/foo/bar', 206,
                {'Content-Range': 'bytes 0-0/1'}, 'x')
    handler.add('DELETE', '/v1/AUTH_something/foo/bar', 202, {'Connection': 'close'})
    with webserver.install_http_handler(handler):
        ret = gdal.Unlink('/vsiswift/foo/bar')
    if ret != 0:
        gdaltest.post_reason('fail')
        return 'fail'

    # Failure
    handler = webserver.SequentialHandler()
    handler.add('GET', '/v1/AUTH_something/foo/bar', 206,
                {'Content-Range': 'bytes 0-0/1'}, 'x')
    handler.add('DELETE', '/v1/AUTH_something/foo/bar', 400, {'Connection': 'close'})
    with webserver.install_http_handler(handler):
        with gdaltest.error_handler():
            ret = gdal.Unlink('/vsiswift/foo/bar')
    if ret != -1:
        gdaltest.post_reason('fail')
        return 'fail'

    return 'success'
开发者ID:sgillies,项目名称:gdal,代码行数:31,代码来源:vsiswift.py


示例9: test_vsicurl_test_parse_html_filelist_apache

def test_vsicurl_test_parse_html_filelist_apache():

    if gdaltest.webserver_port == 0:
        pytest.skip()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/mydir/', 200, {}, """<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<html>
 <head>
  <title>Index of /mydir</title>
 </head>
 <body>
<h1>Index of /mydir</h1>
<table><tr><th><img src="/icons/blank.gif" alt="[ICO]"></th><th><a href="?C=N;O=D">Name</a></th><th><a href="?C=M;O=A">Last modified</a></th><th><a href="?C=S;O=A">Size</a></th><th><a href="?C=D;O=A">Description</a></th></tr><tr><th colspan="5"><hr></th></tr>
<tr><td valign="top"><img src="/icons/back.gif" alt="[DIR]"></td><td><a href="/gdal/data/">Parent Directory</a></td><td>&nbsp;</td><td align="right">  - </td><td>&nbsp;</td></tr>
<tr><td valign="top"><img src="/icons/image2.gif" alt="[IMG]"></td><td><a href="foo.tif">foo.tif</a></td><td align="right">17-May-2010 12:26  </td><td align="right"> 90K</td><td>&nbsp;</td></tr>
<tr><td valign="top"><img src="/icons/image2.gif" alt="[IMG]"></td><td><a href="foo%20with%20space.tif">foo with space.tif</a></td><td align="right">15-Jan-2007 11:02  </td><td align="right">736 </td><td>&nbsp;</td></tr>
<tr><th colspan="5"><hr></th></tr>
</table>
</body></html>""")
    with webserver.install_http_handler(handler):
        fl = gdal.ReadDir('/vsicurl/http://localhost:%d/mydir' % gdaltest.webserver_port)
    assert fl == ['foo.tif', 'foo%20with%20space.tif']

    assert gdal.VSIStatL('/vsicurl/http://localhost:%d/mydir/foo%%20with%%20space.tif' % gdaltest.webserver_port, gdal.VSI_STAT_EXISTS_FLAG) is not None

    handler = webserver.SequentialHandler()
    handler.add('HEAD', '/mydir/i_dont_exist', 404, {})
    with webserver.install_http_handler(handler):
        assert gdal.VSIStatL('/vsicurl/http://localhost:%d/mydir/i_dont_exist' % gdaltest.webserver_port, gdal.VSI_STAT_EXISTS_FLAG) is None
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:30,代码来源:vsicurl.py


示例10: ogr_wfs3_empty_layer

def ogr_wfs3_empty_layer():
    if gdaltest.wfs3_drv is None:
        return 'skip'

    if gdaltest.webserver_port == 0:
        return 'skip'

    handler = webserver.SequentialHandler()
    handler.add('GET', '/wfs3/collections', 200,
                {'Content-Type': 'application/json'},
                '{ "collections" : [ { "name": "foo" }] }')
    with webserver.install_http_handler(handler):
        ds = ogr.Open('WFS3:http://localhost:%d/wfs3' % gdaltest.webserver_port)
    if ds is None:
        gdaltest.post_reason('fail')
        return 'fail'
    if ds.GetLayerCount() != 1:
        gdaltest.post_reason('fail')
        return 'fail'
    lyr = ds.GetLayer(0)
    if lyr.GetName() != 'foo':
        gdaltest.post_reason('fail')
        return 'fail'

    handler = webserver.SequentialHandler()
    handler.add('GET', '/wfs3/collections/foo/items?limit=10', 200,
                {'Content-Type': 'application/geo+json'},
                '{ "type": "FeatureCollection", "features": [] }')
    with webserver.install_http_handler(handler):
        if lyr.GetLayerDefn().GetFieldCount() != 0:
            gdaltest.post_reason('fail')
            return 'fail'

    return 'success'
开发者ID:normanb,项目名称:gdal,代码行数:34,代码来源:ogr_wfs3.py


示例11: test_vsiwebhdfs_readdir

def test_vsiwebhdfs_readdir():

    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/webhdfs/v1/foo/?op=LISTSTATUS', 200,
                {}, '{"FileStatuses":{"FileStatus":[{"type":"FILE","modificationTime":1000,"pathSuffix":"bar.baz","length":123456},{"type":"DIRECTORY","pathSuffix":"mysubdir","length":0}]}}')
    with webserver.install_http_handler(handler):
        dir_contents = gdal.ReadDir(gdaltest.webhdfs_base_connection + '/foo')
    assert dir_contents == ['bar.baz', 'mysubdir']
    stat_res = gdal.VSIStatL(gdaltest.webhdfs_base_connection + '/foo/bar.baz')
    assert stat_res.size == 123456
    assert stat_res.mtime == 1

    # ReadDir on something known to be a file shouldn't cause network access
    dir_contents = gdal.ReadDir(
        gdaltest.webhdfs_base_connection + '/foo/bar.baz')
    assert dir_contents is None

    # Test error on ReadDir()
    handler = webserver.SequentialHandler()
    handler.add('GET', '/webhdfs/v1foo/error_test/?op=LISTSTATUS', 404)
    with webserver.install_http_handler(handler):
        dir_contents = gdal.ReadDir(
            gdaltest.webhdfs_base_connection + 'foo/error_test/')
    assert dir_contents is None
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:29,代码来源:vsiwebhdfs.py


示例12: vsiswift_fake_auth_storage_url_and_auth_token

def vsiswift_fake_auth_storage_url_and_auth_token():

    if gdaltest.webserver_port == 0:
        return 'skip'

    gdal.VSICurlClearCache()
    gdal.SetConfigOption('SWIFT_AUTH_V1_URL', '')
    gdal.SetConfigOption('SWIFT_USER', '')
    gdal.SetConfigOption('SWIFT_KEY', '')
    gdal.SetConfigOption('SWIFT_STORAGE_URL', 'http://127.0.0.1:%d/v1/AUTH_something' % gdaltest.webserver_port)
    gdal.SetConfigOption('SWIFT_AUTH_TOKEN', 'my_auth_token')

    # Failure
    handler = webserver.SequentialHandler()
    handler.add('GET', '/v1/AUTH_something/foo/bar', 501)
    with webserver.install_http_handler(handler):
        f = open_for_read('/vsiswift/foo/bar')
        if f is None:
            gdaltest.post_reason('fail')
            return 'fail'
        gdal.VSIFReadL(1, 4, f).decode('ascii')
        gdal.VSIFCloseL(f)

    gdal.VSICurlClearCache()

    # Success
    def method(request):

        request.protocol_version = 'HTTP/1.1'
        h = request.headers
        if 'x-auth-token' not in h or \
           h['x-auth-token'] != 'my_auth_token':
            sys.stderr.write('Bad headers: %s\n' % str(h))
            request.send_response(403)
            return
        request.send_response(200)
        request.send_header('Content-type', 'text/plain')
        request.send_header('Content-Length', 3)
        request.send_header('Connection', 'close')
        request.end_headers()
        request.wfile.write("""foo""".encode('ascii'))

    handler = webserver.SequentialHandler()
    handler.add('GET', '/v1/AUTH_something/foo/bar', custom_method=method)
    with webserver.install_http_handler(handler):
        f = open_for_read('/vsiswift/foo/bar')
        if f is None:
            gdaltest.post_reason('fail')
            return 'fail'
        data = gdal.VSIFReadL(1, 4, f).decode('ascii')
        gdal.VSIFCloseL(f)

    if data != 'foo':
        gdaltest.post_reason('fail')
        print(data)
        return 'fail'

    return 'success'
开发者ID:sgillies,项目名称:gdal,代码行数:58,代码来源:vsiswift.py


示例13: test_visoss_7

def test_visoss_7():

    if gdaltest.webserver_port == 0:
        pytest.skip()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/oss_bucket_test_mkdir/dir/', 404, {'Connection': 'close'})
    handler.add('GET', '/oss_bucket_test_mkdir/?delimiter=%2F&max-keys=100&prefix=dir%2F', 404, {'Connection': 'close'})
    handler.add('PUT', '/oss_bucket_test_mkdir/dir/', 200)
    with webserver.install_http_handler(handler):
        ret = gdal.Mkdir('/vsioss/oss_bucket_test_mkdir/dir', 0)
    assert ret == 0

    # Try creating already existing directory
    handler = webserver.SequentialHandler()
    handler.add('GET', '/oss_bucket_test_mkdir/dir/', 416)
    with webserver.install_http_handler(handler):
        ret = gdal.Mkdir('/vsioss/oss_bucket_test_mkdir/dir', 0)
    assert ret != 0

    handler = webserver.SequentialHandler()
    handler.add('DELETE', '/oss_bucket_test_mkdir/dir/', 204)
    with webserver.install_http_handler(handler):
        ret = gdal.Rmdir('/vsioss/oss_bucket_test_mkdir/dir')
    assert ret == 0

    # Try deleting already deleted directory
    handler = webserver.SequentialHandler()
    handler.add('GET', '/oss_bucket_test_mkdir/dir/', 404)
    handler.add('GET', '/oss_bucket_test_mkdir/?delimiter=%2F&max-keys=100&prefix=dir%2F', 404, {'Connection': 'close'})
    with webserver.install_http_handler(handler):
        ret = gdal.Rmdir('/vsioss/oss_bucket_test_mkdir/dir')
    assert ret != 0

    # Try deleting non-empty directory
    handler = webserver.SequentialHandler()
    handler.add('GET', '/oss_bucket_test_mkdir/dir_nonempty/', 416)
    handler.add('GET', '/oss_bucket_test_mkdir/?delimiter=%2F&max-keys=100&prefix=dir_nonempty%2F', 200,
                {'Content-type': 'application/xml'},
                """<?xml version="1.0" encoding="UTF-8"?>
                    <ListBucketResult>
                        <Prefix>dir_nonempty/</Prefix>
                        <Contents>
                            <Key>dir_nonempty/test.txt</Key>
                            <LastModified>1970-01-01T00:00:01.000Z</LastModified>
                            <Size>40</Size>
                        </Contents>
                    </ListBucketResult>
                """)
    with webserver.install_http_handler(handler):
        ret = gdal.Rmdir('/vsioss/oss_bucket_test_mkdir/dir_nonempty')
    assert ret != 0
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:52,代码来源:vsioss.py


示例14: test_vsiwebhdfs_open

def test_vsiwebhdfs_open():

    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    # Download without redirect (not nominal)
    handler = webserver.SequentialHandler()
    handler.add('GET', '/webhdfs/v1/foo/bar?op=OPEN&offset=9999990784&length=16384', 200,
                {}, '0123456789data')
    with webserver.install_http_handler(handler):
        f = open_for_read(gdaltest.webhdfs_base_connection + '/foo/bar')
        assert f is not None
        gdal.VSIFSeekL(f, 9999990784 + 10, 0)
        assert gdal.VSIFReadL(1, 4, f).decode('ascii') == 'data'
        gdal.VSIFCloseL(f)

    # Download with redirect (nominal) and permissions

    gdal.VSICurlClearCache()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/webhdfs/v1/foo/bar?op=OPEN&offset=0&length=16384&user.name=root&delegation=token', 307,
                {'Location': gdaltest.webhdfs_redirected_url + '/webhdfs/v1/foo/bar?op=OPEN&offset=0&length=16384'})
    handler.add('GET', '/redirected/webhdfs/v1/foo/bar?op=OPEN&offset=0&length=16384', 200,
                {}, 'yeah')
    with gdaltest.config_options({'WEBHDFS_USERNAME': 'root',
                                  'WEBHDFS_DELEGATION': 'token',
                                  'WEBHDFS_DATANODE_HOST': 'localhost'}):
        with webserver.install_http_handler(handler):
            f = open_for_read(gdaltest.webhdfs_base_connection + '/foo/bar')
            assert f is not None
            assert gdal.VSIFReadL(1, 4, f).decode('ascii') == 'yeah'
            gdal.VSIFCloseL(f)

    # Test error

    gdal.VSICurlClearCache()

    f = open_for_read(gdaltest.webhdfs_base_connection + '/foo/bar')
    assert f is not None

    handler = webserver.SequentialHandler()
    handler.add('GET', '/webhdfs/v1/foo/bar?op=OPEN&offset=0&length=16384', 404)
    with webserver.install_http_handler(handler):
        assert len(gdal.VSIFReadL(1, 4, f)) == 0

    # Retry: shouldn't not cause network access
    assert len(gdal.VSIFReadL(1, 4, f)) == 0

    gdal.VSIFCloseL(f)
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:52,代码来源:vsiwebhdfs.py


示例15: vsicurl_test_retry

def vsicurl_test_retry():

    if gdaltest.webserver_port == 0:
        return 'skip'

    handler = webserver.SequentialHandler()
    handler.add('GET', '/test_retry/', 404)
    handler.add('HEAD', '/test_retry/test.txt', 200, {'Content-Length': '3'})
    handler.add('GET', '/test_retry/test.txt', 502)
    with webserver.install_http_handler(handler):
        f = gdal.VSIFOpenL('/vsicurl/http://localhost:%d/test_retry/test.txt' % gdaltest.webserver_port, 'rb')
        data_len = 0
        if f:
            data_len = len(gdal.VSIFReadL(1, 1, f))
            gdal.VSIFCloseL(f)
        if data_len != 0:
            gdaltest.post_reason('fail')
            print(data_len)
            return 'fail'

    gdal.VSICurlClearCache()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/test_retry/', 404)
    handler.add('HEAD', '/test_retry/test.txt', 200, {'Content-Length': '3'})
    handler.add('GET', '/test_retry/test.txt', 502)
    handler.add('GET', '/test_retry/test.txt', 429)
    handler.add('GET', '/test_retry/test.txt', 200, {}, 'foo')
    with webserver.install_http_handler(handler):
        f = gdal.VSIFOpenL('/vsicurl?max_retry=2&retry_delay=0.01&url=http://localhost:%d/test_retry/test.txt' % gdaltest.webserver_port, 'rb')
        if f is None:
            gdaltest.post_reason('fail')
            return 'fail'
        gdal.ErrorReset()
        with gdaltest.error_handler():
            data = gdal.VSIFReadL(1, 3, f).decode('ascii')
        error_msg = gdal.GetLastErrorMsg()
        gdal.VSIFCloseL(f)
        if data != 'foo':
            gdaltest.post_reason('fail')
            print(data)
            return 'fail'
        if error_msg.find('429') < 0:
            gdaltest.post_reason('fail')
            print(error_msg)
            return 'fail'

    return 'success'
开发者ID:sgillies,项目名称:gdal,代码行数:48,代码来源:vsicurl.py


示例16: test_vsiaz_write_blockblob_retry

def test_vsiaz_write_blockblob_retry():

    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    # Test creation of BlockBob
    f = gdal.VSIFOpenL('/vsiaz/test_copy/file.bin', 'wb')
    assert f is not None

    with gdaltest.config_options({'GDAL_HTTP_MAX_RETRY': '2',
                                  'GDAL_HTTP_RETRY_DELAY': '0.01'}):

        handler = webserver.SequentialHandler()

        def method(request):
            request.protocol_version = 'HTTP/1.1'
            request.wfile.write('HTTP/1.1 100 Continue\r\n\r\n'.encode('ascii'))
            content = request.rfile.read(3).decode('ascii')
            if len(content) != 3:
                sys.stderr.write('Bad headers: %s\n' % str(request.headers))
                request.send_response(403)
                request.send_header('Content-Length', 0)
                request.end_headers()
                return
            request.send_response(201)
            request.send_header('Content-Length', 0)
            request.end_headers()

        handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin', 502)
        handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin', custom_method=method)
        with webserver.install_http_handler(handler):
            assert gdal.VSIFWriteL('foo', 1, 3, f) == 3
            gdal.VSIFCloseL(f)
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:35,代码来源:vsiaz.py


示例17: test_vsiaz_write_appendblob_retry

def test_vsiaz_write_appendblob_retry():

    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    with gdaltest.config_options({'GDAL_HTTP_MAX_RETRY': '2',
                                  'GDAL_HTTP_RETRY_DELAY': '0.01',
                                  'VSIAZ_CHUNK_SIZE_BYTES': '10'}):

        f = gdal.VSIFOpenL('/vsiaz/test_copy/file.bin', 'wb')
        assert f is not None

        handler = webserver.SequentialHandler()
        handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin', 502)
        handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin', 201)
        handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin?comp=appendblock', 502)
        handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin?comp=appendblock', 201)
        handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin?comp=appendblock', 502)
        handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin?comp=appendblock', 201)

        with webserver.install_http_handler(handler):
            assert gdal.VSIFWriteL('0123456789abcdef', 1, 16, f) == 16
            gdal.VSIFCloseL(f)
开发者ID:AsgerPetersen,项目名称:gdal,代码行数:25,代码来源:vsiaz.py


示例18: vsiwebhdfs_readdir

def vsiwebhdfs_readdir():

    if gdaltest.webserver_port == 0:
        return 'skip'

    gdal.VSICurlClearCache()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/webhdfs/v1/foo/?op=LISTSTATUS', 200,
                {}, '{"FileStatuses":{"FileStatus":[{"type":"FILE","modificationTime":1000,"pathSuffix":"bar.baz","length":123456},{"type":"DIRECTORY","pathSuffix":"mysubdir","length":0}]}}')
    with webserver.install_http_handler(handler):
        dir_contents = gdal.ReadDir(gdaltest.webhdfs_base_connection + '/foo')
    if dir_contents != ['bar.baz', 'mysubdir']:
        gdaltest.post_reason('fail')
        print(dir_contents)
        return 'fail'
    stat_res = gdal.VSIStatL(gdaltest.webhdfs_base_connection + '/foo/bar.baz')
    if stat_res.size != 123456:
        gdaltest.post_reason('fail')
        print(stat_res.size)
        return 'fail'
    if stat_res.mtime != 1:
        gdaltest.post_reason('fail')
        return 'fail'

    # ReadDir on something known to be a file shouldn't cause network access
    dir_contents = gdal.ReadDir(
        gdaltest.webhdfs_base_connection + '/foo/bar.baz')
    if dir_contents is not None:
        gdaltest.post_reason('fail')
        return 'fail'

    # Test error on ReadDir()
    handler = webserver.SequentialHandler()
    handler.add('GET', '/webhdfs/v1foo/error_test/?op=LISTSTATUS', 404)
    with webserver.install_http_handler(handler):
        dir_contents = gdal.ReadDir(
            gdaltest.webhdfs_base_connection + 'foo/error_test/')
    if dir_contents is not None:
        gdaltest.post_reason('fail')
        print(dir_contents)
        return 'fail'

    return 'success'
开发者ID:hdfeos,项目名称:gdal,代码行数:44,代码来源:vsiwebhdfs.py


示例19: vsiswift_stat

def vsiswift_stat():

    if gdaltest.webserver_port == 0:
        return 'skip'

    gdal.VSICurlClearCache()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/v1/AUTH_something/foo/bar', 206,
                {'Content-Range': 'bytes 0-0/1000000'}, 'x')
    with webserver.install_http_handler(handler):
        stat_res = gdal.VSIStatL('/vsiswift/foo/bar')
        if stat_res is None or stat_res.size != 1000000:
            gdaltest.post_reason('fail')
            if stat_res is not None:
                print(stat_res.size)
            else:
                print(stat_res)
            return 'fail'

    handler = webserver.SequentialHandler()
    handler.add('HEAD', '/v1/AUTH_something/foo/bar', 200, {'Content-Length': '1000000'})
    with webserver.install_http_handler(handler):
        stat_res = gdal.VSIStatL('/vsiswift_streaming/foo/bar')
        if stat_res is None or stat_res.size != 1000000:
            gdaltest.post_reason('fail')
            if stat_res is not None:
                print(stat_res.size)
            else:
                print(stat_res)
            return 'fail'

    # Test stat on container
    handler = webserver.SequentialHandler()
    # GET on the container URL returns something, but we must hack this back
    # to a directory
    handler.add('GET', '/v1/AUTH_something/foo', 200, {}, "blabla")
    with webserver.install_http_handler(handler):
        stat_res = gdal.VSIStatL('/vsiswift/foo')
        if stat_res is None or not stat.S_ISDIR(stat_res.mode):
            gdaltest.post_reason('fail')
            return 'fail'

    return 'success'
开发者ID:sgillies,项目名称:gdal,代码行数:44,代码来源:vsiswift.py


示例20: vsiswift_fake_write

def vsiswift_fake_write():

    if gdaltest.webserver_port == 0:
        return 'skip'

    gdal.VSICurlClearCache()

    # Test creation of BlockBob
    f = gdal.VSIFOpenL('/vsiswift/test_copy/file.bin', 'wb')
    if f is None:
        gdaltest.post_reason('fail')
        return 'fail'

    handler = webserver.SequentialHandler()

    def method(request):
        h = request.headers
        if 'x-auth-token' not in h or \
           h['x-auth-token'] != 'my_auth_token' or \
           'Transfer-Encoding' not in h or h['Transfer-Encoding'] != 'chunked':
            sys.stderr.write('Bad headers: %s\n' % str(h))
            request.send_response(403)
            return

        request.protocol_version = 'HTTP/1.1'
        request.wfile.write('HTTP/1.1 100 Continue\r\n\r\n'.encode('ascii'))
        content = ''
        while True:
            numchars = int(request.rfile.readline().strip(), 16)
            content += request.rfile.read(numchars).decode('ascii')
            request.rfile.read(2)
            if numchars == 0:
                break
        if len(content) != 40000:
            sys.stderr.write('Bad headers: %s\n' % str(request.headers))
            request.send_response(403)
            request.send_header('Content-Length', 0)
            request.end_headers()
            return
        request.send_response(200)
        request.send_header('Content-Length', 0)
        request.end_headers()

    handler.add('PUT', '/v1/AUTH_something/test_copy/file.bin', custom_method=method)
    with webserver.install_http_handler(handler):
        ret = gdal.VSIFWriteL('x' * 35000, 1, 35000, f)
        ret += gdal.VSIFWriteL('x' * 5000, 1, 5000, f)
        if ret != 40000:
            gdaltest.post_reason('fail')
            print(ret)
            gdal.VSIFCloseL(f)
            return 'fail'
        gdal.VSIFCloseL(f)

    return 'success'
开发者ID:sgillies,项目名称:gdal,代码行数:55,代码来源:vsiswift.py



注:本文中的webserver.install_http_handler函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python api.url_for函数代码示例发布时间:2022-05-26
下一篇:
Python time.now函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap