本文整理汇总了Python中requests_toolbelt.multipart.encoder.MultipartEncoder类的典型用法代码示例。如果您正苦于以下问题：Python MultipartEncoder类的具体用法？Python MultipartEncoder怎么用？Python MultipartEncoder使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了MultipartEncoder类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: upload_document
def upload_document(self, browser, raw_token, payload, document, new_file):
    """Replace the file of ``document`` through a multipart form POST.

    The form is posted to the view named by ``payload['upload-form']`` with
    a ``Bearer`` authorization header built from ``raw_token``; the file
    part carries the base name of ``new_file.name``, the open file object
    and ``payload['content-type']``. The test fails unless the browser
    reports status 204 afterwards.

    NOTE(review): the original snippet lost its indentation, so whether the
    final assertion ran inside the ``with`` block cannot be recovered from
    here -- confirm against the upstream file.
    """
    with self.as_officeconnector(browser):
        encoder = MultipartEncoder({
            'form.widgets.file.action': 'replace',
            'form.buttons.upload': 'oc-file-upload',
            'form.widgets.file': (
                basename(new_file.name),
                new_file,
                payload.get('content-type'),
            ),
            '_authenticator': payload.get('csrf-token'),
        })
        headers = {
            'Authorization': ' '.join(('Bearer', raw_token)),
            # The content type carries the multipart boundary.
            'Content-Type': encoder.content_type,
        }
        browser.open(
            document,
            view=payload.get('upload-form'),
            method='POST',
            headers=headers,
            data=encoder.to_string(),
        )
        # assertEqual instead of the deprecated assertEquals alias, which
        # was removed from unittest in Python 3.12.
        self.assertEqual(204, browser.status_code)
开发者ID:4teamwork,项目名称:opengever.core,代码行数:26,代码来源:testing.py
示例2: check_read_file_with_chunks
def check_read_file_with_chunks(self, file_size, read_size):
    """Check that chunked encoder reads reproduce the reference body.

    A reference body is produced by ``encode_multipart_formdata`` from the
    fully-read ``LargeFileMock(file_size)``. A ``MultipartEncoder`` over a
    fresh ``LargeFileMock`` is then drained ``read_size`` bytes at a time,
    and each chunk must equal the matching slice of the reference body.
    """
    boundary = "deterministic-test-boundary"

    # Reference body: the file contents are read in one go and encoded.
    reference_file = LargeFileMock(file_size)
    reference_fields = {
        'some_field': 'this is the value...',
        'some_file': reference_file.read(),
    }
    expected_bytes = encode_multipart_formdata(reference_fields, boundary)[0]

    # Body under test: a fresh file object handed to the encoder as-is.
    streamed_fields = {
        'some_field': 'this is the value...',
        'some_file': LargeFileMock(file_size),
    }
    encoder = MultipartEncoder(streamed_fields, boundary=boundary)

    offset = 0
    chunk = encoder.read(read_size)
    while chunk:
        end = offset + len(chunk)
        assert chunk == expected_bytes[offset:end]
        offset = end
        chunk = encoder.read(read_size)

    # Every byte of the reference body must have been produced.
    assert offset == len(expected_bytes)
开发者ID:netheosgithub,项目名称:requests-toolbelt,代码行数:27,代码来源:test_multipart_encoder.py
示例3: login
def login(username, password):
    """POST the sign-in form as multipart data and print the JSON reply.

    Uses ``s``, ``getheaders``, ``checkcapthca`` and ``getdata`` from the
    enclosing scope; none of them are defined in this snippet.
    """
    headers = getheaders()
    checkcapthca(s, headers)
    form = MultipartEncoder(
        getdata(username, password),
        boundary='----WebKitFormBoundarycGPN1xiTi2hCSKKZ',
    )
    # The header must advertise the same boundary the body is built with.
    headers['Content-Type'] = form.content_type
    response = s.post(
        'https://www.zhihu.com/api/v3/oauth/sign_in',
        headers=headers,
        data=form.to_string(),
    )
    print(response.json())
开发者ID:chennanxu,项目名称:crawler,代码行数:9,代码来源:zhihulogin.py
示例4: test_encodes_with_readable_data
def test_encodes_with_readable_data(self):
    """A file-like field value is emitted as a plain form-data part."""
    stream = io.BytesIO(b'value')
    encoder = MultipartEncoder([('field', stream)], boundary=self.boundary)
    expected = (
        '--this-is-a-boundary\r\n'
        'Content-Disposition: form-data; name="field"\r\n\r\n'
        'value\r\n'
        '--this-is-a-boundary--\r\n'
    ).encode()
    assert encoder.read() == expected
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:9,代码来源:test_multipart_encoder.py
示例5: test_reads_file_from_url_wrapper
def test_reads_file_from_url_wrapper(self):
    """An encoder with a ``FileFromURLWrapper`` field yields a body.

    NOTE(review): the original snippet lost its indentation; the read is
    kept inside the cassette context here -- confirm against upstream.
    """
    session = requests.Session()
    recorder = get_betamax(session)
    url = 'https://stxnext.com/static/img/logo.830ebe551641.svg'
    with recorder.use_cassette('file_for_download'):
        fields = [('field', 'foo'), ('file', FileFromURLWrapper(url))]
        encoder = MultipartEncoder(fields)
        assert encoder.read() is not None
开发者ID:davidfischer,项目名称:requests-toolbelt,代码行数:9,代码来源:test_multipart_encoder.py
示例6: test_handles_empty_unicode_values
def test_handles_empty_unicode_values(self):
    """Verify that the Encoder can handle empty unicode strings.

    See https://github.com/requests/toolbelt/issues/46 for
    more context.
    """
    name = b'test'.decode('utf-8')
    empty_value = b''.decode('utf-8')
    encoder = MultipartEncoder(fields=[(name, empty_value)])
    body = encoder.read()
    # Even with an empty value the body still has boundaries and headers.
    assert len(body) > 0
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:9,代码来源:test_multipart_encoder.py
示例7: do_callback_request
def do_callback_request(self, browser, fields):
    """POST ``fields`` as multipart data to ``receive_meeting_zip_pdf``.

    The body and headers are prepared first, and ``self.meeting`` is read
    before ``self.logout()`` is entered; the request itself is sent inside
    that context.
    """
    target = self.meeting
    encoder = MultipartEncoder(fields=fields)
    body = encoder.to_string()
    request_headers = {'Content-Type': encoder.content_type}
    with self.logout():
        browser.open(
            target,
            view='receive_meeting_zip_pdf',
            method='POST',
            data=body,
            headers=request_headers,
        )
开发者ID:4teamwork,项目名称:opengever.core,代码行数:10,代码来源:test_meeting_zipexport_callback.py
示例8: login
def login(username, password):
    """POST the sign-in form as multipart data and print the JSON reply.

    Uses ``s``, ``getheaders``, ``getdata`` and ``checkcapthca`` from the
    enclosing scope; none of them are defined in this snippet.
    """
    headers = getheaders()
    payload = getdata(username, password)
    checkcapthca(headers)
    # TODO: the trailing characters of the boundary could be randomised;
    # for now the boundary is fixed.
    form = MultipartEncoder(
        payload,
        boundary='----WebKitFormBoundarycGPN1xiTi2hCSKKZ',
    )
    headers['Content-Type'] = form.content_type
    response = s.post(
        'https://www.zhihu.com/api/v3/oauth/sign_in',
        headers=headers,
        data=form.to_string(),
    )
    print(response.json())
    print('123')
开发者ID:q7695650,项目名称:pachong,代码行数:12,代码来源:zhihulogin.py
示例9: test_upload
def test_upload(self):
    """Exercise the upload endpoint with rejected and accepted paths.

    The same multipart body is posted five times: to an unregistered
    path (404), to paths registered for another user or another session
    (403), to a path of an unhandled type (501) and finally to a valid
    path (200).

    NOTE(review): the original snippet lost its indentation; all requests
    are kept inside the ``with open(...)`` block here -- confirm upstream.
    """
    self.login()
    manager = PluginRegistry.getInstance("UploadManager")
    archive = os.path.join(os.path.dirname(__file__), 'create_user.zip')
    with open(archive, "rb") as f:
        encoder = MultipartEncoder(
            fields={'file': ('create_user.zip', f, 'text/plain')}
        )
        body = encoder.to_string()

        def post(target, extra_headers=None):
            # Every request sends the same body and multipart content type.
            headers = {'Content-Type': encoder.content_type}
            if extra_headers:
                headers.update(extra_headers)
            return self.fetch(
                target, method="POST", body=body, headers=headers)

        # try to use unregistered path
        upload_id, path = manager.registerUploadPath(
            "admin", self.session_id, "workflow")
        response = post("/uploads/unknown_path")
        assert response.code == 404
        assert manager.unregisterUploadPath(upload_id) is True

        rejected = (
            # path from another user
            ("other_user", self.session_id, "workflow", 403),
            # path from another session
            ("admin", "other session id", "workflow", 403),
            # path for unhandled type
            ("admin", self.session_id, "unknown-type", 501),
        )
        for user, session_id, upload_type, expected_code in rejected:
            upload_id, path = manager.registerUploadPath(
                user, session_id, upload_type)
            response = post(path)
            assert response.code == expected_code
            assert manager.unregisterUploadPath(upload_id) is True

        # finally a working example
        upload_id, path = manager.registerUploadPath(
            "admin", self.session_id, "workflow")
        response = post(path, {'X-File-Name': 'create_user.zip'})
        assert response.code == 200
        # path should have been removed by successfully unsigning it
        assert manager.unregisterUploadPath(upload_id) is False
开发者ID:gonicus,项目名称:gosa,代码行数:52,代码来源:test_main.py
示例10: test_accepts_custom_content_type
def test_accepts_custom_content_type(self):
    """Verify that the Encoder handles custom content-types.

    See https://github.com/requests/toolbelt/issues/52
    """
    file_part = (
        b'filename'.decode('utf-8'),
        b'filecontent',
        b'application/json'.decode('utf-8'),
    )
    encoder = MultipartEncoder(fields=[(b'test'.decode('utf-8'), file_part)])
    body = encoder.read().decode('utf-8')
    # str.index raises ValueError when the header line is missing.
    assert body.index('Content-Type: application/json\r\n') > 0
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:13,代码来源:test_multipart_encoder.py
示例11: test_streams_its_data
def test_streams_its_data(self):
    """Drain the encoder in chunks and check its buffer position.

    NOTE(review): the original snippet lost its indentation; the buffer
    assertion is placed after the read loop here -- confirm upstream.
    """
    fields = {
        'some field': 'value',
        'some file': LargeFileMock(),
    }
    encoder = MultipartEncoder(fields)
    read_size = 1024 * 1024 * 128

    chunk = encoder.read(read_size)
    while chunk:
        chunk = encoder.read(read_size)

    # The internal buffer position must not exceed one read's worth.
    assert encoder._buffer.tell() <= read_size
开发者ID:Zearin,项目名称:python-requests-toolbelt,代码行数:13,代码来源:test_multipart_encoder.py
示例12: test_accepts_custom_headers
def test_accepts_custom_headers(self):
    """Verify that the Encoder handles custom headers.

    See https://github.com/requests/toolbelt/issues/52
    """
    file_part = (
        b'filename'.decode('utf-8'),
        b'filecontent',
        b'application/json'.decode('utf-8'),
        {'X-My-Header': 'my-value'},
    )
    encoder = MultipartEncoder(fields=[(b'test'.decode('utf-8'), file_part)])
    body = encoder.read().decode('utf-8')
    # str.index raises ValueError when the header line is missing.
    assert body.index('X-My-Header: my-value\r\n') > 0
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:14,代码来源:test_multipart_encoder.py
示例13: prepare_request
def prepare_request(self, userid='', destination='inbox', org_unit=''):
    """Build the multipart body and headers for a request.

    ``userid`` falls back to ``self.regular_user.getId()`` when falsy, and
    ``org_unit`` is only included when truthy. The file part is always
    ``mydocument.txt`` with the text ``my text``.

    Returns a ``(body, headers)`` tuple.
    """
    if not userid:
        userid = self.regular_user.getId()

    fields = {
        'userid': userid,
        'destination': destination,
        'file': ('mydocument.txt', 'my text', 'text/plain'),
    }
    if org_unit:
        fields['org_unit'] = org_unit

    encoder = MultipartEncoder(fields=fields)
    body = encoder.to_string()
    headers = {
        'Content-Type': encoder.content_type,
        'Accept': 'application/json',
    }
    return body, headers
开发者ID:4teamwork,项目名称:opengever.core,代码行数:14,代码来源:test_scanin.py
示例14: checkin_document
def checkin_document(self, browser, tokens, payload, document, comment=None):  # noqa
    """Check in ``document``, optionally with a comment, and verify it.

    With a truthy ``comment`` a multipart form is POSTed to the view named
    by ``payload['checkin-with-comment']``; otherwise the view named by
    ``payload['checkin-without-comment']`` is opened with
    ``send_authenticator=True``. Afterwards the journal entry, the
    presence or absence of journal comments and a 200 status are asserted.

    NOTE(review): the original snippet lost its indentation, so whether
    the assertions ran inside the ``with`` block cannot be recovered from
    here -- confirm against the upstream file.
    """
    with self.as_officeconnector(browser):
        headers = {
            'Authorization': ' '.join((
                'Bearer',
                tokens.get('raw_token'),
            )),
        }
        if comment:
            encoder = MultipartEncoder({
                'form.widgets.comment': comment,
                'form.buttons.button_checkin': 'Checkin',
                '_authenticator': payload.get('csrf-token'),
            })
            # The content type carries the multipart boundary.
            headers['Content-Type'] = encoder.content_type
            browser.open(
                document,
                view=payload.get('checkin-with-comment'),
                headers=headers,
                method='POST',
                data=encoder.to_string(),
            )
        else:
            browser.open(
                document,
                headers=headers,
                view=payload.get('checkin-without-comment'),
                send_authenticator=True,
            )
        self.assert_journal_entry(
            document,
            DOCUMENT_CHECKED_IN,
            u'Document checked in',
        )
        journal_comments = get_journal_entry(document).get('comments')
        if comment:
            self.assertTrue(journal_comments)
        else:
            self.assertFalse(journal_comments)
        # assertEqual instead of the deprecated assertEquals alias, which
        # was removed from unittest in Python 3.12.
        self.assertEqual(200, browser.status_code)
开发者ID:lukasgraf,项目名称:opengever.core,代码行数:46,代码来源:testing.py
示例15: test_streams_its_data
def test_streams_its_data(self):
    """Drain the encoder in chunks and compare the total with ``len``.

    NOTE(review): the original snippet lost its indentation; the buffer
    assertion is placed after the read loop here -- confirm upstream.
    """
    fields = {
        'some field': 'value',
        'some file': LargeFileMock(),
    }
    encoder = MultipartEncoder(fields)
    total_size = encoder.len
    read_size = 1024 * 1024 * 128

    chunk = encoder.read(read_size)
    consumed = len(chunk)
    while chunk:
        chunk = encoder.read(read_size)
        consumed += len(chunk)

    # The internal buffer position must not exceed one read's worth.
    assert encoder._buffer.tell() <= read_size
    assert consumed == total_size
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:17,代码来源:test_multipart_encoder.py
示例16: test_regression_2
def test_regression_2(self):
    """Ensure issue #31 doesn't ever happen again."""
    encoder = MultipartEncoder(fields={"test": "t" * 8100})
    total_size = encoder.len
    blocksize = 8192

    consumed = 0
    chunk = encoder.read(blocksize)
    while chunk:
        consumed += len(chunk)
        chunk = encoder.read(blocksize)

    # Reading block by block must yield exactly the advertised length.
    assert consumed == total_size
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:19,代码来源:test_multipart_encoder.py
示例17: setUp
def setUp(self):
    """Encode four sample fields and decode the result as fixtures."""
    sample = (
        ('field 1', 'value 1'),
        ('field 2', 'value 2'),
        ('field 3', 'value 3'),
        ('field 4', 'value 4'),
    )
    boundary = 'test boundary'
    encoder = MultipartEncoder(sample, boundary)

    self.sample_1 = sample
    self.boundary = boundary
    self.encoded_1 = encoder
    # The decoder receives the encoded body and its content type.
    self.decoded_1 = MultipartDecoder(
        encoder.to_string(),
        encoder.content_type,
    )
开发者ID:DigiThinkIT,项目名称:requests-toolbelt,代码行数:13,代码来源:test_multipart_decoder.py
示例18: test_regresion_1
def test_regresion_1(self):
    """Ensure issue #31 doesn't ever happen again.

    Builds one text field plus 30 file fields that each read
    ``tests/test_multipart_encoder.py``, drains the encoder in 8192-byte
    blocks and checks that the byte count equals ``encoder.len``.
    """
    fields = {
        "test": "t" * 100
    }
    # Keep every handle so it can be closed even when an assertion or the
    # encoder fails; the original left all 30 files open.
    handles = []
    try:
        for x in range(30):
            handle = open('tests/test_multipart_encoder.py', 'rb')
            handles.append(handle)
            fields['f%d' % x] = ('test', handle)

        m = MultipartEncoder(fields=fields)
        total_size = m.len
        blocksize = 8192
        read_so_far = 0
        while True:
            data = m.read(blocksize)
            if not data:
                break
            read_so_far += len(data)
        assert read_so_far == total_size
    finally:
        for handle in handles:
            handle.close()
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:24,代码来源:test_multipart_encoder.py
示例19: setUp
def setUp(self):
    """Create a two-field encoder with a fixed boundary as the fixture."""
    parts = [('field', 'value'), ('other_field', 'other_value')]
    boundary = 'this-is-a-boundary'

    self.parts = parts
    self.boundary = boundary
    self.instance = MultipartEncoder(parts, boundary=boundary)
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:4,代码来源:test_multipart_encoder.py
示例20: TestMultipartEncoder
class TestMultipartEncoder(unittest.TestCase):
def setUp(self):
    """Store the sample parts, the boundary and an encoder over both."""
    field_pairs = [('field', 'value'), ('other_field', 'other_value')]
    fixed_boundary = 'this-is-a-boundary'

    self.parts = field_pairs
    self.boundary = fixed_boundary
    self.instance = MultipartEncoder(field_pairs, boundary=fixed_boundary)
def test_to_string(self):
    """``to_string`` returns the full encoded body for both fields."""
    expected = (
        '--this-is-a-boundary\r\n'
        'Content-Disposition: form-data; name="field"\r\n\r\n'
        'value\r\n'
        '--this-is-a-boundary\r\n'
        'Content-Disposition: form-data; name="other_field"\r\n\r\n'
        'other_value\r\n'
        '--this-is-a-boundary--\r\n'
    ).encode()
    actual = self.instance.to_string()
    assert actual == expected
def test_content_type(self):
    """``content_type`` names multipart/form-data and the boundary."""
    actual = self.instance.content_type
    assert actual == 'multipart/form-data; boundary=this-is-a-boundary'
def test_encodes_data_the_same(self):
    """The encoder output equals ``filepost.encode_multipart_formdata``."""
    reference = filepost.encode_multipart_formdata(
        self.parts, self.boundary)
    # Element 0 of the returned pair is the encoded body.
    reference_body = reference[0]
    assert reference_body == self.instance.read()
def test_streams_its_data(self):
    """Drain the encoder in chunks and compare the total with ``len``.

    NOTE(review): the original snippet lost its indentation; the buffer
    assertion is placed after the read loop here -- confirm upstream.
    """
    source = LargeFileMock()
    encoder = MultipartEncoder({
        'some field': 'value',
        'some file': source,
    })
    total_size = encoder.len
    read_size = 1024 * 1024 * 128

    bytes_seen = 0
    while True:
        piece = encoder.read(read_size)
        if not piece:
            break
        bytes_seen += len(piece)

    # The internal buffer position must not exceed one read's worth.
    assert encoder._buffer.tell() <= read_size
    assert bytes_seen == total_size
def test_length_is_correct(self):
    """``len`` equals the size of the reference encoding of the parts."""
    reference = filepost.encode_multipart_formdata(
        self.parts, self.boundary)
    # Element 0 of the returned pair is the encoded body.
    reference_body = reference[0]
    assert len(reference_body) == self.instance.len
def test_encodes_with_readable_data(self):
    """A ``BytesIO`` field value is emitted as a plain form-data part."""
    readable = io.BytesIO(b'value')
    encoder = MultipartEncoder([('field', readable)], boundary=self.boundary)
    expected_body = (
        '--this-is-a-boundary\r\n'
        'Content-Disposition: form-data; name="field"\r\n\r\n'
        'value\r\n'
        '--this-is-a-boundary--\r\n'
    ).encode()
    assert encoder.read() == expected_body
def test_reads_open_file_objects(self):
    """An open file object can be passed directly as a field value."""
    with open('setup.py', 'rb') as fd:
        fields = [('field', 'foo'), ('file', fd)]
        encoder = MultipartEncoder(fields)
        # Read while the file is still open.
        body = encoder.read()
        assert body is not None
def test_reads_open_file_objects_with_a_specified_filename(self):
    """A ``(filename, file, content_type)`` tuple is accepted as a value."""
    with open('setup.py', 'rb') as fd:
        file_part = ('filename', fd, 'text/plain')
        encoder = MultipartEncoder([('field', 'foo'), ('file', file_part)])
        # Read while the file is still open.
        body = encoder.read()
        assert body is not None
def test_reads_open_file_objects_using_to_string(self):
    """``to_string`` also works when a field is an open file object."""
    with open('setup.py', 'rb') as fd:
        fields = [('field', 'foo'), ('file', fd)]
        encoder = MultipartEncoder(fields)
        # Encode while the file is still open.
        body = encoder.to_string()
        assert body is not None
def test_handles_encoded_unicode_strings(self):
    """A byte-string value holding UTF-8 encoded text can be encoded."""
    encoded_value = (
        b'this is a unicode string: \xc3\xa9 \xc3\xa1 \xc7\xab \xc3\xb3'
    )
    encoder = MultipartEncoder([('field', encoded_value)])
    assert encoder.read() is not None
def test_handles_uncode_strings(self):
    """A value decoded from UTF-8 bytes to text can be encoded."""
    raw = b'this is a unicode string: \xc3\xa9 \xc3\xa1 \xc7\xab \xc3\xb3'
    text_value = raw.decode('utf-8')
    encoder = MultipartEncoder([('field', text_value)])
    assert encoder.read() is not None
def test_regresion_1(self):
"""Ensure issue #31 doesn't ever happen again."""
fields = {
"test": "t" * 100
}
for x in range(30):
fields['f%d' % x] = (
'test', open('tests/test_multipart_encoder.py', 'rb')
)
#.........这里部分代码省略.........
开发者ID:snarfed,项目名称:requests-toolbelt,代码行数:101,代码来源:test_multipart_encoder.py
注：本文中的requests_toolbelt.multipart.encoder.MultipartEncoder类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论