• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python tempfile.NamedTemporaryFile类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tempfile.NamedTemporaryFile的典型用法代码示例。如果您正苦于以下问题:Python NamedTemporaryFile类的具体用法?Python NamedTemporaryFile怎么用?Python NamedTemporaryFile使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了NamedTemporaryFile类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: AtomicFileWriter

class AtomicFileWriter(object):
    def __init__(self, path, tmp_prefix=None, encoding='utf-8'):
        self.name = path
        output_dir, base = os.path.split(path)
        if tmp_prefix is None:
            tmp_prefix = base + '.'

        self.tmpf = NamedTemporaryFile(dir=output_dir, prefix=tmp_prefix,
                                       mode='w', encoding=encoding,
                                       delete=False)

    def __enter__(self):
        self.tmpf.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        tmp_name = self.tmpf.name
        result = self.tmpf.__exit__(exc_type, exc_value, exc_traceback)
        if result or exc_type is None:
            os.rename(tmp_name, self.name)
        else:
            os.unlink(tmp_name)
        return result

    def write(self, data):
        return self.tmpf.write(data)
开发者ID:simpkins,项目名称:avrpp,代码行数:26,代码来源:usb_config.py


示例2: apicalls

def apicalls(target, **kwargs):
    """
    """
    if not target:
        raise Exception("Invalid target for apicalls()")

    output_file = NamedTemporaryFile()
    kwargs.update({"output_file" : output_file})
    cmd = _dtrace_command_line(target, **kwargs)

    # Generate dtrace probes for analysis
    definitions = os.path.abspath(os.path.join(__file__, "../../core/data/signatures.yml"))
    probes_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "probes.d")
    generate_probes(definitions, probes_file, overwrite=True)

    # The dtrace script will take care of timeout itself, so we just launch
    # it asynchronously
    with open(os.devnull, "w") as null:
        _ = Popen(cmd, stdout=null, stderr=null, cwd=current_directory())

    with open('/Users/cloudmark/yield.txt', 'w+') as f:
        for entry in filelines(output_file):
            value = entry.strip()
            if "## apicalls.d done ##" in value:
                break
            if len(value) == 0:
             continue
            f.write(str(_parse_entry(value)))
            f.flush()
            import time
            time.sleep(1)
            yield _parse_entry(value)
    output_file.close()
    os.remove(probes_file)
开发者ID:thebiglai,项目名称:cuckoo-osx-analyzer,代码行数:34,代码来源:apicalls.py


示例3: run

	def run(self):
		active_view = self.window.active_view()
		text = "\n\n".join(getSelectedText(active_view)).strip()

		tf = NamedTemporaryFile(mode="w", delete=False)
		try:
			tf.write(text)
			tf.close()

			res = subprocess.check_output(["m4", tf.name],
			                              stderr=subprocess.STDOUT,
			                              cwd=os.path.dirname(os.path.abspath(active_view.file_name())))
			res = res.decode('utf-8').replace('\r', '').strip()

			panel_name = "m4expand.results"
			panel = self.window.create_output_panel(panel_name)
			self.window.run_command("show_panel", {"panel": "output." + panel_name})

			panel.set_read_only(False)
			panel.set_syntax_file(active_view.settings().get("syntax"))
			panel.run_command("append", {"characters": res})
			panel.set_read_only(True)
		except Exception as e:
			print("M4Expand - An error occurred: ", e)
		finally:
			os.unlink(tf.name)
开发者ID:alextarrell,项目名称:M4Expand,代码行数:26,代码来源:M4Expand.py


示例4: tmpfile

def tmpfile(stream, mode=None):
    """Context manager that writes a :class:`Stream` object to a named
    temporary file and yield it's filename. Cleanup deletes from the temporary
    file from disk.

    Args:
        stream (Stream): Stream object to write to disk as temporary file.
        mode (int, optional): File mode to set on temporary file.

    Returns:
        str: Temporoary file name
    """
    tmp = NamedTemporaryFile(delete=False)

    if mode is not None:
        oldmask = os.umask(0)

        try:
            os.chmod(tmp.name, mode)
        finally:
            os.umask(oldmask)

    for data in stream:
        tmp.write(to_bytes(data))

    tmp.close()

    yield tmp.name

    os.remove(tmp.name)
开发者ID:kanchanarp,项目名称:hashfs,代码行数:30,代码来源:hashfs.py


示例5: testLogfile

 def testLogfile(self):
     """Test logging into a logfile"""
     f = NamedTemporaryFile(delete=False)
     filename = f.name
     try:
         set_log_level("error")  # avoid using the console logger
         f.write(":-P\n")
         f.close()
         start_logfile(f.name, "devinfo")
         log = getLogger("prosoda.test.integration.test_logger")
         log.debug("Should not be in logfile! :-( ")
         log.info("Should be in logfile :-) ")
         log.devinfo("Should be in logfile :-) ")
         log.warning("Should really be in logfile :-D ")
         stop_logfile(f.name)
         contents = file(f.name).read()
         self.assertNotIn(":-(", contents)
         self.assertNotIn(":-P", contents)
         self.assertIn(":-)", contents)
         self.assertIn(":-D", contents)
         # Make sure no colour codes are leaked into the logfile
         self.assertNotIn("\033", contents)
     finally:
         set_log_level("debug")
         unlink(filename)
开发者ID:wolfgangmauerer,项目名称:prosoda,代码行数:25,代码来源:test_logger.py


示例6: __enter__

    def __enter__(self):
        # Ensure that we have not re-entered
        if self.temp_path != None or self.service != None:
            raise Exception('Cannot use multiple nested with blocks on same Youtube object!')

        flow = flow_from_clientsecrets(
            self.client_secrets_path,
            scope=YOUTUBE_UPLOAD_SCOPE,
            message=MISSING_CLIENT_SECRETS_MESSAGE)

        temp_file = NamedTemporaryFile(delete=False)
        self.temp_path = temp_file.name
        temp_file.close()

        storage = Storage(self.temp_path)
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run_flow(
                flow, storage, argparser.parse_args(list())
            )

        self.service = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION,
            http=credentials.authorize(httplib2.Http()))

        return self
开发者ID:northWind87,项目名称:workout_video_uploader,代码行数:26,代码来源:youtube.py


示例7: run_solver

    def run_solver(self, conflicts, election, deletion_handler, outfile=None):
        if not conflicts:
            return [], 0

        self.deletion_handler = deletion_handler

        instance = self.generate_instance(conflicts, election)

        f = NamedTemporaryFile(delete=False)
        f.write(instance.encode(code))
        f.close()

        process = Popen([self.cmd, f.name], stdout=PIPE)
        out, err = process.communicate()

        conflict_variables, optimum = self.parse_instance(out)

        if outfile:
            candidates = election[0]
            votes = election[1]
            votecounts = election[2]

            votemap = self.delete_votes(votes, votecounts, conflict_variables)
            votesum = sum(votemap.values())

            write_map(candidates, votesum, votemap, open(outfile, "w"))

        remove(f.name)
        return conflict_variables, optimum
开发者ID:markusscherer,项目名称:pai-project,代码行数:29,代码来源:check_configurations.py


示例8: test_toy_corpus

def test_toy_corpus():

    keats = ('She dwells with Beauty - Beauty that must die;\n\n'
             'And Joy, whose hand is ever at his lips\n\n' 
             'Bidding adieu; and aching Pleasure nigh,\n\n'
             'Turning to poison while the bee-mouth sips:\n\n'
             'Ay, in the very temple of Delight\n\n'
             'Veil\'d Melancholy has her sovran shrine,\n\n'
             'Though seen of none save him whose strenuous tongue\n\n'
             'Can burst Joy\'s grape against his palate fine;\n\n'
             'His soul shall taste the sadness of her might,\n\n'
             'And be among her cloudy trophies hung.')

    assert toy_corpus(keats)
    assert toy_corpus(keats, nltk_stop=True)
    assert toy_corpus(keats, stop_freq=1)
    assert toy_corpus(keats, add_stop=['and', 'with'])
    assert toy_corpus(keats, nltk_stop=True,
                      stop_freq=1, add_stop=['ay'])

    import os
    from tempfile import NamedTemporaryFile as NFT

    tmp = NFT(delete=False)
    tmp.write(keats)
    tmp.close()

    c = toy_corpus(tmp.name, is_filename=True, 
                   nltk_stop=True, add_stop=['ay'])
    
    assert c
    os.remove(tmp.name)

    return c
开发者ID:argdundee,项目名称:DbyD,代码行数:34,代码来源:corpusbuilders.py


示例9: command_update

def command_update(args):
    def write_to(out):
        config.output(out)
    library = KeyLibrary(args.key_directory)
    with open(args.config_file) as fd:
        config = SedgeEngine(library, fd, not args.no_verify, url=args.config_file)
    if args.output_file == '-':
        write_to(ConfigOutput(sys.stdout))
        return
    if not check_or_confirm_overwrite(args.output_file):
        print("Aborting.", file=sys.stderr)
        sys.exit(1)

    tmpf = NamedTemporaryFile(mode='w', dir=os.path.dirname(args.output_file), delete=False)
    try:
        tmpf.file.write('''\
# :sedge:
#
# this configuration generated from `sedge' file:
# %s
#
# do not edit this file manually, edit the source file and re-run `sedge'
#

''' % (args.config_file))
        write_to(ConfigOutput(tmpf.file))
        tmpf.close()
        if args.verbose:
            diff_config_changes(args.output_file, tmpf.name)
        os.rename(tmpf.name, args.output_file)
    except:
        os.unlink(tmpf.name)
        raise
开发者ID:dmeulen,项目名称:sedge,代码行数:33,代码来源:cli.py


示例10: execute

    def execute(self, context):
        hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
        logging.info("Extracting data from Hive")
        logging.info(self.sql)

        if self.bulk_load:
            tmpfile = NamedTemporaryFile()
            hive.to_csv(self.sql, tmpfile.name, delimiter='\t',
                lineterminator='\n', output_header=False)
        else:
            results = hive.get_records(self.sql)

        mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        if self.mysql_preoperator:
            logging.info("Running MySQL preoperator")
            mysql.run(self.mysql_preoperator)

        logging.info("Inserting rows into MySQL")

        if self.bulk_load:
            mysql.bulk_load(table=self.mysql_table, tmp_file=tmpfile.name)
            tmpfile.close()
        else:
            mysql.insert_rows(table=self.mysql_table, rows=results)

        if self.mysql_postoperator:
            logging.info("Running MySQL postoperator")
            mysql.run(self.mysql_postoperator)

        logging.info("Done.")
开发者ID:AndreiDev,项目名称:incubator-airflow,代码行数:30,代码来源:hive_to_mysql.py


示例11: write_temp_file

def write_temp_file(data):
    # create a temp file for use as a config file. This should get cleaned
    # up magically at the end of the run.
    fid = NamedTemporaryFile(mode='w+b', suffix='.tmp')
    fid.write(data)
    fid.seek(0)
    return fid
开发者ID:belonesox,项目名称:python-rhsm,代码行数:7,代码来源:config-tests.py


示例12: _build_and_catch_errors

    def _build_and_catch_errors(self, build_func, options_bytes, source=None):
        try:
            return build_func()
        except _cl.RuntimeError as e:
            msg = e.what
            if options_bytes:
                msg = msg + "\n(options: %s)" % options_bytes.decode("utf-8")

            if source is not None:
                from tempfile import NamedTemporaryFile
                srcfile = NamedTemporaryFile(mode="wt", delete=False, suffix=".cl")
                try:
                    srcfile.write(source)
                finally:
                    srcfile.close()

                msg = msg + "\n(source saved as %s)" % srcfile.name

            code = e.code
            routine = e.routine

            err = _cl.RuntimeError(
                    _cl.Error._ErrorRecord(
                        msg=msg,
                        code=code,
                        routine=routine))

        # Python 3.2 outputs the whole list of currently active exceptions
        # This serves to remove one (redundant) level from that nesting.
        raise err
开发者ID:hrfuller,项目名称:pyopencl,代码行数:30,代码来源:__init__.py


示例13: roundtrip

 def roundtrip(self, dtype, x, suffix):
     f = NamedTemporaryFile(suffix='.' + suffix)
     fname = f.name
     f.close()
     sio.imsave(fname, x)
     y = sio.imread(fname)
     assert_array_equal(y, x)
开发者ID:AtonLerin,项目名称:maya_python_packages,代码行数:7,代码来源:test_freeimage.py


示例14: _write_local_schema_file

    def _write_local_schema_file(self, cursor):
        """
        Takes a cursor, and writes the BigQuery schema for the results to a
        local file system.

        :return: A dictionary where key is a filename to be used as an object
            name in GCS, and values are file handles to local files that
            contains the BigQuery schema fields in .json format.
        """
        schema = []
        for field in cursor.description:
            # See PEP 249 for details about the description tuple.
            field_name = field[0]
            field_type = self.type_map(field[1])
            # Always allow TIMESTAMP to be nullable. MySQLdb returns None types
            # for required fields because some MySQL timestamps can't be
            # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
            field_mode = 'NULLABLE' if field[6] or field_type == 'TIMESTAMP' else 'REQUIRED'
            schema.append({
                'name': field_name,
                'type': field_type,
                'mode': field_mode,
            })

        self.log.info('Using schema for %s: %s', self.schema_filename, schema)
        tmp_schema_file_handle = NamedTemporaryFile(delete=True)
        s = json.dumps(schema, tmp_schema_file_handle)
        if PY3:
            s = s.encode('utf-8')
        tmp_schema_file_handle.write(s)
        return {self.schema_filename: tmp_schema_file_handle}
开发者ID:7digital,项目名称:incubator-airflow,代码行数:31,代码来源:mysql_to_gcs.py


示例15: run

    def run(self, uid, aid, publish=True):
        aid = int(aid)
        audiobook = Audiobook.objects.get(id=aid)
        self.set_status(aid, status.ENCODING)

        user = User.objects.get(id=uid)

        try:
            os.makedirs(BUILD_PATH)
        except OSError as e:
            if e.errno == errno.EEXIST:
                pass
            else:
                raise

        out_file = NamedTemporaryFile(delete=False, prefix='%d-' % aid, suffix='.%s' % self.ext, dir=BUILD_PATH)
        out_file.close()
        self.encode(audiobook.source_file.path, out_file.name)
        self.set_status(aid, status.TAGGING)
        self.set_tags(audiobook, out_file.name)
        self.set_status(aid, status.SENDING)

        if publish:
            self.put(user, audiobook, out_file.name)
            self.published(aid)
        else:
            self.set_status(aid, None)

        self.save(audiobook, out_file.name)
开发者ID:fnp,项目名称:audio,代码行数:29,代码来源:tasks.py


示例16: reg_code

def reg_code():
    img, code =generate_code_image((80,30),5)
    session["code"] = code
    tp = NamedTemporaryFile()
    img.save(tp.name,format="png")
    tp.seek(0)
    return send_file(tp.name,mimetype='image/png')
开发者ID:ipconfiger,项目名称:OpenStore,代码行数:7,代码来源:index.py


示例17: _write_local_schema_file

    def _write_local_schema_file(self, cursor):
        """
        Takes a cursor, and writes the BigQuery schema for the results to a
        local file system.

        :return: A dictionary where key is a filename to be used as an object
            name in GCS, and values are file handles to local files that
            contains the BigQuery schema fields in .json format.
        """
        schema = []
        for field in cursor.description:
            # See PEP 249 for details about the description tuple.
            field_name = field[0]
            field_type = self.type_map(field[1])
            field_mode = 'REPEATED' if field[1] in (1009, 1005, 1007,
                                                    1016) else 'NULLABLE'
            schema.append({
                'name': field_name,
                'type': field_type,
                'mode': field_mode,
            })

        self.log.info('Using schema for %s: %s', self.schema_filename, schema)
        tmp_schema_file_handle = NamedTemporaryFile(delete=True)
        s = json.dumps(schema, sort_keys=True)
        if PY3:
            s = s.encode('utf-8')
        tmp_schema_file_handle.write(s)
        return {self.schema_filename: tmp_schema_file_handle}
开发者ID:astronomerio,项目名称:incubator-airflow,代码行数:29,代码来源:postgres_to_gcs_operator.py


示例18: test_seq_pipeline_run

    def test_seq_pipeline_run():
        'It tests that the pipeline runs ok'
        pipeline = 'sanger_with_qual'

        fhand_adaptors = NamedTemporaryFile()
        fhand_adaptors.write(ADAPTORS)
        fhand_adaptors.flush()
        arabidopsis_genes = 'arabidopsis_genes+'
        univec = os.path.join(TEST_DATA_DIR, 'blast', arabidopsis_genes)
        configuration = {'remove_vectors_blastdb': {'vectors': univec},
                         'remove_adaptors': {'adaptors': fhand_adaptors.name}}

        in_fhands = {}
        in_fhands['in_seq'] = open(os.path.join(TEST_DATA_DIR, 'seq.fasta'),
                                   'r')
        in_fhands['in_qual'] = open(os.path.join(TEST_DATA_DIR, 'qual.fasta'),
                                    'r')

        out_seq_fhand = NamedTemporaryFile()
        out_qual_fhand = NamedTemporaryFile()
        writer = SequenceWriter(out_seq_fhand, qual_fhand=out_qual_fhand,
                                file_format='fasta')
        seq_pipeline_runner(pipeline, configuration, in_fhands,
                            writers={'seq': writer})
        result_seq = open(out_seq_fhand.name).read()
        assert result_seq.count('>') == 6

        #are we keeping the description?
        assert 'mdust' in result_seq
开发者ID:BioinformaticsArchive,项目名称:franklin,代码行数:29,代码来源:pipeline_test.py


示例19: test_cmyk

def test_cmyk():
    ref = imread(os.path.join(data_dir, 'color.png'))

    img = Image.open(os.path.join(data_dir, 'color.png'))
    img = img.convert('CMYK')

    f = NamedTemporaryFile(suffix='.jpg')
    fname = f.name
    f.close()
    img.save(fname)
    try:
        img.close()
    except AttributeError:  # `close` not available on PIL
        pass

    new = imread(fname)

    ref_lab = rgb2lab(ref)
    new_lab = rgb2lab(new)

    for i in range(3):
        newi = np.ascontiguousarray(new_lab[:, :, i])
        refi = np.ascontiguousarray(ref_lab[:, :, i])
        sim = ssim(refi, newi, dynamic_range=refi.max() - refi.min())
        assert sim > 0.99
开发者ID:JDWarner,项目名称:scikit-image,代码行数:25,代码来源:test_pil.py


示例20: test_seq_pipeline_parallel_run_with_fasta_qual

    def test_seq_pipeline_parallel_run_with_fasta_qual(self):
        'The pipeline runs in parallel with fasta and qual'
        pipeline = 'sanger_with_qual'

        fhand_adaptors = NamedTemporaryFile()
        fhand_adaptors.write(ADAPTORS)
        fhand_adaptors.flush()
        arabidopsis_genes = 'arabidopsis_genes+'
        univec = os.path.join(TEST_DATA_DIR, 'blast', arabidopsis_genes)
        configuration = {'remove_vectors': {'vectors': univec},
                         'remove_adaptors': {'adaptors': fhand_adaptors.name}}

        seq1 = create_random_seqwithquality(500, qual_range=50)
        seq2 = create_random_seqwithquality(500, qual_range=51)
        seq3 = create_random_seqwithquality(500, qual_range=52)
        seqs = [seq1, seq2, seq3]
        inseq_fhand, inqual_fhand = create_temp_seq_file(seqs, format='qual')

        in_fhands = {}
        in_fhands['in_seq'] = open(inseq_fhand.name)
        in_fhands['in_qual'] = open(inqual_fhand.name)

        outseq_fhand = NamedTemporaryFile()
        outqual_fhand = NamedTemporaryFile()
        writer = SequenceWriter(outseq_fhand, qual_fhand=outqual_fhand,
                                file_format='fasta')
        writers = {'seq': writer}

        seq_pipeline_runner(pipeline, configuration, in_fhands,
                            processes=4, writers=writers)
        out_fhand = open(outseq_fhand.name, 'r')

        result_seq = out_fhand.read()
        assert result_seq.count('>') == 3
开发者ID:BioinformaticsArchive,项目名称:franklin,代码行数:34,代码来源:pipeline_test.py



注:本文中的tempfile.NamedTemporaryFile类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python tempfile.SpooledTemporaryFile类代码示例发布时间:2022-05-27
下一篇:
Python tempfile.tempfile函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap