• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python tarfile.TarFile类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tarfile.TarFile的典型用法代码示例。如果您正苦于以下问题:Python TarFile类的具体用法?Python TarFile怎么用?Python TarFile使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了TarFile类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: install_repo

    def install_repo(self, repo):
        """Install a plugin repository and reload the dynamic plugins.

        :param repo: a key of ``KNOWN_PUBLIC_REPOS``, a git URL, or a URL
            ending in ``tar.gz``.
        :return: a one-element tuple holding an error message when git is not
            available or the clone fails; otherwise the result of
            ``self.update_dynamic_plugins()``.
        """
        if repo in KNOWN_PUBLIC_REPOS:
            repo = KNOWN_PUBLIC_REPOS[repo]['path']  # replace it by the url
        git_path = which('git')

        if not git_path:
            return ('git command not found: You need to have git installed on '
                    'your system to be able to install git based plugins.', )

        # TODO: Update download path of plugin.
        if repo.endswith('tar.gz'):
            response = urlopen(repo)
            try:
                # The payload is gzip-compressed and a urlopen() response is
                # generally not seekable, so read it as a compressed stream
                # ('r|gz') instead of handing it to the bare TarFile
                # constructor, which expects an uncompressed, seekable file.
                # NOTE(review): extractall() trusts the member paths of the
                # downloaded archive - confirm the sources are trusted.
                with TarFile.open(fileobj=response, mode='r|gz') as tar:
                    tar.extractall(path=self.plugin_dir)
            finally:
                response.close()
            s = repo.split(':')[-1].split('/')[-2:]
            human_name = '/'.join(s)
            # str.rstrip() removes a *set of characters*, so it would also eat
            # trailing letters of the real name (e.g. "data" -> "d").
            # Remove the literal suffix instead.
            if human_name.endswith('.tar.gz'):
                human_name = human_name[:-len('.tar.gz')]
        else:
            human_name = human_name_for_git_url(repo)
            p = subprocess.Popen([git_path, 'clone', repo, human_name], cwd=self.plugin_dir, stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            # communicate() drains both pipes together; reading stdout to EOF
            # before touching stderr can block forever once the stderr pipe
            # buffer fills up.
            raw_out, raw_err = p.communicate()
            feedback = raw_out.decode('utf-8')
            error_feedback = raw_err.decode('utf-8')
            if p.returncode:
                return "Could not load this plugin: \n\n%s\n\n---\n\n%s" % (feedback, error_feedback),
        self.add_plugin_repo(human_name, repo)
        return self.update_dynamic_plugins()
开发者ID:s905060,项目名称:errbot,代码行数:25,代码来源:plugin_manager.py


示例2: install_packages

    def install_packages(self):
        """Install every package that has an install source and report the results.

        Packages without an ``'install_from'`` entry are reported and skipped.
        Packages already known to the cache reuse the cached install
        locations; the others are extracted from their archive.

        :return: list of single-entry dicts mapping the package name to its
            ``version``, ``type`` (always ``'local'``) and ``install_locations``.
        """
        self._update_package_with_install_path()

        # Archive extension -> callable that opens the archive as a context manager.
        openers = {
            'zip': lambda source: ZipFile(source, 'r'),
            'tar.gz': lambda source: TarFile.open(source, 'r:gz'),
            'tar.bz2': lambda source: TarFile.open(source, 'r:bz2'),
            'gz': lambda source: gzip.open(source, 'r'),
        }

        results = []
        for pkg in self._packages:
            if 'install_from' not in pkg:
                print("[dem] Could not find package: {}, version: {}".format(pkg['name'], pkg['version']))
                continue

            if self._cache.is_package_installed(pkg['name'], pkg['version']):
                print('[dem] {}-{} already installed'.format(pkg['name'], pkg['version']))
                locations = self._cache.install_locations(pkg['name'])
            else:
                print('[dem] installing {}-{}'.format(pkg['name'], pkg['version']))
                opener = openers.get(pkg['install_from_ext'])
                # Unknown extensions are not extracted, exactly as before.
                if opener is not None:
                    with opener(pkg['install_from']) as archive:
                        locations = self._extract(archive, pkg)

                if 'pkg-config' in pkg:
                    PkgConfigProcessor.replace_prefix(locations, pkg['pkg-config'])

            results.append({
                pkg['name']: {
                    'version': pkg['version'],
                    'type': 'local',
                    'install_locations': locations,
                },
            })
        return results
开发者ID:nitehawck,项目名称:dem,代码行数:31,代码来源:archive.py


示例3: rebuild

def rebuild(filename, tag=None, format="gz"):
    """Rebuild the bundled ``zoneinfo*.tar.<format>`` file from a source tarball.

    Every member of ``filename`` except ``*.sh``, ``*.tab`` and
    ``leapseconds`` is extracted to a temporary directory and compiled with
    ``zic``. Existing ``zoneinfo*.tar.*`` files next to this module are
    removed and a new tarball with the compiled zones is written there.

    :param filename: path of the timezone source tarball.
    :param tag: optional tag; when given the target is ``zoneinfo-<tag>.tar.<format>``.
    :param format: compression suffix passed to ``TarFile.open`` (``"w:<format>"``).
    """
    import shutil
    import subprocess
    import tempfile
    tmpdir = tempfile.mkdtemp()
    zonedir = os.path.join(tmpdir, "zoneinfo")
    moduledir = os.path.dirname(__file__)
    # With no tag the suffix must be empty; formatting ``None`` directly
    # produced the file name "zoneinfoNone.tar.gz".
    suffix = "-" + tag if tag else ""
    targetname = "zoneinfo%s.tar.%s" % (suffix, format)
    try:
        tf = TarFile.open(filename)
        try:
            # The "backwards" zone file contains links to other files, so must
            # be processed as last
            for name in sorted(tf.getnames(),
                               key=lambda k: "z" if k == "backward" else k):
                if name.endswith((".sh", ".tab")) or name == "leapseconds":
                    continue
                tf.extract(name, tmpdir)
                filepath = os.path.join(tmpdir, name)
                # Argument list instead of a shell string: paths containing
                # spaces or shell metacharacters are passed through verbatim.
                subprocess.call(["zic", "-d", zonedir, filepath])
        finally:
            tf.close()
        target = os.path.join(moduledir, targetname)
        for entry in os.listdir(moduledir):
            if entry.startswith("zoneinfo") and ".tar." in entry:
                os.unlink(os.path.join(moduledir, entry))
        tf = TarFile.open(target, "w:%s" % format)
        try:
            for entry in os.listdir(zonedir):
                entrypath = os.path.join(zonedir, entry)
                tf.add(entrypath, entry)
        finally:
            tf.close()
    finally:
        shutil.rmtree(tmpdir)
开发者ID:ljxangus,项目名称:SNS,代码行数:31,代码来源:__init__.py


示例4: rebuild

def rebuild(filename, tag=None, format="gz", zonegroups=[], metadata=None):
    """Regenerate the packaged zoneinfo tarball from an IANA tz source tarball.

    The members named in ``zonegroups`` are extracted from ``filename`` into
    a scratch directory and compiled with ``zic``. ``metadata`` is dumped as
    JSON next to the compiled zones, and everything is packed into
    ``ZONEFILENAME`` beside this module. The scratch directory is always
    removed.

    ``filename`` is the timezone tarball from ``ftp.iana.org/tz``.
    """
    workdir = tempfile.mkdtemp()
    compiled_dir = os.path.join(workdir, "zoneinfo")
    package_dir = os.path.dirname(__file__)
    try:
        with TarFile.open(filename) as source:
            for group in zonegroups:
                source.extract(group, workdir)
            zic_command = ["zic", "-d", compiled_dir]
            zic_command += [os.path.join(workdir, group) for group in zonegroups]
            try:
                check_call(zic_command)
            except OSError as exc:
                _print_on_nosuchfile(exc)
                raise

        # Store the metadata alongside the compiled zone files.
        metadata_path = os.path.join(compiled_dir, METADATA_FN)
        with open(metadata_path, 'w') as meta_file:
            json.dump(metadata, meta_file, indent=4, sort_keys=True)

        bundle_path = os.path.join(package_dir, ZONEFILENAME)
        with TarFile.open(bundle_path, "w:%s" % format) as bundle:
            for item in os.listdir(compiled_dir):
                bundle.add(os.path.join(compiled_dir, item), item)
    finally:
        shutil.rmtree(workdir)
开发者ID:ArthurGarnier,项目名称:SickRage,代码行数:29,代码来源:rebuild.py


示例5: create_archive

 def create_archive(self):
     """Create a tar archive holding ``full.mp3`` and return the archive's path.

     The archive is written to a fresh temporary file inside
     ``self.temp_dir``.
     """
     (handle, path) = mkstemp(dir=self.temp_dir)
     # Only the path is needed; TarFile reopens the file itself.
     os.close(handle)
     # The context manager releases the file handle even when add() fails
     # (e.g. the resource file is missing).
     with TarFile(path, mode="w") as archive:
         archive.add(os.path.join(_common.RSRC, "full.mp3"), "full.mp3")
     return path
开发者ID:jerryh91,项目名称:beets,代码行数:7,代码来源:test_importer.py


示例6: test_tar_experiment_download

 def test_tar_experiment_download(self):
     """Download an experiment as a tar stream and check its size and members.

     The streamed response is spooled to a temporary file, its size is
     compared with the ``Content-Length`` header, and (outside Docker builds)
     each data file is extracted and its size compared with ``df.size``.
     """
     self.assertTrue(all(df.verified for df in self.dfs))
     response = self.client.get(reverse(
         'tardis.tardis_portal.download.streaming_download_experiment',
         args=(self.exp.id, 'tar')))
     # Binary mode: the chunks are raw tar data, which a text-mode handle
     # would reject or mangle. The local is also no longer called ``tarfile``
     # so it cannot shadow the standard-library module name.
     with NamedTemporaryFile('wb') as tmp_tar:
         for c in response.streaming_content:
             tmp_tar.write(c)
         tmp_tar.flush()
         self.assertEqual(int(response['Content-Length']),
                          os.stat(tmp_tar.name).st_size)
         if settings.EXP_SPACES_TO_UNDERSCORES:
             exp_title = self.exp.title.replace(' ', '_')
         else:
             exp_title = self.exp.title
         exp_title = quote(exp_title,
                           safe=settings.SAFE_FILESYSTEM_CHARACTERS)
         # Close the archive handle when the checks are done.
         with TarFile(tmp_tar.name) as tf:
             for df in self.dfs:
                 full_path = os.path.join(
                     exp_title,
                     quote(self.ds.description,
                           safe=settings.SAFE_FILESYSTEM_CHARACTERS),
                     df.directory, df.filename)
                 # docker has a file path limit of ~240 characters
                 if os.environ.get('DOCKER_BUILD', 'false') != 'true':
                     tf.extract(full_path, '/tmp')
                     self.assertEqual(
                         os.stat(os.path.join('/tmp', full_path)).st_size,
                         int(df.size))
开发者ID:keithschulze,项目名称:mytardis,代码行数:30,代码来源:test_tar_download.py


示例7: rebuild

def rebuild(filename, tag=None, format="gz"):
    """Rebuild the bundled ``zoneinfo*.tar.<format>`` file from a source tarball.

    Members of ``filename`` other than ``*.sh``, ``*.tab`` and
    ``leapseconds`` are extracted to a temporary directory and compiled with
    ``zic``. Old ``zoneinfo*.tar.*`` files in this module's directory are
    deleted before the new tarball is written.

    :param filename: path of the timezone source tarball.
    :param tag: optional tag; when given the target is ``zoneinfo-<tag>.tar.<format>``.
    :param format: compression suffix used as ``"w:<format>"`` for the output.
    """
    import shutil
    import subprocess
    import tempfile
    tmpdir = tempfile.mkdtemp()
    zonedir = os.path.join(tmpdir, "zoneinfo")
    moduledir = os.path.dirname(__file__)
    # An absent tag contributes nothing to the name; the previous code
    # formatted ``None`` into it ("zoneinfoNone.tar.gz").
    suffix = "-" + tag if tag else ""
    targetname = "zoneinfo%s.tar.%s" % (suffix, format)
    try:
        tf = TarFile.open(filename)
        try:
            for name in tf.getnames():
                if name.endswith((".sh", ".tab")) or name == "leapseconds":
                    continue
                tf.extract(name, tmpdir)
                filepath = os.path.join(tmpdir, name)
                # Run zic without a shell so unusual characters in the paths
                # cannot be interpreted as shell syntax.
                subprocess.call(["zic", "-d", zonedir, filepath])
        finally:
            tf.close()
        target = os.path.join(moduledir, targetname)
        for entry in os.listdir(moduledir):
            if entry.startswith("zoneinfo") and ".tar." in entry:
                os.unlink(os.path.join(moduledir, entry))
        tf = TarFile.open(target, "w:%s" % format)
        try:
            for entry in os.listdir(zonedir):
                entrypath = os.path.join(zonedir, entry)
                tf.add(entrypath, entry)
        finally:
            tf.close()
    finally:
        shutil.rmtree(tmpdir)
开发者ID:mcepar1,项目名称:Scheduler,代码行数:28,代码来源:__init__.py


示例8: install

    def install(self, mess, args):
        """ install a plugin repository from the given source or a known public repo (see !repos to find those).
        for example from a known repo : !install err-codebot
        for example a git url : git@github.com:gbin/plugin.git
        or an url towards a tar.gz archive : http://www.gootz.net/plugin-latest.tar.gz
        """
        # mess: the triggering message; status replies go to mess.getFrom()
        #       using mess.getType().
        # args: repository name, git URL or tar.gz URL as typed by the user.
        # Returns a short status/error string.
        if not args.strip():
            return "You should have an urls/git repo argument"
        if args in KNOWN_PUBLIC_REPOS:
            args = KNOWN_PUBLIC_REPOS[args][0] # replace it by the url
        git_path = which('git')

        if not git_path:
            return 'git command not found: You need to have git installed on your system to by able to install git based plugins.'

        if args.endswith('tar.gz'):
            response = urlopen(args)
            try:
                # The download is gzip-compressed and a urlopen() response is
                # generally not seekable: read it as a compressed stream
                # rather than through the bare TarFile constructor.
                # NOTE(review): extractall() trusts the archive's member
                # paths - confirm the sources are trusted.
                tar = TarFile.open(fileobj=response, mode='r|gz')
                try:
                    tar.extractall(path=PLUGIN_DIR)
                finally:
                    tar.close()
            finally:
                response.close()
            human_name = args.split('/')[-1][:-7]  # drop the 7-char ".tar.gz"
        else:
            human_name = human_name_for_git_url(args)
            p = subprocess.Popen([git_path, 'clone', args, human_name], cwd=PLUGIN_DIR, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # communicate() reads both pipes together; draining stdout first
            # can hang once git fills the stderr pipe buffer.
            feedback, error_feedback = p.communicate()
            if p.returncode:
                return "Could not load this plugin : \n%s\n---\n%s" % (feedback, error_feedback)
        self.add_plugin_repo(human_name, args)
        errors = self.update_dynamic_plugins()
        if errors:
            self.send(mess.getFrom(), 'Some plugins are generating errors:\n' + '\n'.join(errors) , message_type=mess.getType())
        else:
            self.send(mess.getFrom(), "A new plugin repository named %s has been installed correctly from %s. Refreshing the plugins commands..." % (human_name, args), message_type=mess.getType())
        self.activate_non_started_plugins()
        return "Plugin reload done."
开发者ID:glenbot,项目名称:err,代码行数:34,代码来源:errBot.py


示例9: _createScriptExtensionTarArchive

 def _createScriptExtensionTarArchive(self, sourceDirectory, scriptExtensionName):
     """ Packs the Python modules of a script extension into a TAR archive.

     For the "lib" and "src" sub directories of C{sourceDirectory} every
     "*.py" file and every file named "SCRIPTS" found below a package
     directory is added, named relative to the "lib"/"src" directory.
     The archive is written to "<scriptExtensionName>.tar" in the
     configured distribution directory.
     """

     tarFilePath = os.path.join(self.__buildConfiguration.distDirectory,
                                scriptExtensionName + ".tar")
     tarFile = TarFile(tarFilePath, "w")

     for inputDirectory in ["lib", "src"]:
         baseDirectory = os.path.join(sourceDirectory, inputDirectory)
         if not os.path.exists(baseDirectory):
             continue
         prefixLength = len(baseDirectory) + 1 # base directory plus path separator
         for packageDirName in os.listdir(baseDirectory):
             packageDirectory = os.path.join(baseDirectory, packageDirName)
             if not os.path.exists(packageDirectory):
                 continue
             # Collect all matching files first, then add them to the archive
             modulePaths = [os.path.join(directoryPath, fileName)
                            for directoryPath, _, fileNames in os.walk(packageDirectory)
                            for fileName in fileNames
                            if fileName.endswith(".py") or fileName == "SCRIPTS"]
             for modulePath in modulePaths:
                 nameStart = modulePath.find(baseDirectory) + prefixLength
                 tarFile.add(modulePath, modulePath[nameStart:])
     tarFile.close()
     if self.verbose:
         print("Created tar archive '%s'." % tarFilePath)
开发者ID:DLR-SC,项目名称:DataFinder,代码行数:29,代码来源:package_script_extension.py


示例10: main

def main(argv):
    """Command-line entry point for the tar database.

    Usage: ``prog [-d] [-b basedir] cmd [arg ...]`` where ``cmd`` is one of
    ``create``, ``import``, ``add``, ``get`` or ``getinfo``.

    :param argv: full argument vector; ``argv[0]`` is the program name.
    :return: 0 on success, 100 after printing the usage message.
    """
    import getopt
    def usage():
        print('usage: %s [-b basedir] cmd [arg ...]' % argv[0])
        return 100
    try:
        (opts, args) = getopt.getopt(argv[1:], 'db:')
    except getopt.GetoptError:
        return usage()
    debug = 0  # counted from -d flags; not read anywhere in this function
    basedir = 'tar'
    for (k, v) in opts:
        if k == '-d': debug += 1
        elif k == '-b': basedir = v
    tardb = TarDB(basedir)
    if not args: return usage()
    cmd = args.pop(0)
    if cmd == 'create':
        tardb.create()
    elif cmd == 'import':
        tardb.open()
        for path in args:
            tar = TarFile(path)
            try:
                while True:
                    info = tar.next()
                    if info is None: break
                    fp = tar.fileobj
                    # ``offset_data`` is where the member's payload begins.
                    # ``offset + BLOCKSIZE`` is only right for single-block
                    # headers and reads header bytes as data for members
                    # with extended (GNU long name / pax) headers.
                    fp.seek(info.offset_data)
                    data = fp.read(info.size)
                    tardb.add_record(info, data)
            finally:
                # Release the archive handle before moving to the next file.
                tar.close()
            tardb.flush()
        tardb.close()
    elif cmd == 'add':
        tardb.open()
        for path in args:
            name = os.path.basename(path)
            info = TarInfo(name)
            with open(path, 'rb') as fp:
                data = fp.read()
            recno = tardb.add_record(info, data)
            print(recno)
        tardb.close()
    elif cmd == 'get':
        tardb.open()
        for recno in args:
            recno = int(recno)
            (_, data) = tardb.get_recinfo(recno, True)
            sys.stdout.buffer.write(data)
        tardb.close()
    elif cmd == 'getinfo':
        tardb.open()
        for recno in args:
            recno = int(recno)
            (info, _) = tardb.get_recinfo(recno, False)
            print(info)
        tardb.close()
    else:
        return usage()
    return 0
开发者ID:euske,项目名称:verbose-giggle,代码行数:59,代码来源:tardb.py


示例11: move_certs

    def move_certs(self, paths):
        """Stage the internal ssl cert files into a docker volume.

        The files in ``paths`` are packed into an in-memory tar archive which
        is unpacked under ``/certs`` of a helper container that has the certs
        volume mounted; the helper container is removed afterwards.

        :param paths: mapping of key -> path of a cert file to stage.
        :return: mapping of the same keys -> ``/certs/<file name>``.
        """
        self.log.info("Staging internal ssl certs for %s", self._log_name)
        yield self.pull_image(self.move_certs_image)

        # Creating the volume succeeds even when it already exists.
        volume_name = self.format_volume_name(self.certs_volume_name, self)
        self.log.info("Creating ssl volume %s for %s", volume_name, self._log_name)
        yield self.docker('create_volume', volume_name)

        # docker.put_archive takes a tarfile and a running container and
        # unpacks the archive into the container, so build that tarfile here.
        staged_paths = {}
        payload = BytesIO()
        tar = TarFile(fileobj=payload, mode='w')
        for cert_key, source_path in paths.items():
            base_name = os.path.basename(source_path)
            staged_paths[cert_key] = '/certs/' + base_name
            with open(source_path, 'rb') as source:
                raw = source.read()
            entry = TarInfo(name=base_name)
            entry.size = len(raw)
            entry.mtime = os.stat(source_path).st_mtime
            entry.mode = 0o644
            tar.addfile(entry, BytesIO(raw))
        tar.close()
        payload.seek(0)

        # The staging container mounts the volume at /certs/.
        volume_binds = {volume_name: {"bind": "/certs", "mode": "rw"}}
        host_config = self.client.create_host_config(binds=volume_binds)
        container = yield self.docker(
            'create_container',
            self.move_certs_image,
            volumes=["/certs"],
            host_config=host_config,
        )

        container_id = container['Id']
        self.log.debug(
            "Container %s is creating ssl certs for %s",
            container_id[:12], self._log_name,
        )
        yield self.docker('start', container_id)
        try:
            # Unpack the archive into the running container.
            yield self.docker(
                'put_archive',
                container=container_id,
                path='/certs',
                data=payload,
            )
        finally:
            yield self.docker('remove_container', container_id)
        return staged_paths
开发者ID:jupyterhub,项目名称:dockerspawner,代码行数:59,代码来源:dockerspawner.py


示例12: read_file_from_image

def read_file_from_image(img: tarfile.TarFile,
                         file_path: str,
                         autoclose=False) -> bytes:
    """Return the raw content of the member ``file_path`` of the tar ``img``.

    When ``autoclose`` is true the extracted file object is closed after
    reading; otherwise it is left open.
    """
    member_file = img.extractfile(file_path)
    if not autoclose:
        return member_file.read()
    with closing(member_file) as fd:
        return fd.read()
开发者ID:yege0201,项目名称:dockerscan,代码行数:8,代码来源:docker_api.py


示例13: generate_tar

 def generate_tar(entries):
     """Build an in-memory tar archive from a mapping of path -> bytes.

     :param entries: dict mapping member names to their byte contents.
     :return: a new ``BytesIO`` positioned at the start of the archive data.
     """
     tar_buf = BytesIO()
     # Leaving the ``with`` block closes the archive, which writes the
     # end-of-archive blocks; without it the data is a truncated tar stream.
     # ``tar_buf`` itself stays open because it was passed in as ``fileobj``.
     with TarFile(mode="w", fileobj=tar_buf) as tar_file:
         for path, contents in entries.items():
             tar_info = TarInfo(name=path)
             tar_info.size = len(contents)
             tar_file.addfile(tar_info, fileobj=BytesIO(contents))
     return BytesIO(tar_buf.getvalue())
开发者ID:thepwagner,项目名称:flotilla,代码行数:8,代码来源:test_revision.py


示例14: parse_backup_label

 def parse_backup_label(self, basebackup_path):
     """Return the starting WAL segment named in a basebackup's ``backup_label``.

     :param basebackup_path: path of an uncompressed tar file containing a
         ``backup_label`` member.
     :return: the sixth space-separated field of the ``START WAL LOCATION``
         line with the trailing ``)`` removed, decoded as UTF-8.

     If no line starts with ``START WAL LOCATION`` the name is never bound
     and an ``UnboundLocalError`` is raised.
     """
     # Close the archive as soon as the label has been read instead of
     # leaking the open file handle.
     with TarFile(basebackup_path) as tar:
         content = tar.extractfile("backup_label").read()  # pylint: disable=no-member
     for line in content.split(b"\n"):
         if line.startswith(b"START WAL LOCATION"):
             start_wal_segment = line.split(b" ")[5].strip(b")").decode("utf8")
     self.log.debug("Found: %r as starting wal segment", start_wal_segment)
     return start_wal_segment
开发者ID:Ormod,项目名称:pghoard,代码行数:8,代码来源:basebackup.py


示例15: reader

 def reader(self):
     """Yield an in-memory tarball holding every child of ``self.path``.

     Each child is added recursively under its base name; the yielded
     buffer is rewound to the start.
     """
     buffer = BytesIO()
     archive = TarFile(mode="w", fileobj=buffer)
     for entry in self.path.children():
         archive.add(entry.path, recursive=True, arcname=entry.basename())
     archive.close()
     buffer.seek(0)
     yield buffer
开发者ID:networkelements,项目名称:flocker,代码行数:9,代码来源:memory.py


示例16: download

    def download(self):
        """
        Perform a download.

        Collects the files of the requested directory into a tar archive that
        is written to a named temporary file. When the POST field
        "simulation" is set, progress is echoed as HTML instead.

        NOTE(review): the visible snippet ends inside the file loop; what
        happens after all files were added is not shown here.
        """
        self.init2() # set up the base class

        simulation = self.request.POST.get("simulation", False)

        self._setup_path()
        if simulation:
            self.request.echo("<h1>Download Simulation!</h1><pre>")
            self.request.echo("request path: %s\n" % self.request_path)
            log_typ = "download simulation start"
        else:
            log_typ = "download start"

        self.db.log(log_typ, self.context['request_path'])

        # Artist and album become the leading path components inside the archive.
        artist = self.request.POST.get("artist", "")
        album = self.request.POST.get("album", "")

        files, _ = self._read_dir()

        # Temp file name carries the remote user; the directory is only
        # overridden when a "temp" setting is configured.
        args = {"prefix": "PyDown_%s_" % self.request.environ["REMOTE_USER"]}
        if self.request.cfg["temp"]:
            args["dir"] = self.request.cfg["temp"]
        temp = NamedTemporaryFile(**args)

        # Uncompressed tar written straight into the temporary file.
        tar = TarFile(mode="w", fileobj=temp)

        if simulation:
            self.request.write("-"*80)
            self.request.write("\n")

        for file_info in files:
            # The first element of each entry is used as the file name.
            filename = file_info[0]
            abs_path = posixpath.join(self.request_path, filename)
            arcname = posixpath.join(artist, album, filename)

            if simulation:
                #~ self.request.write("absolute path..: %s\n" % abs_path)
                self.request.write("<strong>%s</strong>\n" % arcname)

            try:
                tar.add(abs_path, arcname)
            except IOError, e:
                # Report the failure to the client, then release the archive
                # and the temp file on a best-effort basis and abort.
                self.request.write("<h1>Error</h1><h2>Can't create archive: %s</h2>" % e)
                try:
                    tar.close()
                except:
                    pass
                try:
                    temp.close()
                except:
                    pass
                return
开发者ID:Aaron1011,项目名称:python-code-snippets,代码行数:56,代码来源:PyDown.py


示例17: TarFileWrapper

class TarFileWrapper(ArchiveFileWrapper):
    """Archive wrapper that reads from a tar archive file object."""

    def __init__(self, fh, *args, **kwargs):
        # The tar archive is opened on ``fh`` before the base class is
        # initialised with the remaining arguments.
        self.archive = TarFile(fileobj=fh)
        super(TarFileWrapper, self).__init__(*args, **kwargs)

    def names(self):
        """Return the names of all archive members."""
        return self.archive.getnames()

    def extract_file(self, *args, **kwargs):
        """Forward to ``TarFile.extractfile`` and return its result."""
        return self.archive.extractfile(*args, **kwargs)
开发者ID:fcurella,项目名称:cookiejar,代码行数:10,代码来源:extractor.py


示例18: download

    def download(self, src, dest, extract_here=False):
        """Copy ``src`` out of the container and unpack it into ``dest``.

        The tar stream returned by the client is spooled to a temporary file
        and extracted. With ``extract_here`` the leading
        ``<basename of src>/`` prefix is cut from every member name first.
        """
        docker = connect()

        with SpooledTemporaryFile() as spool:
            payload = docker.copy(self.container_id, src).read()
            spool.write(payload)
            spool.seek(0)
            archive = TarFile(fileobj=spool)
            if extract_here:
                # Length of the base name plus the following path separator.
                prefix_length = len(os.path.basename(src)) + 1
                for entry in archive.getmembers():
                    entry.name = entry.name[prefix_length:]
            archive.extractall(path=dest)
开发者ID:pombredanne,项目名称:meuh-python,代码行数:12,代码来源:bot.py


示例19: extract_package

def extract_package(package_name, path, logger):
    """Extract a TAR or ZIP package into ``path``.

    The archive type is chosen by looking for ``".tar"`` or ``".zip"`` in
    ``package_name``. Any failure (including an unsupported type) is
    registered via ``register_exception`` and logged on ``logger``; nothing
    is raised to the caller.

    :param package_name: path of the archive file.
    :param path: destination directory.
    :param logger: logger used to report extraction errors.
    """
    try:
        # NOTE(review): extractall() trusts the archive's member paths -
        # confirm packages come from a trusted source.
        if ".tar" in package_name:
            # Context managers close the archive handle even when the
            # extraction fails part-way.
            with TarFile.open(package_name) as archive:
                archive.extractall(path)
        elif ".zip" in package_name:
            with ZipFile(package_name) as archive:
                archive.extractall(path)
        else:
            raise FileTypeError("It's not a TAR or ZIP archive.")
    except Exception as err:
        register_exception(alert_admin=True,
                           prefix="Elsevier error extracting package.")
        logger.error("Error extraction package file: %s %s"
                     % (path, err))
开发者ID:Dziolas,项目名称:scoap3_old,代码行数:13,代码来源:scoap3utils.py


示例20: __enter__

    def __enter__(self):
        """Open the archive named by ``self.web_archive`` and return it.

        A value containing ``://`` is downloaded with ``requests`` and opened
        from memory; anything else is opened as a local file. The opened
        ``TarFile`` is also kept in ``self.tf``.

        :raises ValueError: if an archive is already open on this object.
        """
        if self.tf is not None:
            raise ValueError('Cannot re-enter')

        source = self.web_archive
        if '://' not in source:
            # Local file: open it directly.
            self.tf = TarFile.open(source)
            return self.tf

        info('Downloading from {0}'.format(source))
        response = requests.get(source)
        # 'r:*' lets tarfile detect the compression of the downloaded bytes.
        self.tf = TarFile.open(path(source).basename(), 'r:*',
                               fileobj=io.BytesIO(response.content))
        return self.tf
开发者ID:cecedille1,项目名称:Sett,代码行数:13,代码来源:tar.py



注:本文中的tarfile.TarFile类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python tarfile.TarInfo类代码示例发布时间:2022-05-27
下一篇:
Python tarfile.open函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap