• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python moves.StringIO类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中six.moves.StringIO的典型用法代码示例。如果您正苦于以下问题:Python StringIO类的具体用法?Python StringIO怎么用?Python StringIO使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了StringIO类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_rest_endpoint

    def test_rest_endpoint(self):
        # type: () -> None
        """
        Tests the /api/v1/user_uploads api endpoint. Here a single file is uploaded
        and downloaded using a username and api_key
        """
        fp = StringIO("zulip!")
        fp.name = "zulip.txt"

        # Upload file via API
        auth_headers = self.api_auth(self.example_email("hamlet"))
        result = self.client_post('/api/v1/user_uploads', {'file': fp}, **auth_headers)
        self.assertIn("uri", result.json())
        uri = result.json()['uri']
        base = '/user_uploads/'
        self.assertEqual(base, uri[:len(base)])

        # Download file via API
        self.logout()
        response = self.client_get(uri, **auth_headers)
        data = b"".join(response.streaming_content)
        self.assertEqual(b"zulip!", data)

        # Files uploaded through the API should be accesible via the web client
        self.login(self.example_email("hamlet"))
        self.assert_url_serves_contents_of_file(uri, b"zulip!")
开发者ID:brockwhittaker,项目名称:zulip,代码行数:26,代码来源:test_upload.py


示例2: do_check_on

        def do_check_on(value, nd, var=None):
            """
            Checks `value` for NaNs / Infs. If detected, raises an exception
            and / or prints information about `nd`, `f`, and `is_input` to
            help the user determine the cause of the invalid values.

            Parameters
            ----------
            value : numpy.ndarray
                The value to be checked.
            nd : theano.gof.Apply
                The Apply node being executed.
            var : theano.gof.Variable
                Not used if nd is there. Otherwise, used to print the stack
                trace for inputs of the graph.

            """
            error = False
            sio = StringIO()
            if nan_is_error:
                if contains_nan(value, nd, var):
                    print('NaN detected', file=sio)
                    error = True
            if inf_is_error:
                if contains_inf(value, nd, var):
                    print('Inf detected', file=sio)
                    error = True
            if big_is_error:
                err = False
                if not _is_numeric_value(value, var):
                    err = False
                elif pygpu_available and isinstance(value, GpuArray):
                    err = (f_gpua_absmax(value.reshape(value.size)) > 1e10)
                else:
                    err = (np.abs(value).max() > 1e10)
                if err:
                    print('Big value detected', file=sio)
                    error = True
            if error:
                if nd:
                    print("NanGuardMode found an error in the "
                          "output of a node in this variable:", file=sio)
                    print(theano.printing.debugprint(nd, file='str'), file=sio)
                else:
                    print("NanGuardMode found an error in an input of the "
                          "graph.", file=sio)
                # Add the stack trace
                if nd:
                    var = nd.outputs[0]
                print(theano.gof.utils.get_variable_trace_string(var),
                      file=sio)
                msg = sio.getvalue()
                if config.NanGuardMode.action == 'raise':
                    raise AssertionError(msg)
                elif config.NanGuardMode.action == 'pdb':
                    print(msg)
                    import pdb
                    pdb.set_trace()
                elif config.NanGuardMode.action == 'warn':
                    logger.error(msg)
开发者ID:DEVESHTARASIA,项目名称:Theano,代码行数:60,代码来源:nanguardmode.py


示例3: test_pydotprint_cond_highlight

def test_pydotprint_cond_highlight():
    """
    This is a REALLY PARTIAL TEST.

    I did them to help debug stuff.
    """

    # Skip test if pydot is not available.
    if not theano.printing.pydot_imported:
        raise SkipTest('pydot not available')

    x = tensor.dvector()
    f = theano.function([x], x * 2)
    f([1, 2, 3, 4])

    s = StringIO()
    new_handler = logging.StreamHandler(s)
    new_handler.setLevel(logging.DEBUG)
    orig_handler = theano.logging_default_handler

    theano.theano_logger.removeHandler(orig_handler)
    theano.theano_logger.addHandler(new_handler)
    try:
        theano.printing.pydotprint(f, cond_highlight=True,
                                   print_output_file=False)
    finally:
        theano.theano_logger.addHandler(orig_handler)
        theano.theano_logger.removeHandler(new_handler)

    assert (s.getvalue() == 'pydotprint: cond_highlight is set but there'
            ' is no IfElse node in the graph\n')
开发者ID:ALISCIFP,项目名称:Segmentation,代码行数:31,代码来源:test_printing.py


示例4: test_rest_endpoint

    def test_rest_endpoint(self):
        # type: () -> None
        """
        Tests the /api/v1/user_uploads api endpoint. Here a single file is uploaded
        and downloaded using a username and api_key
        """
        fp = StringIO("zulip!")
        fp.name = "zulip.txt"

        # Upload file via API
        auth_headers = self.api_auth('[email protected]')
        result = self.client_post('/api/v1/user_uploads', {'file': fp}, **auth_headers)
        json = ujson.loads(result.content)
        self.assertIn("uri", json)
        uri = json["uri"]
        base = '/user_uploads/'
        self.assertEquals(base, uri[:len(base)])

        # Download file via API
        self.client_post('/accounts/logout/')
        response = self.client_get(uri, **auth_headers)
        data = b"".join(response.streaming_content)
        self.assertEquals(b"zulip!", data)

        # Files uploaded through the API should be accesible via the web client
        self.login("[email protected]")
        response = self.client_get(uri)
        data = b"".join(response.streaming_content)
        self.assertEquals(b"zulip!", data)
开发者ID:tobby2002,项目名称:zulip,代码行数:29,代码来源:test_upload.py


示例5: main

def main(loops, level):
    board, solution = LEVELS[level]
    order = DESCENDING
    strategy = Done.FIRST_STRATEGY
    stream = StringIO()

    board = board.strip()
    expected = solution.rstrip()

    range_it = xrange(loops)
    t0 = perf.perf_counter()

    for _ in range_it:
        stream = StringIO()
        solve_file(board, strategy, order, stream)
        output = stream.getvalue()
        stream = None

    dt = perf.perf_counter() - t0

    output = '\n'.join(line.rstrip() for line in output.splitlines())
    if output != expected:
        raise AssertionError("got a wrong answer:\n%s\nexpected: %s"
                             % (output, expected))

    return dt
开发者ID:Yaspee,项目名称:performance,代码行数:26,代码来源:bm_hexiom.py


示例6: test_record_bad

def test_record_bad():
    """
    Tests that when we record a sequence of events, then
    do something different on playback, the Record class catches it.
    """

    # Record a sequence of events
    output = StringIO()

    recorder = Record(file_object=output, replay=False)

    num_lines = 10

    for i in xrange(num_lines):
        recorder.handle_line(str(i) + '\n')

    # Make sure that the playback functionality doesn't raise any errors
    # when we repeat some of them
    output_value = output.getvalue()
    output = StringIO(output_value)

    playback_checker = Record(file_object=output, replay=True)

    for i in xrange(num_lines // 2):
        playback_checker.handle_line(str(i) + '\n')

    # Make sure it raises an error when we deviate from the recorded sequence
    try:
        playback_checker.handle_line('0\n')
    except MismatchError:
        return
    raise AssertionError("Failed to detect mismatch between recorded sequence "
                         " and repetition of it.")
开发者ID:12190143,项目名称:Theano,代码行数:33,代码来源:test_record.py


示例7: test_stderr_to_StringIO

def test_stderr_to_StringIO():
    s = StringIO()

    with stderr_to(s):
        sys.stderr.write(u("hello"))

    assert s.getvalue() == 'hello'
开发者ID:msabramo,项目名称:redirectory,代码行数:7,代码来源:test_redirectory.py


示例8: UpdateAppsAndBackendsTest

class UpdateAppsAndBackendsTest(TestCase):

    def setUp(self):
        self.output = StringIO()
        # BASE_APPS are needed for the managment commands to load successfully
        self.BASE_APPS = [
            'rapidsms',
            'django.contrib.auth',
            'django.contrib.contenttypes',
        ]

    def test_no_apps_then_none_added(self):
        with self.settings(INSTALLED_APPS=self.BASE_APPS):
            call_command('update_apps', stdout=self.output)
        self.assertEqual(self.output.getvalue(), '')

    def test_adds_app(self):
        # Add an app that has a RapidSMS app
        APPS = self.BASE_APPS + ['rapidsms.contrib.handlers']
        with self.settings(INSTALLED_APPS=APPS):
            call_command('update_apps', stdout=self.output)
        self.assertEqual(self.output.getvalue(), 'Added persistent app rapidsms.contrib.handlers\n')

    def test_no_backends_then_none_added(self):
        with self.settings(INSTALLED_BACKENDS={}):
            call_command('update_backends', stdout=self.output)
        self.assertEqual(self.output.getvalue(), '')

    def test_adds_backend(self):
        INSTALLED_BACKENDS = {
            "message_tester": {"ENGINE": "rapidsms.backends.database.DatabaseBackend"},
        }
        with self.settings(INSTALLED_BACKENDS=INSTALLED_BACKENDS):
            call_command('update_backends', stdout=self.output)
        self.assertEqual(self.output.getvalue(), 'Added persistent backend message_tester\n')
开发者ID:0xD3ADB33F,项目名称:rapidsms,代码行数:35,代码来源:test_management.py


示例9: test_main_help

def test_main_help(monkeypatch):
    # Patch stdout
    fakestdout = StringIO()
    monkeypatch.setattr(sys, "stdout", fakestdout)

    pytest.raises(SystemExit, main.main, ['--help'])
    assert fakestdout.getvalue().lstrip().startswith("Usage: ")
开发者ID:duecredit,项目名称:duecredit,代码行数:7,代码来源:test_cmdline.py


示例10: _find_snippet_imports

def _find_snippet_imports(module_data, module_path, strip_comments):
    """
    Given the source of the module, convert it to a Jinja2 template to insert
    module code and return whether it's a new or old style module.
    """

    module_style = "old"
    if REPLACER in module_data:
        module_style = "new"
    elif REPLACER_WINDOWS in module_data:
        module_style = "new"
    elif "from ansible.module_utils." in module_data:
        module_style = "new"
    elif "WANT_JSON" in module_data:
        module_style = "non_native_want_json"

    output = StringIO()
    lines = module_data.split("\n")
    snippet_names = []

    for line in lines:

        if REPLACER in line:
            output.write(_slurp(os.path.join(_SNIPPET_PATH, "basic.py")))
            snippet_names.append("basic")
        if REPLACER_WINDOWS in line:
            ps_data = _slurp(os.path.join(_SNIPPET_PATH, "powershell.ps1"))
            output.write(ps_data)
            snippet_names.append("powershell")
        elif line.startswith("from ansible.module_utils."):
            tokens = line.split(".")
            import_error = False
            if len(tokens) != 3:
                import_error = True
            if " import *" not in line:
                import_error = True
            if import_error:
                raise AnsibleError(
                    "error importing module in %s, expecting format like 'from ansible.module_utils.basic import *'"
                    % module_path
                )
            snippet_name = tokens[2].split()[0]
            snippet_names.append(snippet_name)
            output.write(_slurp(os.path.join(_SNIPPET_PATH, snippet_name + ".py")))
        else:
            if strip_comments and line.startswith("#") or line == "":
                pass
            output.write(line)
            output.write("\n")

    if not module_path.endswith(".ps1"):
        # Unixy modules
        if len(snippet_names) > 0 and not "basic" in snippet_names:
            raise AnsibleError("missing required import in %s: from ansible.module_utils.basic import *" % module_path)
    else:
        # Windows modules
        if len(snippet_names) > 0 and not "powershell" in snippet_names:
            raise AnsibleError("missing required import in %s: # POWERSHELL_COMMON" % module_path)

    return (output.getvalue(), module_style)
开发者ID:thebeefcake,项目名称:masterless,代码行数:60,代码来源:module_common.py


示例11: run

    def run(self, ctx):
        argv = ctx.command_argv
        p = ctx.options_context.parser
        o, a =  p.parse_args(argv)
        if o.help:
            p.print_help()
            return

        pkg = ctx.pkg
        format = o.format

        archive_root = "%s-%s" % (pkg.name, pkg.version)
        if not o.output_file:
            archive_name = archive_basename(pkg) + _FORMATS[format]["ext"]
        else:
            output = op.basename(o.output_file)
            if output != o.output_file:
                raise bento.errors.BentoError("Invalid output file: should not contain any directory")
            archive_name = output

        s = StringIO()
        write_pkg_info(ctx.pkg, s)
        n = ctx.build_node.make_node("PKG_INFO")
        n.parent.mkdir()
        n.write(s.getvalue())
        ctx.register_source_node(n, "PKG_INFO")

        # XXX: find a better way to pass archive name from other commands (used
        # by distcheck ATM)
        self.archive_root, self.archive_node = create_archive(archive_name, archive_root, ctx._node_pkg,
                ctx.top_node, ctx.run_node, o.format, o.output_dir)
开发者ID:B-Rich,项目名称:Bento,代码行数:31,代码来源:sdist.py


示例12: log_mem_usage

def log_mem_usage(signum, frame, fname=None):
    global _count
    _count += 1
    gc.collect()
    if not fname:
        fname = filename + '_memory_%02d.log' % _count
    with open(fname, 'wb') as f:
        f.write('gc.garbage: %d\n\n' % len(gc.garbage))
        objgraph.show_most_common_types(limit=50, file=f)
        f.write('\n\n')
        buf = StringIO()
        objgraph.show_growth(limit=50, file=buf)
        buf = buf.getvalue()
        f.write(buf)
    if _count < 2:
        return
    for tn, l in enumerate(buf.splitlines()[:10]):
        l = l.strip()
        if not l:
            continue
        type_ = l.split()[0]
        objects = objgraph.by_type(type_)
        objects = random.sample(objects, min(50, len(objects)))
        objgraph.show_chain(
            objgraph.find_backref_chain(
                objects[0],
                objgraph.is_proper_module),
            filename=fname[:-4] + '_type_%02d_backref.png' % tn
        )
        objgraph.show_backrefs(
            objects,
            max_depth=5,
            extra_info=lambda x: hex(id(x)),
            filename=fname[:-4] + '_type_%02d_backrefs.png' % tn,
        )
开发者ID:RDFLib,项目名称:graph-pattern-learner,代码行数:35,代码来源:memory_usage.py


示例13: render

    def render(self):
        """This is the tricky part, whith the rendered_content create a CSV"""


        if not self._is_rendered:

            # File pointer needed to create the CSV in memory
            buffer = StringIO()
            writer = UnicodeWriter(buffer)

            for row in self.rows:
                writer.writerow([six.text_type(value) for value
                                 in row])

            # Get the value of the StringIO buffer and write it to the response.
            csv = buffer.getvalue()
            buffer.close()
            self.write(csv)

            # Sets the appropriate CSV headers.
            self['Content-Disposition'] = 'attachment; filename=%s' % (
                self.filename, )

            # The CSV has been generated
            self._is_rendered = True

            for post_callback in self._post_render_callbacks:
                post_callback(self)
        return self
开发者ID:placiana,项目名称:django-enhanced-cbv,代码行数:29,代码来源:response.py


示例14: create_hook_module

def create_hook_module(target):
    safe_name = SAFE_MODULE_NAME.sub("_", target, len(target))
    module_name = "bento_hook_%s" % safe_name
    main_file = os.path.abspath(target)
    module = imp.new_module(module_name)
    module.__file__ = main_file
    code = open(main_file).read()

    sys.path.insert(0, os.path.dirname(main_file))
    try:
        exec(compile(code, main_file, 'exec'), module.__dict__)
        sys.modules[module_name] = module
    except Exception:
        sys.path.pop(0)
        e = extract_exception()
        tb = sys.exc_info()[2]
        s = StringIO()
        traceback.print_tb(tb, file=s)
        msg = """\
Could not import hook file %r: caught exception %r
Original traceback (most recent call last)
%s\
""" % (main_file, e, s.getvalue())
        raise InvalidHook(msg)

    module.root_path = main_file
    return module
开发者ID:Web5design,项目名称:Bento,代码行数:27,代码来源:hooks.py


示例15: test_contradictory_date_entries_warn

 def test_contradictory_date_entries_warn(self):
     """4.8.5.3 Emit warning on contradictory date entries."""
     stream = StringIO(
         wrap_document_text(construct_document_from(**{
             "Author": {
                 "ForeName": "John",
                 "LastName": "Smith"
             },
             "DateCompleted": {
                 "Year": "2011",
                 "Month": "01",
                 "Day": "01"
             },
             "DateRevised": {
                 "Year": "2010",
                 "Month": "01",
                 "Day": "01"
             },
         }))
     )
     stderr = StringIO()
     self.patch(sys, "stderr", stderr)
     result = parsexml.parse_element_tree(
         parsexml.file_to_element_tree(stream)
     )
     stderr.seek(0)
     stderr_out = stderr.read()
     self.assertThat(result["pubDate"], Is(None))
     self.assertThat(result["reviseDate"], Is(None))
     self.assertThat(stderr_out,
                     Contains("is greater than"))
开发者ID:DanielMcFall,项目名称:pubmed-retraction-analysis,代码行数:31,代码来源:test_parsexml.py


示例16: test_grid_01

    def test_grid_01(self):
        nid = 1
        cp = 2
        cd = 0
        ps = ''
        seid = 0
        card_count = {'GRID': 1,}

        model = BDF()
        model.allocate(card_count)
        data1 = BDFCard(['GRID', nid, cp, 0., 0., 0., cd, ps, seid])

        nodes = model.grid
        nodes.add(data1)

        #print n1
        f = StringIO()
        nodes.write_bdf(f, size=8, write_header=False)
        nodes.write_bdf(f, size=16, write_header=False)
        nodes.write_bdf(f, size=16, is_double=True, write_header=False)

        # small field
        f = StringIO()
        nodes.write_bdf(f, size=8, write_header=False)
        msg = f.getvalue()
        card = 'GRID           1       2      0.      0.      0.\n'
        self.assertCardEqual(msg, card)

        # large field
        f = StringIO()
        nodes.write_bdf(f, size=16, write_header=False)
        card = ('GRID*                  1               2              0.              0.\n'
                '*                     0.\n')
        msg = f.getvalue()
        self.assertCardEqual(msg, card)
开发者ID:umvarma,项目名称:pynastran,代码行数:35,代码来源:test_nodes.py


示例17: show

        def show(self, lswitch=None):
            params = [lswitch] if lswitch else []
            stdout = StringIO()
            self.run("show", args=params, stdout=stdout)
            output = stdout.getvalue()

            return get_lswitch_info(output)
开发者ID:l8huang,项目名称:ovn-scale-test,代码行数:7,代码来源:ovsclients_impl.py


示例18: test_record_good

def test_record_good():
    """
    Tests that when we record a sequence of events, then
    repeat it exactly, the Record class:
        1) Records it correctly
        2) Does not raise any errors
    """

    # Record a sequence of events
    output = StringIO()

    recorder = Record(file_object=output, replay=False)

    num_lines = 10

    for i in xrange(num_lines):
        recorder.handle_line(str(i) + '\n')

    # Make sure they were recorded correctly
    output_value = output.getvalue()

    assert output_value == ''.join(str(i) + '\n' for i in xrange(num_lines))

    # Make sure that the playback functionality doesn't raise any errors
    # when we repeat them
    output = StringIO(output_value)

    playback_checker = Record(file_object=output, replay=True)

    for i in xrange(num_lines):
        playback_checker.handle_line(str(i) + '\n')
开发者ID:12190143,项目名称:Theano,代码行数:31,代码来源:test_record.py


示例19: outstanding_bookings

def outstanding_bookings():
    try:
        outstanding = []
        today = datetime.date.today()
        for b in Booking.objects.filter(is_canceled=False,departure__gte=today).exclude(booking_type__in=['1','3']):
            if not b.paid:
                outstanding.append(b)


        strIO = StringIO()
        fieldnames = ['Confirmation Number','Customer','Campground','Arrival','Departure','Outstanding Amount']
        writer = csv.writer(strIO)
        writer.writerow(fieldnames)
        for o in outstanding:
            fullname = '{} {}'.format(o.details.get('first_name'),o.details.get('last_name'))
            writer.writerow([o.confirmation_number,fullname,o.campground.name,o.arrival.strftime('%d/%m/%Y'),o.departure.strftime('%d/%m/%Y'),o.outstanding])
        strIO.flush()
        strIO.seek(0)
        _file = strIO

        dt = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
        recipients = []
        recipients = OutstandingBookingRecipient.objects.all()
        email = EmailMessage(
            'Unpaid Bookings Summary as at {}'.format(dt),
            'Unpaid Bookings as at {}'.format(dt),
            settings.DEFAULT_FROM_EMAIL,
            to=[r.email for r in recipients]if recipients else [settings.NOTIFICATION_EMAIL]
        )
        email.attach('OustandingBookings_{}.csv'.format(dt), _file.getvalue(), 'text/csv')
        email.send()
    except:
        raise
开发者ID:wilsonc86,项目名称:ledger,代码行数:33,代码来源:reports.py


示例20: test_file_upload_authed

    def test_file_upload_authed(self):
        # type: () -> None
        """
        A call to /json/upload_file should return a uri and actually create an
        entry in the database. This entry will be marked unclaimed till a message
        refers it.
        """
        self.login("[email protected]")
        fp = StringIO("zulip!")
        fp.name = "zulip.txt"

        result = self.client_post("/json/upload_file", {'file': fp})
        self.assert_json_success(result)
        json = ujson.loads(result.content)
        self.assertIn("uri", json)
        uri = json["uri"]
        base = '/user_uploads/'
        self.assertEquals(base, uri[:len(base)])

        # In the future, local file requests will follow the same style as S3
        # requests; they will be first authenthicated and redirected
        response = self.client_get(uri)
        data = b"".join(response.streaming_content)
        self.assertEquals(b"zulip!", data)

        # check if DB has attachment marked as unclaimed
        entry = Attachment.objects.get(file_name='zulip.txt')
        self.assertEquals(entry.is_claimed(), False)

        self.subscribe_to_stream("[email protected]", "Denmark")
        body = "First message ...[zulip.txt](http://localhost:9991" + uri + ")"
        self.send_message("[email protected]", "Denmark", Recipient.STREAM, body, "test")
        self.assertIn('title="zulip.txt"', self.get_last_message().rendered_content)
开发者ID:tobby2002,项目名称:zulip,代码行数:33,代码来源:test_upload.py



注:本文中的six.moves.StringIO类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python moves.cStringIO类代码示例发布时间:2022-05-27
下一篇:
Python moves.zip_longest函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap