• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python tempfile.SpooledTemporaryFile类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tempfile.SpooledTemporaryFile的典型用法代码示例。如果您正苦于以下问题:Python SpooledTemporaryFile类的具体用法?Python SpooledTemporaryFile怎么用?Python SpooledTemporaryFile使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了SpooledTemporaryFile类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: string2spool

def string2spool(input_string):
    """Takes a string as an argument and returns an open file handle with the
    contents of the string"""
    file_object=SpooledTemporaryFile()
    file_object.write(input_string)
    file_object.seek(0)
    return file_object
开发者ID:dmf24,项目名称:pyscripting,代码行数:7,代码来源:files.py


示例2: gen_data

def gen_data(location=None, **kwargs):
    """Fetches realtime data and generates records"""
    url = '%s/%s' % (kwargs['BASE_URL'], location)
    r = requests.get(url)
    f = SpooledTemporaryFile()  # wrap to access `fileno`
    f.write(r.content)
    return io.read_xls(r., sanitize=True, encoding=r.encoding)
开发者ID:reubano,项目名称:cookiecutter-collector,代码行数:7,代码来源:utils.py


示例3: __init__

    def __init__(self, data=None, fp=None, length=-1):
        assert bool(data is not None) ^ bool(fp)

        if length == -1:
            if data is not None:
                length = len(data)
            else:
                length = get_size(fp)  # can be -1

        # We allow writer reuse, but if we're working with a stream, we cannot
        # seek. Copy the data to a tempfile.
        if fp and not can_seek(fp):
            newfp = SpooledTemporaryFile(MAX_INMEMORY_SIZE)
            sendfile(newfp, fp)
            length = newfp.tell()
            newfp.seek(0)
            fp = newfp

        self.data = data
        self.fp = fp
        self.fpreads = 0  # keep track of fp usage
        self.length = length

        assert length >= 0
        self.use_tempfile = length > MAX_INMEMORY_SIZE
开发者ID:rmoorman,项目名称:pstore,代码行数:25,代码来源:crypt.py


示例4: fetch_data

def fetch_data(config):
    """Fetches realtime data and generates records"""
    ckan = CKAN(config['ENDPOINT'], apikey=config['API_KEY'])
    # r = ckan.fetch_resource(config['RID'])  # if using ckanutils
    resource = ckan.action.resource_show(id=config['RID'])
    url = resource.get('perma_link') or resource.get('url')
    r = requests.get(url, stream=True)

    if any('403' in h.headers.get('x-ckan-error', '') for h in r.history):
        raise NotAuthorized(
            'Access to fetch resource %s was denied.' % config['RID'])

    try:
        ext = splitext(url)[1].split('.')[1]
    except IndexError:
        ext = cv.ctype2ext(r.headers['Content-Type'])

    if ext == 'csv':
        records = io.read_csv(r.raw, sanitize=True, encoding=r.encoding)
    elif ext in {'xls', 'xlsx'}:
        r = requests.get(url)
        f = SpooledTemporaryFile()
        f.write(r.content)
        records = io.read_xls(f, sanitize=True, encoding=r.encoding)
    else:
        msg = 'Filetype `%s` unsupported.'
        msg += 'Please view tabutils.io documentation for assistance.'
        raise TypeError(msg)

    constraints = [('adm0_name', 'a'), ('mp_month', '3'), ('mp_year', '2015')]

    filterer = lambda x: all(x[k].lower().startswith(v) for k, v in constraints)
    return it.ifilter(filterer, records)
开发者ID:nelsonkimaiga,项目名称:hdx-file-proxy,代码行数:33,代码来源:utils.py


示例5: send

    def send(self, request, stream=None, timeout=None, verify=None, cert=None, proxies=None):
        pathname = url_to_path(request.url)

        resp = Response()
        resp.status_code = 200
        resp.url = request.url

        try:
            stats = lstat(pathname)
        except (IOError, OSError) as exc:
            resp.status_code = 404
            message = {
                "error": "file does not exist",
                "path": pathname,
                "exception": repr(exc),
            }
            fh = SpooledTemporaryFile()
            fh.write(ensure_binary(json.dumps(message)))
            fh.seek(0)
            resp.raw = fh
            resp.close = resp.raw.close
        else:
            modified = formatdate(stats.st_mtime, usegmt=True)
            content_type = guess_type(pathname)[0] or "text/plain"
            resp.headers = CaseInsensitiveDict({
                "Content-Type": content_type,
                "Content-Length": stats.st_size,
                "Last-Modified": modified,
            })

            resp.raw = open(pathname, "rb")
            resp.close = resp.raw.close
        return resp
开发者ID:ESSS,项目名称:conda,代码行数:33,代码来源:localfs.py


示例6: create_dump

 def create_dump(self):
     if not self.connection.is_usable():
         self.connection.connect()
     dump_file = SpooledTemporaryFile(max_size=10 * 1024 * 1024)
     self._write_dump(dump_file)
     dump_file.seek(0)
     return dump_file
开发者ID:Fisiu,项目名称:django-dbbackup,代码行数:7,代码来源:sqlite.py


示例7: push_index

 def push_index(self):
   stream = SpooledTemporaryFile(max_size=20 * MB)
   pointers = 0
   stream.write(struct.pack(OFFSET_FMT, pointers))
   self.indexes.append([
     stream, pointers, self.block_size - self.pointer_size
   ])
开发者ID:aparup,项目名称:common_crawl_index,代码行数:7,代码来源:pbtree.py


示例8: close

    def close(self):
        """Send the change to the DFS, and close the file."""

        self.flush()

        if 'c' not in self.mode:
            SpooledTemporaryFile.close(self)
开发者ID:Alexis-D,项目名称:DFS,代码行数:7,代码来源:client.py


示例9: run

    def run(self, opts):
        from lzma.xz import compress

        self.h = sha1()
        tdir = mkdtemp("calibre-mathjax-build")
        try:
            src = opts.path_to_mathjax or self.download_mathjax_release(tdir, opts.mathjax_url)
            self.info("Compressing MathJax...")
            t = SpooledTemporaryFile()
            with ZipFile(t, "w", ZIP_STORED) as zf:
                self.add_file(zf, self.j(src, "unpacked", "MathJax.js"), "MathJax.js")
                self.add_tree(
                    zf,
                    self.j(src, "fonts", "HTML-CSS", self.FONT_FAMILY, "woff"),
                    "fonts/HTML-CSS/%s/woff" % self.FONT_FAMILY,
                )
                for d in "extensions jax/element jax/input jax/output/CommonHTML".split():
                    self.add_tree(zf, self.j(src, "unpacked", *d.split("/")), d)

                zf.comment = self.h.hexdigest()
            t.seek(0)
            with open(self.j(self.RESOURCES, "content-server", "mathjax.zip.xz"), "wb") as f:
                compress(t, f, level=9)
            with open(self.j(self.RESOURCES, "content-server", "mathjax.version"), "wb") as f:
                f.write(zf.comment)
        finally:
            shutil.rmtree(tdir)
开发者ID:timpalpant,项目名称:calibre,代码行数:27,代码来源:mathjax.py


示例10: GoogleCloudFile

class GoogleCloudFile(File):
    def __init__(self, name, mode, storage):
        self.name = name
        self.mime_type = mimetypes.guess_type(name)[0]
        self._mode = mode
        self._storage = storage
        # NOTE(mattrobenolt): This is the same change in behavior as in
        # the s3 backend. We're opting now to load the file
        # or metadata at this step. This means we won't actually
        # know a file doesn't exist until we try to read it.
        self.blob = FancyBlob(storage.download_url, self.name, storage.bucket)
        self._file = None
        self._is_dirty = False

    @property
    def size(self):
        return self.blob.size

    def _get_file(self):
        if self._file is None:
            with metrics.timer('filestore.read', instance='gcs'):
                self._file = SpooledTemporaryFile(
                    max_size=self._storage.max_memory_size,
                    suffix=".GSStorageFile",
                    dir=None,
                )
                if 'r' in self._mode:
                    self._is_dirty = False
                    self.blob.download_to_file(self._file)
                    self._file.seek(0)
        return self._file

    def _set_file(self, value):
        self._file = value

    file = property(_get_file, _set_file)

    def read(self, num_bytes=None):
        if 'r' not in self._mode:
            raise AttributeError("File was not opened in read mode.")

        if num_bytes is None:
            num_bytes = -1

        return super(GoogleCloudFile, self).read(num_bytes)

    def write(self, content):
        if 'w' not in self._mode:
            raise AttributeError("File was not opened in write mode.")
        self._is_dirty = True
        return super(GoogleCloudFile, self).write(force_bytes(content))

    def close(self):
        if self._file is not None:
            if self._is_dirty:
                self.file.seek(0)
                self.blob.upload_from_file(self.file, content_type=self.mime_type)
            self._file.close()
            self._file = None
开发者ID:alexandrul,项目名称:sentry,代码行数:59,代码来源:gcs.py


示例11: start

 def start(self, args):
     self.outFile = SpooledTemporaryFile()
     self.errFile = SpooledTemporaryFile()
     self.cmdline = list2cmdline(args)
     print 'starting: ' + self.cmdline
     self.process = Popen(args,
         stderr=self.errFile, stdout=self.outFile, universal_newlines=False)
     self.process_start = time()
开发者ID:GRay63,项目名称:TestAppGUI,代码行数:8,代码来源:startfinish.py


示例12: _open

    def _open(self, name, mode = 'rb') -> File:
        name = self._transform_name(name)
        content = self.service.get_blob_content(self.container, name)
        file = SpooledTemporaryFile()
        file.write(content)
        file.seek(0) # explicitly reset to allow reading from the beginning afterwards as-is

        return File(file)
开发者ID:Bunkerbewohner,项目名称:azurepython3,代码行数:8,代码来源:djangostorage.py


示例13: generate

    def generate(self):
        points = self.points()
        
        self.buffer = 2*self.pad
        count = np.zeros([x + 2*self.buffer for x in self.expanded_size])
        density = np.zeros([x + 2*self.buffer for x in self.expanded_size])
        
        # Render the B&W density version of the heatmap
        dot_size = self.dot.shape[0]
        for x, y, weight in points:
            x1 = x + self.buffer - (dot_size - 1)/2
            y1 = y + self.buffer - (dot_size - 1)/2
            count[y1:(y1 + dot_size), 
                x1:(x1 + dot_size)] += self.dot
            density[y1:(y1 + dot_size), 
                x1:(x1+ dot_size)] += self.dot*float(weight)

        # Pick the field to map
        if gheat_settings.GHEAT_MAP_MODE == gheat_settings.GHEAT_MAP_MODE_COUNT:
            img = count
            #opacity = np.zeros(img.shape()) + 255
        elif  gheat_settings.GHEAT_MAP_MODE == gheat_settings.GHEAT_MAP_MODE_SUM_DENSITY:
            img = density
            #opacity = np.clip(count, 0, gheat_settings.GHEAT_OPACITY_LIMIT)
        elif  gheat_settings.GHEAT_MAP_MODE == gheat_settings.GHEAT_MAP_MODE_MEAN_DENSITY:
            img = density
            img[count > 0] /= count[count > 0]
            #opacity = np.clip(count, 0, gheat_settings.GHEAT_OPACITY_LIMIT)
        else:
            raise ValueError, 'Unknown map mode'
            
        # Crop resulting density image (which could have grown) into the
        # actual canvas size we want
        img = img[(self.pad + self.buffer):(SIZE + self.pad + self.buffer), 
            (self.pad + self.buffer):(SIZE + self.pad + self.buffer)]
        #opacity = opacity[self.pad:(SIZE + self.pad), self.pad:(SIZE + self.pad)]


        # Maybe use a logarithm
        img = np.where(img>0, np.log(img)+1, img)

        # Convert to a 0 to 255 image
        img = np.clip(256.0*np.power(img/gheat_settings.GHEAT_MAX_VALUE, 
            gheat_settings.GHEAT_SCALING_COEFFICIENT), 0, 255.999).astype('uint8')
        

        # Given the B&W density image, generate a color heatmap based on
        # this Tile's color scheme.
        colour_image = np.zeros((SIZE, SIZE, 4), 'uint8') + 255
        for i in range(3):
            colour_image[:,:,i] = self.schemeobj.colors[:,i][255 - img]
        colour_image[:,:,3] = np.where(img > gheat_settings.GHEAT_MIN_DENSITY, 255, 0)

        tmpfile = SpooledTemporaryFile()
        writer = png.Writer(SIZE, SIZE, alpha=True, bitdepth=8)
        writer.write(tmpfile, np.reshape(colour_image, (SIZE, SIZE*4)))
        tmpfile.seek(0)
        return tmpfile
开发者ID:dragonfly-science,项目名称:django-gheat,代码行数:58,代码来源:numeric.py


示例14: test_run_command_stdin

 def test_run_command_stdin(self):
     connector = BaseCommandDBConnector()
     stdin = SpooledTemporaryFile()
     stdin.write(b'foo')
     stdin.seek(0)
     # Run
     stdout, stderr = connector.run_command('cat', stdin=stdin)
     self.assertEqual(stdout.read(), b'foo')
     self.assertFalse(stderr.read())
开发者ID:pombredanne,项目名称:django-dbbackup,代码行数:9,代码来源:test_connectors.py


示例15: file_from_content

def file_from_content(content):
    f = content
    if isinstance(content, cgi.FieldStorage):
        f = content.file
    elif isinstance(content, byte_string):
        f = SpooledTemporaryFile(INMEMORY_FILESIZE)
        f.write(content)
    f.seek(0)
    return f
开发者ID:dirn,项目名称:depot,代码行数:9,代码来源:utils.py


示例16: GoogleCloudFile

class GoogleCloudFile(File):
    def __init__(self, name, mode, storage):
        self.name = name
        self.mime_type = mimetypes.guess_type(name)[0]
        self._mode = mode
        self._storage = storage
        self.blob = storage.bucket.get_blob(name)
        if not self.blob and 'w' in mode:
            self.blob = Blob(self.name, storage.bucket)
        self._file = None
        self._is_dirty = False

    @property
    def size(self):
        return self.blob.size

    def _get_file(self):
        if self._file is None:
            self._file = SpooledTemporaryFile(
                max_size=self._storage.max_memory_size,
                suffix=".GSStorageFile",
                dir=setting("FILE_UPLOAD_TEMP_DIR", None)
            )
            if 'r' in self._mode:
                self._is_dirty = False
                self.blob.download_to_file(self._file)
                self._file.seek(0)
        return self._file

    def _set_file(self, value):
        self._file = value

    file = property(_get_file, _set_file)

    def read(self, num_bytes=None):
        if 'r' not in self._mode:
            raise AttributeError("File was not opened in read mode.")

        if num_bytes is None:
            num_bytes = -1

        return super(GoogleCloudFile, self).read(num_bytes)

    def write(self, content):
        if 'w' not in self._mode:
            raise AttributeError("File was not opened in write mode.")
        self._is_dirty = True
        return super(GoogleCloudFile, self).write(force_bytes(content))

    def close(self):
        if self._file is not None:
            if self._is_dirty:
                self.file.seek(0)
                self.blob.upload_from_file(self.file, content_type=self.mime_type)
            self._file.close()
            self._file = None
开发者ID:cogzidel,项目名称:healthchecks,代码行数:56,代码来源:gcloud.py


示例17: close

 def close(self):
     """On close, seek to 0 and write the data via the API, then close()
     for realz
     """
     logger.debug("close() called on WriteBuffer")
     self.seek(0)
     logger.debug("Attempting to create file at dir_path %s with name %s" %
                  (self.path, self.filename))
     self.fs.rc.fs.write_file(self, self.fullpath)
     SpooledTemporaryFile.close(self)  # old-style class!
开发者ID:Qumulo,项目名称:qftpd,代码行数:10,代码来源:qftpd.py


示例18: read_file_handle

 def read_file_handle(self, filename):
     """Get the file data, put it in a SpooledTemporaryFile object for return
     and reading
     """
     logger.debug("read_file_handle('%s')" % filename)
     read_buffer = SpooledTemporaryFile()
     response = self.rc.fs.read_file(read_buffer, filename)
     logger.debug(response)
     read_buffer.seek(0)
     return read_buffer
开发者ID:Qumulo,项目名称:qftpd,代码行数:10,代码来源:qftpd.py


示例19: filter_file

def filter_file(filter, filename, membuffer=10485760):
    tmp = SpooledTemporaryFile(max_size=membuffer)
    with open(filename) as input:
        for line in input:
            if filter(line):
                tmp.write(line)
    tmp.seek(0)
    with open(filename, "w") as output:
        for line in tmp:
            output.write(line)
开发者ID:StackOps,项目名称:stackops-agent,代码行数:10,代码来源:utils.py


示例20: run_command

    def run_command(self, command, stdin=None, env=None):
        """
        Launch a shell command line.

        :param command: Command line to launch
        :type command: str
        :param stdin: Standard input of command
        :type stdin: file
        :param env: Environment variable used in command
        :type env: dict
        :return: Standard output of command
        :rtype: file
        """
        cmd = shlex.split(command)
        stdout = SpooledTemporaryFile(max_size=settings.TMP_FILE_MAX_SIZE,
                                      dir=settings.TMP_DIR)
        stderr = SpooledTemporaryFile(max_size=settings.TMP_FILE_MAX_SIZE,
                                      dir=settings.TMP_DIR)
        full_env = self.env.copy()
        full_env.update(env or {})
        try:
            process = Popen(cmd, stdin=stdin, stdout=stdout, stderr=stderr,
                            env=full_env)
            process.wait()
            if process.poll():
                stderr.seek(0)
                raise exceptions.CommandConnectorError(
                    "Error running: {}\n{}".format(command, stderr.read()))
            stdout.seek(0)
            stderr.seek(0)
            return stdout, stderr
        except OSError as err:
            raise exceptions.CommandConnectorError(
                "Error running: {}\n{}".format(command, str(err)))
开发者ID:Ubiwhere,项目名称:django-dbbackup,代码行数:34,代码来源:base.py



注:本文中的tempfile.SpooledTemporaryFile类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python tempfile.TemporaryDirectory类代码示例发布时间:2022-05-27
下一篇:
Python tempfile.NamedTemporaryFile类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap