本文整理汇总了Python中six.moves.StringIO类的典型用法代码示例。如果您正苦于以下问题：Python StringIO类的具体用法？Python StringIO怎么用？Python StringIO使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了StringIO类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_rest_endpoint
def test_rest_endpoint(self):
    # type: () -> None
    """
    Round-trip a single file through the /api/v1/user_uploads endpoint.

    The file is uploaded and then downloaded again using the headers that
    ``self.api_auth`` builds for the "hamlet" example user, and finally
    fetched once more from a logged-in web session.
    """
    expected_body = b"zulip!"
    upload = StringIO("zulip!")
    upload.name = "zulip.txt"

    # Step 1: upload the file via the API.
    auth_headers = self.api_auth(self.example_email("hamlet"))
    result = self.client_post(
        '/api/v1/user_uploads',
        {'file': upload},
        **auth_headers
    )
    self.assertIn("uri", result.json())
    uri = result.json()['uri']
    prefix = '/user_uploads/'
    self.assertEqual(prefix, uri[:len(prefix)])

    # Step 2: drop the session and download the file via the API headers.
    self.logout()
    response = self.client_get(uri, **auth_headers)
    body = b"".join(response.streaming_content)
    self.assertEqual(expected_body, body)

    # Step 3: files uploaded through the API should be accessible via the
    # web client as well.
    self.login(self.example_email("hamlet"))
    self.assert_url_serves_contents_of_file(uri, expected_body)
开发者ID:brockwhittaker,项目名称:zulip,代码行数:26,代码来源:test_upload.py
示例2: do_check_on
def do_check_on(value, nd, var=None):
    """
    Inspect `value` for NaN, Inf and very large entries.

    Each enabled check (`nan_is_error`, `inf_is_error`, `big_is_error`)
    that fires adds a line to a report. If anything fired, the report is
    completed with information about `nd` (or the graph input) and a
    variable stack trace, then handled according to
    ``config.NanGuardMode.action``: raised as ``AssertionError``
    ('raise'), printed before dropping into pdb ('pdb'), or logged as an
    error ('warn').

    Parameters
    ----------
    value : numpy.ndarray
        The value to be checked.
    nd : theano.gof.Apply
        The Apply node being executed.
    var : theano.gof.Variable
        Not used if nd is there. Otherwise, used to print the stack
        trace for inputs of the graph.
    """
    report = StringIO()
    found = False

    if nan_is_error and contains_nan(value, nd, var):
        print('NaN detected', file=report)
        found = True

    if inf_is_error and contains_inf(value, nd, var):
        print('Inf detected', file=report)
        found = True

    if big_is_error:
        if not _is_numeric_value(value, var):
            too_big = False
        elif pygpu_available and isinstance(value, GpuArray):
            too_big = (f_gpua_absmax(value.reshape(value.size)) > 1e10)
        else:
            too_big = (np.abs(value).max() > 1e10)
        if too_big:
            print('Big value detected', file=report)
            found = True

    if not found:
        return

    if nd:
        print("NanGuardMode found an error in the "
              "output of a node in this variable:", file=report)
        print(theano.printing.debugprint(nd, file='str'), file=report)
        # The stack trace below is taken from the node's first output.
        var = nd.outputs[0]
    else:
        print("NanGuardMode found an error in an input of the "
              "graph.", file=report)

    # Add the stack trace
    print(theano.gof.utils.get_variable_trace_string(var), file=report)
    msg = report.getvalue()

    action = config.NanGuardMode.action
    if action == 'raise':
        raise AssertionError(msg)
    if action == 'pdb':
        print(msg)
        import pdb
        pdb.set_trace()
    elif action == 'warn':
        logger.error(msg)
开发者ID:DEVESHTARASIA,项目名称:Theano,代码行数:60,代码来源:nanguardmode.py
示例3: test_pydotprint_cond_highlight
def test_pydotprint_cond_highlight():
    """
    Very partial test of ``pydotprint(..., cond_highlight=True)``.

    It only checks the message logged when the graph has no IfElse node;
    it was written as a debugging aid.
    """
    # Nothing to test without pydot.
    if not theano.printing.pydot_imported:
        raise SkipTest('pydot not available')

    x = tensor.dvector()
    f = theano.function([x], x * 2)
    f([1, 2, 3, 4])

    # Temporarily swap Theano's default log handler for one that writes
    # into an in-memory buffer.
    captured = StringIO()
    capture_handler = logging.StreamHandler(captured)
    capture_handler.setLevel(logging.DEBUG)
    default_handler = theano.logging_default_handler

    theano.theano_logger.removeHandler(default_handler)
    theano.theano_logger.addHandler(capture_handler)
    try:
        theano.printing.pydotprint(f, cond_highlight=True,
                                   print_output_file=False)
    finally:
        theano.theano_logger.addHandler(default_handler)
        theano.theano_logger.removeHandler(capture_handler)

    expected = ('pydotprint: cond_highlight is set but there'
                ' is no IfElse node in the graph\n')
    assert captured.getvalue() == expected
开发者ID:ALISCIFP,项目名称:Segmentation,代码行数:31,代码来源:test_printing.py
示例4: test_rest_endpoint
def test_rest_endpoint(self):
    # type: () -> None
    """
    Tests the /api/v1/user_uploads api endpoint. Here a single file is
    uploaded and downloaded using a username and api_key.

    After the API round trip the same URI is fetched again from a
    logged-in web session and must serve the same bytes.
    """
    # NOTE(review): the '[email protected]' literals look like an
    # e-mail-obfuscation placeholder rather than a real address -- confirm
    # against the upstream test before relying on them.
    fp = StringIO("zulip!")
    fp.name = "zulip.txt"

    # Upload file via API
    auth_headers = self.api_auth('[email protected]')
    result = self.client_post('/api/v1/user_uploads', {'file': fp}, **auth_headers)
    # Named `payload` so the local does not shadow the `json` module name.
    payload = ujson.loads(result.content)
    self.assertIn("uri", payload)
    uri = payload["uri"]
    base = '/user_uploads/'
    # assertEqual: the assertEquals alias is deprecated and was removed in
    # Python 3.12.
    self.assertEqual(base, uri[:len(base)])

    # Download file via API
    self.client_post('/accounts/logout/')
    response = self.client_get(uri, **auth_headers)
    data = b"".join(response.streaming_content)
    self.assertEqual(b"zulip!", data)

    # Files uploaded through the API should be accessible via the web client
    self.login("[email protected]")
    response = self.client_get(uri)
    data = b"".join(response.streaming_content)
    self.assertEqual(b"zulip!", data)
开发者ID:tobby2002,项目名称:zulip,代码行数:29,代码来源:test_upload.py
示例5: main
def main(loops, level):
    """
    Solve the board for `level` `loops` times and return the elapsed time.

    The board and its expected solution come from ``LEVELS[level]``. Only
    the solving loop is timed; the output of the last iteration is then
    compared (with trailing whitespace stripped from every line) against
    the expected solution, and an ``AssertionError`` is raised on mismatch.
    """
    raw_board, raw_solution = LEVELS[level]
    order = DESCENDING
    strategy = Done.FIRST_STRATEGY
    # Placeholder binding; a fresh buffer is created on every iteration.
    stream = StringIO()

    board = raw_board.strip()
    expected = raw_solution.rstrip()

    iterations = xrange(loops)
    started = perf.perf_counter()
    for _ in iterations:
        stream = StringIO()
        solve_file(board, strategy, order, stream)
        output = stream.getvalue()
        # Drop the reference to the buffer before the next iteration.
        stream = None
    dt = perf.perf_counter() - started

    cleaned_lines = [line.rstrip() for line in output.splitlines()]
    output = '\n'.join(cleaned_lines)
    if output == expected:
        return dt
    raise AssertionError("got a wrong answer:\n%s\nexpected: %s"
                         % (output, expected))
开发者ID:Yaspee,项目名称:performance,代码行数:26,代码来源:bm_hexiom.py
示例6: test_record_bad
def test_record_bad():
    """
    Check that Record notices a playback that deviates from the recording.

    A sequence of lines is recorded, the first half is replayed
    faithfully, and then a line that does not match the recording is fed
    in; that last call must raise MismatchError.
    """
    num_lines = 10

    # Record a sequence of events
    recorded = StringIO()
    recorder = Record(file_object=recorded, replay=False)
    for n in xrange(num_lines):
        recorder.handle_line(str(n) + '\n')

    # Make sure that the playback functionality doesn't raise any errors
    # when we repeat some of them
    replay_source = StringIO(recorded.getvalue())
    playback_checker = Record(file_object=replay_source, replay=True)
    for n in xrange(num_lines // 2):
        playback_checker.handle_line(str(n) + '\n')

    # Make sure it raises an error when we deviate from the recorded sequence
    try:
        playback_checker.handle_line('0\n')
    except MismatchError:
        pass
    else:
        raise AssertionError("Failed to detect mismatch between recorded sequence "
                             " and repetition of it.")
开发者ID:12190143,项目名称:Theano,代码行数:33,代码来源:test_record.py
示例7: test_stderr_to_StringIO
def test_stderr_to_StringIO():
    """Text written to sys.stderr inside ``stderr_to`` lands in the buffer."""
    captured = StringIO()
    with stderr_to(captured):
        sys.stderr.write(u("hello"))
    written = captured.getvalue()
    assert written == 'hello'
开发者ID:msabramo,项目名称:redirectory,代码行数:7,代码来源:test_redirectory.py
示例8: UpdateAppsAndBackendsTest
class UpdateAppsAndBackendsTest(TestCase):
    """Tests for the ``update_apps`` and ``update_backends`` commands.

    Every test runs one command with stdout redirected into
    ``self.output`` and checks exactly what the command printed.
    """

    def setUp(self):
        # BASE_APPS are needed for the management commands to load successfully
        self.BASE_APPS = [
            'rapidsms',
            'django.contrib.auth',
            'django.contrib.contenttypes',
        ]
        self.output = StringIO()

    def test_no_apps_then_none_added(self):
        """With only the base apps installed, update_apps prints nothing."""
        with self.settings(INSTALLED_APPS=self.BASE_APPS):
            call_command('update_apps', stdout=self.output)
        printed = self.output.getvalue()
        self.assertEqual(printed, '')

    def test_adds_app(self):
        """Installing a RapidSMS app makes update_apps report it as added."""
        # Add an app that has a RapidSMS app
        installed = self.BASE_APPS + ['rapidsms.contrib.handlers']
        with self.settings(INSTALLED_APPS=installed):
            call_command('update_apps', stdout=self.output)
        printed = self.output.getvalue()
        self.assertEqual(printed, 'Added persistent app rapidsms.contrib.handlers\n')

    def test_no_backends_then_none_added(self):
        """With no backends configured, update_backends prints nothing."""
        with self.settings(INSTALLED_BACKENDS={}):
            call_command('update_backends', stdout=self.output)
        printed = self.output.getvalue()
        self.assertEqual(printed, '')

    def test_adds_backend(self):
        """A configured backend is reported as added by update_backends."""
        backends = {
            "message_tester": {"ENGINE": "rapidsms.backends.database.DatabaseBackend"},
        }
        with self.settings(INSTALLED_BACKENDS=backends):
            call_command('update_backends', stdout=self.output)
        printed = self.output.getvalue()
        self.assertEqual(printed, 'Added persistent backend message_tester\n')
开发者ID:0xD3ADB33F,项目名称:rapidsms,代码行数:35,代码来源:test_management.py
示例9: test_main_help
def test_main_help(monkeypatch):
    """``main.main(['--help'])`` exits and prints text starting with "Usage: "."""
    # Patch stdout
    captured = StringIO()
    monkeypatch.setattr(sys, "stdout", captured)

    with pytest.raises(SystemExit):
        main.main(['--help'])

    printed = captured.getvalue()
    assert printed.lstrip().startswith("Usage: ")
开发者ID:duecredit,项目名称:duecredit,代码行数:7,代码来源:test_cmdline.py
示例10: _find_snippet_imports
def _find_snippet_imports(module_data, module_path, strip_comments):
    """
    Expand snippet markers/imports in a module's source.

    Walks `module_data` line by line, replacing the REPLACER /
    REPLACER_WINDOWS markers and ``from ansible.module_utils.<name> import *``
    lines with the contents of the matching snippet file under
    ``_SNIPPET_PATH``.

    Returns a ``(source, module_style)`` tuple where `module_style` is
    "new", "non_native_want_json" or "old".

    Raises AnsibleError for a malformed module_utils import, or when
    snippets were used but the mandatory one ("basic" for non-``.ps1``
    modules, "powershell" for ``.ps1`` modules) is missing.
    """
    uses_new_style = (
        REPLACER in module_data
        or REPLACER_WINDOWS in module_data
        or "from ansible.module_utils." in module_data
    )
    if uses_new_style:
        module_style = "new"
    elif "WANT_JSON" in module_data:
        module_style = "non_native_want_json"
    else:
        module_style = "old"

    output = StringIO()
    snippet_names = []

    for line in module_data.split("\n"):
        if REPLACER in line:
            output.write(_slurp(os.path.join(_SNIPPET_PATH, "basic.py")))
            snippet_names.append("basic")

        # Note: this is a separate if/elif/else chain, so a REPLACER line
        # is also examined here.
        if REPLACER_WINDOWS in line:
            powershell_source = _slurp(os.path.join(_SNIPPET_PATH, "powershell.ps1"))
            output.write(powershell_source)
            snippet_names.append("powershell")
        elif line.startswith("from ansible.module_utils."):
            tokens = line.split(".")
            malformed = len(tokens) != 3 or " import *" not in line
            if malformed:
                raise AnsibleError(
                    "error importing module in %s, expecting format like 'from ansible.module_utils.basic import *'"
                    % module_path
                )
            snippet_name = tokens[2].split()[0]
            snippet_names.append(snippet_name)
            output.write(_slurp(os.path.join(_SNIPPET_PATH, snippet_name + ".py")))
        else:
            # NOTE(review): as written this condition has no effect -- the
            # line is emitted either way, so `strip_comments` changes
            # nothing. Confirm whether skipping the line was intended.
            if strip_comments and line.startswith("#") or line == "":
                pass
            output.write(line)
            output.write("\n")

    if module_path.endswith(".ps1"):
        # Windows modules
        if snippet_names and "powershell" not in snippet_names:
            raise AnsibleError("missing required import in %s: # POWERSHELL_COMMON" % module_path)
    else:
        # Unixy modules
        if snippet_names and "basic" not in snippet_names:
            raise AnsibleError("missing required import in %s: from ansible.module_utils.basic import *" % module_path)

    return (output.getvalue(), module_style)
开发者ID:thebeefcake,项目名称:masterless,代码行数:60,代码来源:module_common.py
示例11: run
def run(self, ctx):
    """
    Build the source archive for ``ctx.pkg``.

    Parses the command arguments (printing help and returning early when
    asked to), writes a PKG_INFO node into the build directory and
    registers it as a source node, then creates the archive. The
    resulting root and node are stored on ``self.archive_root`` and
    ``self.archive_node``.

    Raises bento.errors.BentoError when the requested output file name
    contains a directory component.
    """
    parser = ctx.options_context.parser
    opts, _unused_args = parser.parse_args(ctx.command_argv)
    if opts.help:
        parser.print_help()
        return

    pkg = ctx.pkg
    archive_root = "%s-%s" % (pkg.name, pkg.version)

    if opts.output_file:
        # Only a bare file name is accepted; the directory is a separate option.
        archive_name = op.basename(opts.output_file)
        if archive_name != opts.output_file:
            raise bento.errors.BentoError("Invalid output file: should not contain any directory")
    else:
        archive_name = archive_basename(pkg) + _FORMATS[opts.format]["ext"]

    pkg_info = StringIO()
    write_pkg_info(ctx.pkg, pkg_info)
    pkg_info_node = ctx.build_node.make_node("PKG_INFO")
    pkg_info_node.parent.mkdir()
    pkg_info_node.write(pkg_info.getvalue())
    ctx.register_source_node(pkg_info_node, "PKG_INFO")

    # XXX: find a better way to pass archive name from other commands (used
    # by distcheck ATM)
    self.archive_root, self.archive_node = create_archive(
        archive_name, archive_root, ctx._node_pkg,
        ctx.top_node, ctx.run_node, opts.format, opts.output_dir)
开发者ID:B-Rich,项目名称:Bento,代码行数:31,代码来源:sdist.py
示例12: log_mem_usage
def log_mem_usage(signum, frame, fname=None):
    """
    Write a memory-usage report and, from the second call on, backref graphs.

    Each call increments the module-level ``_count``, forces a garbage
    collection and writes a log containing the size of ``gc.garbage``,
    objgraph's most common types and objgraph's growth report. From the
    second call onwards, a backref chain image and a backrefs image are
    rendered for each of the first ten lines of the growth report.

    Parameters: `signum` and `frame` are accepted but unused (presumably
    so the function can be installed as a signal handler -- confirm at
    the call site). `fname` is the log file name; when falsy it is
    derived from the module-level ``filename`` and the call counter.
    """
    global _count
    _count += 1
    gc.collect()
    if not fname:
        fname = filename + '_memory_%02d.log' % _count

    # Text mode: everything written below is str, which a file opened
    # with 'wb' rejects with TypeError on Python 3.
    with open(fname, 'w') as f:
        f.write('gc.garbage: %d\n\n' % len(gc.garbage))
        objgraph.show_most_common_types(limit=50, file=f)
        f.write('\n\n')
        growth_buf = StringIO()
        objgraph.show_growth(limit=50, file=growth_buf)
        growth = growth_buf.getvalue()
        f.write(growth)

    # The first call only writes the log.
    if _count < 2:
        return

    # Strips the last four characters of the log name -- assumes a
    # 4-character extension such as ".log"; TODO confirm for custom fname.
    image_prefix = fname[:-4]
    for tn, line in enumerate(growth.splitlines()[:10]):
        line = line.strip()
        if not line:
            continue
        type_ = line.split()[0]
        objects = objgraph.by_type(type_)
        objects = random.sample(objects, min(50, len(objects)))
        if not objects:
            # No instance of this type is left; objects[0] would raise
            # IndexError.
            continue
        objgraph.show_chain(
            objgraph.find_backref_chain(
                objects[0],
                objgraph.is_proper_module),
            filename=image_prefix + '_type_%02d_backref.png' % tn
        )
        objgraph.show_backrefs(
            objects,
            max_depth=5,
            extra_info=lambda x: hex(id(x)),
            filename=image_prefix + '_type_%02d_backrefs.png' % tn,
        )
开发者ID:RDFLib,项目名称:graph-pattern-learner,代码行数:35,代码来源:memory_usage.py
示例13: render
def render(self):
    """
    Generate the CSV body from ``self.rows`` on first use; return ``self``.

    When the response has not been rendered yet, every row is written
    (each value converted with ``six.text_type``) into an in-memory
    buffer, the result is written to the response, the
    Content-Disposition header is set from ``self.filename``, and the
    post-render callbacks are invoked. Later calls return immediately.
    """
    if self._is_rendered:
        return self

    # File pointer needed to create the CSV in memory
    csv_buffer = StringIO()
    writer = UnicodeWriter(csv_buffer)
    for row in self.rows:
        cells = [six.text_type(value) for value in row]
        writer.writerow(cells)

    # Get the value of the StringIO buffer and write it to the response.
    csv_text = csv_buffer.getvalue()
    csv_buffer.close()
    self.write(csv_text)

    # Sets the appropriate CSV headers.
    disposition = 'attachment; filename=%s' % (self.filename, )
    self['Content-Disposition'] = disposition

    # The CSV has been generated
    self._is_rendered = True

    for post_callback in self._post_render_callbacks:
        post_callback(self)
    return self
开发者ID:placiana,项目名称:django-enhanced-cbv,代码行数:29,代码来源:response.py
示例14: create_hook_module
def create_hook_module(target):
    """
    Execute the hook file `target` in a new module and return that module.

    The module is named ``bento_hook_<safe name>``, where the safe name
    is `target` with SAFE_MODULE_NAME matches replaced by "_". The hook's
    directory is put at the front of ``sys.path`` before its code runs;
    on success the module is registered in ``sys.modules`` and its
    ``root_path`` is set to the absolute path of the hook file.

    Raises InvalidHook (carrying the original traceback as text) if
    executing the hook raises any Exception; in that case the
    ``sys.path`` entry is removed again.
    """
    safe_name = SAFE_MODULE_NAME.sub("_", target, len(target))
    module_name = "bento_hook_%s" % safe_name
    main_file = os.path.abspath(target)
    module = imp.new_module(module_name)
    module.__file__ = main_file
    # Read through a context manager so the file handle is closed
    # deterministically instead of being left to the garbage collector.
    with open(main_file) as hook_file:
        code = hook_file.read()

    sys.path.insert(0, os.path.dirname(main_file))
    try:
        exec(compile(code, main_file, 'exec'), module.__dict__)
        sys.modules[module_name] = module
    except Exception:
        # Undo the sys.path change made just above.
        sys.path.pop(0)
        e = extract_exception()
        tb = sys.exc_info()[2]
        s = StringIO()
        traceback.print_tb(tb, file=s)
        msg = """\
Could not import hook file %r: caught exception %r
Original traceback (most recent call last)
%s\
""" % (main_file, e, s.getvalue())
        raise InvalidHook(msg)

    module.root_path = main_file
    return module
开发者ID:Web5design,项目名称:Bento,代码行数:27,代码来源:hooks.py
示例15: test_contradictory_date_entries_warn
def test_contradictory_date_entries_warn(self):
    """4.8.5.3 Emit warning on contradictory date entries."""
    # DateRevised (2010) precedes DateCompleted (2011).
    fields = {
        "Author": {
            "ForeName": "John",
            "LastName": "Smith"
        },
        "DateCompleted": {
            "Year": "2011",
            "Month": "01",
            "Day": "01"
        },
        "DateRevised": {
            "Year": "2010",
            "Month": "01",
            "Day": "01"
        },
    }
    document_text = wrap_document_text(construct_document_from(**fields))
    stream = StringIO(document_text)

    # Capture whatever the parser writes to stderr.
    stderr = StringIO()
    self.patch(sys, "stderr", stderr)

    tree = parsexml.file_to_element_tree(stream)
    result = parsexml.parse_element_tree(tree)
    stderr_out = stderr.getvalue()

    self.assertThat(result["pubDate"], Is(None))
    self.assertThat(result["reviseDate"], Is(None))
    self.assertThat(stderr_out, Contains("is greater than"))
开发者ID:DanielMcFall,项目名称:pubmed-retraction-analysis,代码行数:31,代码来源:test_parsexml.py
示例16: test_grid_01
def test_grid_01(self):
    """Add one GRID card and compare its small- and large-field output."""
    nid, cp, cd, ps, seid = 1, 2, 0, '', 0

    card_count = {'GRID': 1,}
    model = BDF()
    model.allocate(card_count)

    grid_card = BDFCard(['GRID', nid, cp, 0., 0., 0., cd, ps, seid])
    nodes = model.grid
    nodes.add(grid_card)

    # Exercise each writer variant once; the output is not inspected here.
    scratch = StringIO()
    for options in ({'size': 8}, {'size': 16}, {'size': 16, 'is_double': True}):
        nodes.write_bdf(scratch, write_header=False, **options)

    # small field
    small = StringIO()
    nodes.write_bdf(small, size=8, write_header=False)
    expected_small = 'GRID 1 2 0. 0. 0.\n'
    self.assertCardEqual(small.getvalue(), expected_small)

    # large field
    large = StringIO()
    nodes.write_bdf(large, size=16, write_header=False)
    expected_large = ('GRID* 1 2 0. 0.\n'
                      '* 0.\n')
    self.assertCardEqual(large.getvalue(), expected_large)
开发者ID:umvarma,项目名称:pynastran,代码行数:35,代码来源:test_nodes.py
示例17: show
def show(self, lswitch=None):
    """
    Run the "show" command and return its parsed output.

    `lswitch`, when truthy, is passed as the only argument of the
    command; otherwise the command runs without arguments. The captured
    stdout is handed to ``get_lswitch_info`` and its result is returned.
    """
    captured = StringIO()
    if lswitch:
        params = [lswitch]
    else:
        params = []
    self.run("show", args=params, stdout=captured)
    return get_lswitch_info(captured.getvalue())
开发者ID:l8huang,项目名称:ovn-scale-test,代码行数:7,代码来源:ovsclients_impl.py
示例18: test_record_good
def test_record_good():
    """
    Record a sequence of lines and replay exactly the same sequence.

    Checks that the Record class:
    1) writes the recorded lines to its file object unchanged
    2) raises nothing when the replay matches the recording
    """
    num_lines = 10

    # Record a sequence of events
    recorded = StringIO()
    recorder = Record(file_object=recorded, replay=False)
    for n in xrange(num_lines):
        recorder.handle_line(str(n) + '\n')

    # Make sure they were recorded correctly
    output_value = recorded.getvalue()
    expected = ''.join(str(n) + '\n' for n in xrange(num_lines))
    assert output_value == expected

    # Make sure that the playback functionality doesn't raise any errors
    # when we repeat them
    replay_source = StringIO(output_value)
    playback_checker = Record(file_object=replay_source, replay=True)
    for n in xrange(num_lines):
        playback_checker.handle_line(str(n) + '\n')
开发者ID:12190143,项目名称:Theano,代码行数:31,代码来源:test_record.py
示例19: outstanding_bookings
def outstanding_bookings():
    """
    Email a CSV summary of unpaid bookings.

    Selects bookings that are not cancelled, depart today or later and
    are not of booking type '1' or '3', keeps those whose ``paid`` is
    falsy, and writes one CSV row per booking. The CSV is attached to an
    email sent to every OutstandingBookingRecipient, or to
    ``settings.NOTIFICATION_EMAIL`` when there are no recipients.

    Any exception propagates to the caller. The original wrapped the
    body in ``try/except: raise``, which did the same thing.
    """
    today = datetime.date.today()
    candidates = Booking.objects.filter(
        is_canceled=False, departure__gte=today
    ).exclude(booking_type__in=['1', '3'])
    outstanding = [b for b in candidates if not b.paid]

    csv_buffer = StringIO()
    fieldnames = ['Confirmation Number', 'Customer', 'Campground',
                  'Arrival', 'Departure', 'Outstanding Amount']
    writer = csv.writer(csv_buffer)
    writer.writerow(fieldnames)
    for o in outstanding:
        fullname = '{} {}'.format(o.details.get('first_name'), o.details.get('last_name'))
        writer.writerow([
            o.confirmation_number,
            fullname,
            o.campground.name,
            o.arrival.strftime('%d/%m/%Y'),
            o.departure.strftime('%d/%m/%Y'),
            o.outstanding,
        ])

    dt = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
    recipients = OutstandingBookingRecipient.objects.all()
    email = EmailMessage(
        'Unpaid Bookings Summary as at {}'.format(dt),
        'Unpaid Bookings as at {}'.format(dt),
        settings.DEFAULT_FROM_EMAIL,
        to=[r.email for r in recipients] if recipients else [settings.NOTIFICATION_EMAIL]
    )
    # getvalue() returns the whole buffer regardless of the stream
    # position, so no flush()/seek(0) is needed first.
    # NOTE(review): the attachment name embeds `dt`, which contains '/',
    # ' ' and ':' -- confirm mail clients handle that as intended.
    email.attach('OustandingBookings_{}.csv'.format(dt), csv_buffer.getvalue(), 'text/csv')
    email.send()
开发者ID:wilsonc86,项目名称:ledger,代码行数:33,代码来源:reports.py
示例20: test_file_upload_authed
def test_file_upload_authed(self):
    # type: () -> None
    """
    A call to /json/upload_file should return a uri and actually create an
    entry in the database. This entry will be marked unclaimed till a message
    refers it.
    """
    # NOTE(review): the "[email protected]" literals look like an
    # e-mail-obfuscation placeholder rather than a real address -- confirm
    # against the upstream test before relying on them.
    self.login("[email protected]")
    fp = StringIO("zulip!")
    fp.name = "zulip.txt"

    result = self.client_post("/json/upload_file", {'file': fp})
    self.assert_json_success(result)
    # Named `payload` so the local does not shadow the `json` module name.
    payload = ujson.loads(result.content)
    self.assertIn("uri", payload)
    uri = payload["uri"]
    base = '/user_uploads/'
    # assertEqual: the assertEquals alias is deprecated and was removed in
    # Python 3.12.
    self.assertEqual(base, uri[:len(base)])

    # In the future, local file requests will follow the same style as S3
    # requests; they will be first authenticated and redirected
    response = self.client_get(uri)
    data = b"".join(response.streaming_content)
    self.assertEqual(b"zulip!", data)

    # check if DB has attachment marked as unclaimed
    entry = Attachment.objects.get(file_name='zulip.txt')
    self.assertEqual(entry.is_claimed(), False)

    # Send a message that links to the uploaded file and check that the
    # rendered message carries the file name as the link title.
    self.subscribe_to_stream("[email protected]", "Denmark")
    body = "First message ...[zulip.txt](http://localhost:9991" + uri + ")"
    self.send_message("[email protected]", "Denmark", Recipient.STREAM, body, "test")
    self.assertIn('title="zulip.txt"', self.get_last_message().rendered_content)
开发者ID:tobby2002,项目名称:zulip,代码行数:33,代码来源:test_upload.py
注：本文中的six.moves.StringIO类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论