• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python encoder.MultipartEncoder类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 requests_toolbelt.multipart.encoder.MultipartEncoder 的典型用法代码示例。如果您正苦于以下问题：Python MultipartEncoder 类的具体用法？Python MultipartEncoder 怎么用？Python MultipartEncoder 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了 MultipartEncoder 类的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: upload_document

    def upload_document(self, browser, raw_token, payload, document, new_file):
        """Replace the file of ``document`` with a multipart POST.

        The request runs inside ``self.as_officeconnector(browser)``, is sent
        to the view named by ``payload['upload-form']`` and carries a bearer
        token built from ``raw_token``. The test fails unless the response
        status code is 204.

        :param browser: test browser used to issue the request.
        :param raw_token: token placed after ``Bearer`` in the
            ``Authorization`` header.
        :param payload: mapping read for ``content-type``, ``csrf-token``
            and ``upload-form``.
        :param document: target passed to ``browser.open``.
        :param new_file: file object to upload; ``basename(new_file.name)``
            is used as the uploaded filename.
        """
        with self.as_officeconnector(browser):
            encoder = MultipartEncoder({
                'form.widgets.file.action': 'replace',
                'form.buttons.upload': 'oc-file-upload',
                'form.widgets.file': (
                    basename(new_file.name),
                    new_file,
                    payload.get('content-type'),
                    ),
                '_authenticator': payload.get('csrf-token'),
                })

            headers = {
                'Authorization': ' '.join(('Bearer', raw_token, )),
                # The encoder's content type carries the multipart boundary.
                'Content-Type': encoder.content_type,
            }
            browser.open(
                document,
                view=payload.get('upload-form'),
                method='POST',
                headers=headers,
                data=encoder.to_string(),
                )

            # assertEqual is used instead of the deprecated assertEquals
            # alias, which no longer exists in unittest as of Python 3.12.
            self.assertEqual(204, browser.status_code)
开发者ID:4teamwork,项目名称:opengever.core,代码行数:26,代码来源:testing.py


示例2: check_read_file_with_chunks

    def check_read_file_with_chunks(self, file_size, read_size):
        """Compare chunked encoder reads against a reference encoding.

        The fields are first encoded in one go with
        ``encode_multipart_formdata`` (using the fully read contents of a
        ``LargeFileMock(file_size)``). The same fields, this time with the
        mock file object itself, are then streamed through
        ``MultipartEncoder`` in ``read_size`` chunks; every chunk must match
        the corresponding slice of the reference bytes and the total length
        must agree.
        """
        boundary = "deterministic-test-boundary"

        # Reference bytes, built from the file's complete contents.
        reference_parts = {
            'some_field': 'this is the value...',
            'some_file': LargeFileMock(file_size).read(),
        }
        expected_bytes = encode_multipart_formdata(reference_parts, boundary)[0]

        # Same fields, but streamed from a fresh mock file by the encoder.
        streamed_parts = {
            'some_field': 'this is the value...',
            'some_file': LargeFileMock(file_size),
        }
        encoder = MultipartEncoder(streamed_parts, boundary=boundary)

        offset = 0
        chunk = encoder.read(read_size)
        while chunk:
            assert chunk == expected_bytes[offset:offset + len(chunk)]
            offset += len(chunk)
            chunk = encoder.read(read_size)

        assert offset == len(expected_bytes)
开发者ID:netheosgithub,项目名称:requests-toolbelt,代码行数:27,代码来源:test_multipart_encoder.py


示例3: login

def login(username, password):
    """Post the sign-in form as multipart data and print the JSON reply.

    Uses the names ``s``, ``getheaders``, ``checkcapthca`` and ``getdata``
    from the enclosing scope; ``s`` is the object whose ``post`` method
    sends the request.
    """
    sign_in_url = 'https://www.zhihu.com/api/v3/oauth/sign_in'
    request_headers = getheaders()
    checkcapthca(s, request_headers)
    form = MultipartEncoder(
        getdata(username, password),
        boundary='----WebKitFormBoundarycGPN1xiTi2hCSKKZ',
    )
    # The content type must advertise the same boundary used in the body.
    request_headers['Content-Type'] = form.content_type
    response = s.post(
        sign_in_url,
        headers=request_headers,
        data=form.to_string(),
    )
    print(response.json())
开发者ID:chennanxu,项目名称:crawler,代码行数:9,代码来源:zhihulogin.py


示例4: test_encodes_with_readable_data

 def test_encodes_with_readable_data(self):
     """A readable object given as a field value becomes the part body."""
     stream = io.BytesIO(b'value')
     expected = (
         '--this-is-a-boundary\r\n'
         'Content-Disposition: form-data; name="field"\r\n\r\n'
         'value\r\n'
         '--this-is-a-boundary--\r\n'
     ).encode()
     encoder = MultipartEncoder([('field', stream)], boundary=self.boundary)
     assert encoder.read() == expected
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:9,代码来源:test_multipart_encoder.py


示例5: test_reads_file_from_url_wrapper

 def test_reads_file_from_url_wrapper(self):
     """Encode a field backed by ``FileFromURLWrapper`` and read it."""
     session = requests.Session()
     recorder = get_betamax(session)
     url = 'https://stxnext.com/static/img/logo.830ebe551641.svg'
     with recorder.use_cassette('file_for_download'):
         remote_file = FileFromURLWrapper(url)
         encoder = MultipartEncoder(
             [('field', 'foo'), ('file', remote_file)])
     # The read is performed after the cassette context has been left.
     assert encoder.read() is not None
开发者ID:davidfischer,项目名称:requests-toolbelt,代码行数:9,代码来源:test_multipart_encoder.py


示例6: test_handles_empty_unicode_values

    def test_handles_empty_unicode_values(self):
        """Check that an empty unicode field value still encodes to output.

        Background: https://github.com/requests/toolbelt/issues/46
        """
        field_name = b'test'.decode('utf-8')
        empty_value = b''.decode('utf-8')
        encoder = MultipartEncoder(fields=[(field_name, empty_value)])
        assert len(encoder.read()) > 0
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:9,代码来源:test_multipart_encoder.py


示例7: do_callback_request

    def do_callback_request(self, browser, fields):
        """POST ``fields`` as multipart data to the meeting's callback view.

        The body and headers are prepared first; the request to the
        ``receive_meeting_zip_pdf`` view is then issued inside
        ``self.logout()``.
        """
        target = self.meeting
        encoder = MultipartEncoder(fields=fields)
        body = encoder.to_string()
        content_headers = {'Content-Type': encoder.content_type}

        with self.logout():
            browser.open(
                target,
                view='receive_meeting_zip_pdf',
                method='POST',
                data=body,
                headers=content_headers,
            )
开发者ID:4teamwork,项目名称:opengever.core,代码行数:10,代码来源:test_meeting_zipexport_callback.py


示例8: login

def login(username, password):
    """Post the sign-in form as multipart data and print the JSON reply.

    Uses the names ``s``, ``getheaders``, ``getdata`` and ``checkcapthca``
    from the enclosing scope; ``s`` is the object whose ``post`` method
    sends the request.
    """
    sign_in_url = 'https://www.zhihu.com/api/v3/oauth/sign_in'
    request_headers = getheaders()
    credentials = getdata(username, password)
    checkcapthca(request_headers)
    # TODO: the trailing characters of the boundary could be randomized;
    # for now the boundary is fixed.
    form = MultipartEncoder(
        credentials,
        boundary='----WebKitFormBoundarycGPN1xiTi2hCSKKZ',
    )
    request_headers['Content-Type'] = form.content_type
    response = s.post(
        sign_in_url,
        headers=request_headers,
        data=form.to_string(),
    )
    print(response.json())
    print('123')
开发者ID:q7695650,项目名称:pachong,代码行数:12,代码来源:zhihulogin.py


示例9: test_upload

    def test_upload(self):
        """Exercise the upload endpoint's path checks and one success case.

        Expected status codes: 404 for an unregistered path, 403 for a path
        registered by another user or for another session, 501 for an
        unhandled upload type, and 200 for a valid upload.
        """
        self.login()
        manager = PluginRegistry.getInstance("UploadManager")
        archive = os.path.join(os.path.dirname(__file__), 'create_user.zip')

        with open(archive, "rb") as handle:
            encoder = MultipartEncoder(
                fields={'file': ('create_user.zip', handle, 'text/plain')}
            )
            body = encoder.to_string()

            def post(target, extra_headers=None):
                # Every request sends the same encoded body; only the
                # target and optional extra headers vary.
                headers = {'Content-Type': encoder.content_type}
                if extra_headers:
                    headers.update(extra_headers)
                return self.fetch(
                    target, method="POST", body=body, headers=headers)

            # A path is registered, but the request goes to a different,
            # unregistered one.
            uuid, path = manager.registerUploadPath(
                "admin", self.session_id, "workflow")
            assert post("/uploads/unknown_path").code == 404
            assert manager.unregisterUploadPath(uuid) is True

            # Registrations that must be rejected, in order: another user,
            # another session, an unhandled upload type.
            rejected = (
                ("other_user", self.session_id, "workflow", 403),
                ("admin", "other session id", "workflow", 403),
                ("admin", self.session_id, "unknown-type", 501),
            )
            for user, session_id, upload_type, expected_code in rejected:
                uuid, path = manager.registerUploadPath(
                    user, session_id, upload_type)
                assert post(path).code == expected_code
                assert manager.unregisterUploadPath(uuid) is True

            # Finally a working upload.
            uuid, path = manager.registerUploadPath(
                "admin", self.session_id, "workflow")
            response = post(path, {'X-File-Name': 'create_user.zip'})
            assert response.code == 200
            # The path is expected to be gone after a successful upload, so
            # unregistering it now reports False.
            assert manager.unregisterUploadPath(uuid) is False
开发者ID:gonicus,项目名称:gosa,代码行数:52,代码来源:test_main.py


示例10: test_accepts_custom_content_type

    def test_accepts_custom_content_type(self):
        """Check that a per-part content type shows up in the encoded body.

        See https://github.com/requests/toolbelt/issues/52
        """
        field_name = b'test'.decode('utf-8')
        file_part = (
            b'filename'.decode('utf-8'),
            b'filecontent',
            b'application/json'.decode('utf-8'),
        )
        encoder = MultipartEncoder(fields=[(field_name, file_part)])
        encoded_text = encoder.read().decode('utf-8')
        # str.index raises ValueError if the header line is absent.
        assert encoded_text.index('Content-Type: application/json\r\n') > 0
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:13,代码来源:test_multipart_encoder.py


示例11: test_streams_its_data

    def test_streams_its_data(self):
        """Drain the encoder and check its buffer position stays bounded."""
        read_size = 1024 * 1024 * 128
        encoder = MultipartEncoder({
            'some field': 'value',
            'some file': LargeFileMock(),
        })

        # Keep reading until the encoder returns an empty result.
        while encoder.read(read_size):
            pass

        assert encoder._buffer.tell() <= read_size
开发者ID:Zearin,项目名称:python-requests-toolbelt,代码行数:13,代码来源:test_multipart_encoder.py


示例12: test_accepts_custom_headers

    def test_accepts_custom_headers(self):
        """Check that per-part custom headers show up in the encoded body.

        See https://github.com/requests/toolbelt/issues/52
        """
        field_name = b'test'.decode('utf-8')
        file_part = (
            b'filename'.decode('utf-8'),
            b'filecontent',
            b'application/json'.decode('utf-8'),
            {'X-My-Header': 'my-value'},
        )
        encoder = MultipartEncoder(fields=[(field_name, file_part)])
        encoded_text = encoder.read().decode('utf-8')
        # str.index raises ValueError if the header line is absent.
        assert encoded_text.index('X-My-Header: my-value\r\n') > 0
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:14,代码来源:test_multipart_encoder.py


示例13: prepare_request

    def prepare_request(self, userid='', destination='inbox', org_unit=''):
        """Build the multipart body and headers for an upload request.

        :param userid: value for the ``userid`` field; when empty,
            ``self.regular_user.getId()`` is used instead.
        :param destination: value for the ``destination`` field.
        :param org_unit: added as the ``org_unit`` field only when non-empty.
        :returns: ``(body, headers)`` where ``body`` is the encoded form and
            ``headers`` holds the ``Content-Type`` and ``Accept`` headers.
        """
        if not userid:
            userid = self.regular_user.getId()

        fields = {
            'userid': userid,
            'destination': destination,
            'file': ('mydocument.txt', 'my text', 'text/plain'),
        }
        if org_unit:
            fields['org_unit'] = org_unit

        encoder = MultipartEncoder(fields=fields)
        body = encoder.to_string()
        headers = {
            'Content-Type': encoder.content_type,
            'Accept': 'application/json',
        }
        return body, headers
开发者ID:4teamwork,项目名称:opengever.core,代码行数:14,代码来源:test_scanin.py


示例14: checkin_document

    def checkin_document(self, browser, tokens, payload, document, comment=None):  # noqa
        """Check in ``document``, optionally with a comment, and verify it.

        With a truthy ``comment`` a multipart form is POSTed to the view
        named by ``payload['checkin-with-comment']``. Otherwise the view
        named by ``payload['checkin-without-comment']`` is opened with
        ``send_authenticator=True``. Both requests run inside
        ``self.as_officeconnector(browser)``. Afterwards the journal entry,
        the presence or absence of journal comments and a 200 status code
        are asserted.

        :param browser: test browser used to issue the request.
        :param tokens: mapping providing ``raw_token`` for the bearer header.
        :param payload: mapping providing the view names and ``csrf-token``.
        :param document: document being checked in.
        :param comment: optional check-in comment.
        """
        with self.as_officeconnector(browser):
            headers = {
                'Authorization': ' '.join((
                    'Bearer',
                    tokens.get('raw_token'),
                    )),
            }

            if comment:
                encoder = MultipartEncoder({
                    'form.widgets.comment': comment,
                    'form.buttons.button_checkin': 'Checkin',
                    '_authenticator': payload.get('csrf-token'),
                    })

                # The encoder's content type carries the multipart boundary.
                headers['Content-Type'] = encoder.content_type

                browser.open(
                    document,
                    view=payload.get('checkin-with-comment'),
                    headers=headers,
                    method='POST',
                    data=encoder.to_string(),
                    )
            else:
                browser.open(
                    document,
                    headers=headers,
                    view=payload.get('checkin-without-comment'),
                    send_authenticator=True,
                    )

        self.assert_journal_entry(
            document,
            DOCUMENT_CHECKED_IN,
            u'Document checked in',
            )

        # Journal comments must be present exactly when a comment was given.
        journal_comments = get_journal_entry(document).get('comments')
        if comment:
            self.assertTrue(journal_comments)
        else:
            self.assertFalse(journal_comments)

        # assertEqual is used instead of the deprecated assertEquals alias,
        # which no longer exists in unittest as of Python 3.12.
        self.assertEqual(200, browser.status_code)
开发者ID:lukasgraf,项目名称:opengever.core,代码行数:46,代码来源:testing.py


示例15: test_streams_its_data

    def test_streams_its_data(self):
        """Drain the encoder in chunks and check size and buffer position."""
        read_size = 1024 * 1024 * 128
        encoder = MultipartEncoder({
            'some field': 'value',
            'some file': LargeFileMock(),
        })
        expected_total = encoder.len

        consumed = 0
        chunk = encoder.read(read_size)
        while chunk:
            consumed += len(chunk)
            chunk = encoder.read(read_size)

        assert encoder._buffer.tell() <= read_size
        assert consumed == expected_total
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:17,代码来源:test_multipart_encoder.py


示例16: test_regression_2

    def test_regression_2(self):
        """Guard against a recurrence of issue #31.

        Reading an 8100-character field in 8192-byte blocks must yield
        exactly ``len`` bytes in total.
        """
        encoder = MultipartEncoder(fields={"test": "t" * 8100})
        expected_total = encoder.len
        blocksize = 8192

        consumed = 0
        chunk = encoder.read(blocksize)
        while chunk:
            consumed += len(chunk)
            chunk = encoder.read(blocksize)

        assert consumed == expected_total
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:19,代码来源:test_multipart_encoder.py


示例17: setUp

 def setUp(self):
     """Encode four sample fields and build a decoder from the result."""
     self.boundary = 'test boundary'
     self.sample_1 = (
         ('field 1', 'value 1'),
         ('field 2', 'value 2'),
         ('field 3', 'value 3'),
         ('field 4', 'value 4'),
     )
     encoder = MultipartEncoder(self.sample_1, self.boundary)
     self.encoded_1 = encoder
     # The decoder is fed the encoder's body together with its content type.
     self.decoded_1 = MultipartDecoder(
         encoder.to_string(), encoder.content_type)
开发者ID:DigiThinkIT,项目名称:requests-toolbelt,代码行数:13,代码来源:test_multipart_decoder.py


示例18: test_regresion_1

    def test_regresion_1(self):
        """Ensure issue #31 doesn't ever happen again.

        Encodes one text field plus 30 file parts and checks that reading
        in 8192-byte blocks yields exactly ``m.len`` bytes in total.
        """
        fields = {
            "test": "t" * 100
        }

        opened = []
        try:
            for x in range(30):
                fd = open('tests/test_multipart_encoder.py', 'rb')
                opened.append(fd)
                fields['f%d' % x] = ('test', fd)

            m = MultipartEncoder(fields=fields)
            total_size = m.len

            blocksize = 8192
            read_so_far = 0

            while True:
                data = m.read(blocksize)
                if not data:
                    break
                read_so_far += len(data)
        finally:
            # Close every handle, even if encoding or reading fails, so the
            # test does not leak 30 file descriptors per run.
            for fd in opened:
                fd.close()

        assert read_so_far == total_size
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:24,代码来源:test_multipart_encoder.py


示例19: setUp

 def setUp(self):
     """Create an encoder over two text fields with a fixed boundary."""
     parts = [('field', 'value'), ('other_field', 'other_value')]
     boundary = 'this-is-a-boundary'
     self.parts = parts
     self.boundary = boundary
     self.instance = MultipartEncoder(parts, boundary=boundary)
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:4,代码来源:test_multipart_encoder.py


示例20: TestMultipartEncoder

class TestMultipartEncoder(unittest.TestCase):
    def setUp(self):
        """Create an encoder over two text fields with a fixed boundary."""
        parts = [('field', 'value'), ('other_field', 'other_value')]
        boundary = 'this-is-a-boundary'
        self.parts = parts
        self.boundary = boundary
        self.instance = MultipartEncoder(parts, boundary=boundary)

    def test_to_string(self):
        """``to_string`` returns the complete multipart body as bytes."""
        expected = (
            '--this-is-a-boundary\r\n'
            'Content-Disposition: form-data; name="field"\r\n\r\n'
            'value\r\n'
            '--this-is-a-boundary\r\n'
            'Content-Disposition: form-data; name="other_field"\r\n\r\n'
            'other_value\r\n'
            '--this-is-a-boundary--\r\n'
        ).encode()
        assert self.instance.to_string() == expected

    def test_content_type(self):
        """The content type value embeds the configured boundary."""
        assert self.instance.content_type == (
            'multipart/form-data; boundary=this-is-a-boundary'
        )

    def test_encodes_data_the_same(self):
        """``read()`` matches ``filepost.encode_multipart_formdata``'s body."""
        reference = filepost.encode_multipart_formdata(
            self.parts, self.boundary)
        # The first element of the result is the encoded body.
        assert reference[0] == self.instance.read()

    def test_streams_its_data(self):
        """Drain the encoder in chunks and check size and buffer position."""
        read_size = 1024 * 1024 * 128
        encoder = MultipartEncoder({
            'some field': 'value',
            'some file': LargeFileMock(),
        })
        expected_total = encoder.len

        consumed = 0
        chunk = encoder.read(read_size)
        while chunk:
            consumed += len(chunk)
            chunk = encoder.read(read_size)

        assert encoder._buffer.tell() <= read_size
        assert consumed == expected_total

    def test_length_is_correct(self):
        """``len`` equals the size of the reference encoding's body."""
        reference = filepost.encode_multipart_formdata(
            self.parts, self.boundary)
        assert len(reference[0]) == self.instance.len

    def test_encodes_with_readable_data(self):
        """A readable object given as a field value becomes the part body."""
        stream = io.BytesIO(b'value')
        expected = (
            '--this-is-a-boundary\r\n'
            'Content-Disposition: form-data; name="field"\r\n\r\n'
            'value\r\n'
            '--this-is-a-boundary--\r\n'
        ).encode()
        encoder = MultipartEncoder([('field', stream)], boundary=self.boundary)
        assert encoder.read() == expected

    def test_reads_open_file_objects(self):
        """An open binary file can be given directly as a field value."""
        with open('setup.py', 'rb') as handle:
            fields = [('field', 'foo'), ('file', handle)]
            encoder = MultipartEncoder(fields)
            assert encoder.read() is not None

    def test_reads_open_file_objects_with_a_specified_filename(self):
        """An open file can be given as a (filename, file, type) tuple."""
        with open('setup.py', 'rb') as handle:
            file_part = ('filename', handle, 'text/plain')
            encoder = MultipartEncoder([('field', 'foo'), ('file', file_part)])
            assert encoder.read() is not None

    def test_reads_open_file_objects_using_to_string(self):
        """``to_string`` works when a field value is an open binary file."""
        with open('setup.py', 'rb') as handle:
            fields = [('field', 'foo'), ('file', handle)]
            encoder = MultipartEncoder(fields)
            assert encoder.to_string() is not None

    def test_handles_encoded_unicode_strings(self):
        """A UTF-8 encoded byte string is accepted as a field value."""
        encoded = (
            b'this is a unicode string: \xc3\xa9 \xc3\xa1 \xc7\xab \xc3\xb3'
        )
        encoder = MultipartEncoder([('field', encoded)])
        assert encoder.read() is not None

    def test_handles_uncode_strings(self):
        """A decoded (unicode) string is accepted as a field value."""
        raw = b'this is a unicode string: \xc3\xa9 \xc3\xa1 \xc7\xab \xc3\xb3'
        text = raw.decode('utf-8')
        encoder = MultipartEncoder([('field', text)])
        assert encoder.read() is not None

    def test_regresion_1(self):
        """Ensure issue #31 doesn't ever happen again."""
        fields = {
            "test": "t" * 100
        }

        for x in range(30):
            fields['f%d' % x] = (
                'test', open('tests/test_multipart_encoder.py', 'rb')
                )
#.........这里部分代码省略.........
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:101,代码来源:test_multipart_encoder.py



注:本文中的requests_toolbelt.multipart.encoder.MultipartEncoder类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python project.Project类代码示例发布时间:2022-05-26
下一篇:
Python requests_respectful.RespectfulRequester类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap