本文整理汇总了 Python 中 zeroinstall.support.ro_rmtree 函数的典型用法代码示例。如果您正苦于以下问题：Python ro_rmtree 函数的具体用法？Python ro_rmtree 怎么用？Python ro_rmtree 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了ro_rmtree函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: tearDown
def tearDown(self):
    """Run the base-class teardown, delete the test's store directories and
    reset the store list cached on the cli module."""
    BaseTest.tearDown(self)
    # Same removal order as before: the store parent first, then the scratch dir.
    for scratch_dir in (self.store_parent, self.tmp):
        support.ro_rmtree(scratch_dir)
    cli.stores = None
开发者ID:AlexanderRyzhko,项目名称:0install-TUF,代码行数:7,代码来源:teststore.py
示例2: testCopy
def testCopy(self):
    """Exercise cli.do_copy() against a sample tree.

    Checks, in order, that:
      - the sha1new manifest generated for the sample tree matches the expected lines;
      - copying a source directory named 'badname' raises BadDigest;
      - copying a directory named after its sha1 digest raises SafeException;
      - adding a second manifest while '.manifest' exists raises SafeException;
      - once renamed to its sha1new digest, the tree can be copied and the
        copy contains 'MyFile' with the text 'Hello'.
    """
    sha1 = manifest.get_algorithm('sha1')
    sha1new = manifest.get_algorithm('sha1new')

    source = os.path.join(self.tmp, 'badname')
    os.mkdir(source)
    self.populate_sample(source)

    lines = list(sha1new.generate_manifest(source))
    self.assertEqual(['F f7ff9e8b7bb2e09b70935a5d785e0cc5d9d0abf0 2 5 MyFile',
                      'S 570b0ce957ab43e774c82fca0ea3873fc452278b 19 a symlink',
                      'D /My Dir',
                      'F 0236ef92e1e37c57f0eb161e7e2f8b6a8face705 2 10 !a file!',
                      'X b4ab02f2c791596a980fd35f51f5d92ee0b4705c 2 10 !a file!.exe'],
                     lines)

    digest = sha1.getID(manifest.add_manifest_file(source, sha1))

    # mkdtemp() creates the directory atomically; the previous
    # mktemp() + mkdir() pair left a window for another process to grab the name.
    copy = tempfile.mkdtemp()
    try:
        # Source must be in the form alg=value
        try:
            cli.do_copy([source, copy])
        except BadDigest as ex:
            self.assertIn('badname', str(ex))
        else:
            # self.fail() (unlike "assert 0") still fires under python -O
            self.fail("do_copy accepted a source not named alg=value")

        source, badname = os.path.join(self.tmp, digest), source
        os.chmod(badname, 0o755)  # can't rename RO directories on MacOS X
        os.rename(badname, source)
        os.chmod(source, 0o555)

        # Can't copy sha1 implementations (unsafe)
        try:
            cli.do_copy([source, copy])
        except SafeException as ex:
            self.assertIn('sha1', str(ex))
        else:
            # Previously a successful copy here went unnoticed.
            self.fail("do_copy accepted a sha1 implementation")

        # Already have a .manifest
        try:
            manifest.add_manifest_file(source, sha1new)
        except SafeException as ex:
            self.assertIn('.manifest', str(ex))
        else:
            self.fail("add_manifest_file overwrote an existing .manifest")

        os.chmod(source, 0o700)
        os.unlink(os.path.join(source, '.manifest'))

        # Switch to sha1new
        digest = sha1new.getID(manifest.add_manifest_file(source, sha1new))
        source, badname = os.path.join(self.tmp, digest), source
        os.chmod(badname, 0o755)
        os.rename(badname, source)
        os.chmod(source, 0o555)

        cli.do_copy([source, copy])

        with open(os.path.join(copy, digest, 'MyFile'), 'rt') as stream:
            self.assertEqual('Hello', stream.read())
    finally:
        support.ro_rmtree(copy)
开发者ID:AlexanderRyzhko,项目名称:0install-TUF,代码行数:60,代码来源:teststore.py
示例3: add_archive_to_cache
def add_archive_to_cache(self, required_digest, data, url, extract = None, type = None, start_offset = 0, try_helper = False, dry_run = False):
    """Unpack an archive into a temporary directory, then verify it and move it
    into this store with check_manifest_and_rename().

    Nothing is unpacked if self.lookup() already finds required_digest.
    If unpacking or verification fails, the temporary directory is removed
    and the original exception is re-raised.

    @type required_digest: str
    @type data: file
    @type url: str
    @type extract: str | None
    @type type: str | None
    @type start_offset: int
    @type try_helper: bool
    @type dry_run: bool"""
    from . import unpack

    if self.lookup(required_digest):
        logger.info(_("Not adding %s as it already exists!"), required_digest)
        return

    tmp = self.get_tmp_dir_for(required_digest)
    try:
        unpack.unpack_archive(url, data, tmp, extract, type = type, start_offset = start_offset)
    except BaseException:
        # Use the same removal helper as the verification failure path below.
        # A half-unpacked archive may contain read-only directories, and a
        # failure while cleaning up would hide the real unpacking error.
        support.ro_rmtree(tmp)
        raise

    try:
        self.check_manifest_and_rename(required_digest, tmp, extract, try_helper = try_helper, dry_run = dry_run)
    except Exception:
        support.ro_rmtree(tmp)
        raise
开发者ID:AlexanderRyzhko,项目名称:0install-TUF,代码行数:29,代码来源:__init__.py
示例4: tearDown
def tearDown(self):
    """Delete the per-test config, cache and GnuPG directories, then put the
    saved PATH back into the environment."""
    # (remover, directory) pairs, applied in the original order.
    cleanup_steps = (
        (shutil.rmtree, self.config_home),
        (support.ro_rmtree, self.cache_home),
        (shutil.rmtree, self.cache_system),
        (shutil.rmtree, self.gnupg_home),
    )
    for remove, directory in cleanup_steps:
        remove(directory)

    os.environ['PATH'] = self.old_path
开发者ID:turtledb,项目名称:0install,代码行数:7,代码来源:basetest.py
示例5: check_manifest_and_rename
def check_manifest_and_rename(self, required_digest, tmp, extract = None, try_helper = False, dry_run = False):
    """Check that tmp[/extract] has the required_digest.
    On success, rename the checked directory to the digest, and
    make the whole tree read-only.
    If nothing needs to be renamed (the helper stored it, the item is
    already in this store, or this is a dry run) then tmp is deleted instead.
    @type required_digest: str
    @type tmp: str
    @type extract: str | None
    @param try_helper: attempt to use privileged helper to import to system cache first (since 0.26)
    @type try_helper: bool
    @param dry_run: just print what we would do to stdout (and delete tmp)
    @type dry_run: bool
    @raise BadDigest: if the input directory doesn't match the given digest
    @raise Exception: if extract is given but tmp/extract is not a directory"""
    if extract:
        extracted = os.path.join(tmp, extract)
        if not os.path.isdir(extracted):
            raise Exception(_('Directory %s not found in archive') % extract)
    else:
        extracted = tmp

    from . import manifest

    manifest.fixup_permissions(extracted)

    alg, required_value = manifest.splitID(required_digest)
    actual_digest = alg.getID(manifest.add_manifest_file(extracted, alg))
    if actual_digest != required_digest:
        raise BadDigest(_('Incorrect manifest -- archive is corrupted.\n'
                          'Required digest: %(required_digest)s\n'
                          'Actual digest: %(actual_digest)s\n') %
                        {'required_digest': required_digest, 'actual_digest': actual_digest})

    if try_helper:
        if self._add_with_helper(required_digest, extracted, dry_run = dry_run):
            support.ro_rmtree(tmp)
            return
        logger.info(_("Can't add to system store. Trying user store instead."))

    logger.info(_("Caching new implementation (digest %s) in %s"), required_digest, self.dir)

    final_name = os.path.join(self.dir, required_digest)
    if os.path.isdir(final_name):
        logger.warning(_("Item %s already stored.") % final_name)  # not really an error
        # Returning normally tells the caller we have dealt with tmp, so
        # delete it here like the other early exits; otherwise it is leaked.
        support.ro_rmtree(tmp)
        return

    if dry_run:
        print(_("[dry-run] would store implementation as {path}").format(path = final_name))
        self.dry_run_names.add(required_digest)
        support.ro_rmtree(tmp)
        return

    # If we just want a subdirectory then the rename will change
    # extracted/.. and so we'll need write permission on 'extracted'
    os.chmod(extracted, 0o755)
    os.rename(extracted, final_name)
    os.chmod(final_name, 0o555)

    if extract:
        # Only the (now empty) wrapper directory is left behind
        os.rmdir(tmp)
开发者ID:AlexanderRyzhko,项目名称:0install-TUF,代码行数:59,代码来源:__init__.py
示例6: tearDown
def tearDown(self):
    """Fail if the config handler recorded an exception, then delete the
    per-test directories and restore the saved PATH.

    Cleanup runs even when the check fails, so a failing test does not
    leave temporary directories or a modified PATH behind for later tests."""
    try:
        assert self.config.handler.ex is None, self.config.handler.ex
    finally:
        try:
            shutil.rmtree(self.config_home)
            support.ro_rmtree(self.cache_home)
            shutil.rmtree(self.cache_system)
            shutil.rmtree(self.gnupg_home)
        finally:
            # Restore PATH even if one of the removals above failed
            os.environ['PATH'] = self.old_path
开发者ID:pombredanne,项目名称:zero-install,代码行数:9,代码来源:basetest.py
示例7: unpack_archive_over
def unpack_archive_over(url, data, destdir, extract = None, type = None, start_offset = 0):
    """Like unpack_archive, except that we unpack to a temporary directory first and
    then move things over, checking that we're not following symlinks at each stage.
    Use this when you want to unpack an unarchive into a directory which already has
    stuff in it.
    The temporary directory is created inside destdir and is always removed
    before returning.
    @raise SafeException: if a directory or file would be unpacked over a symlink,
    or a directory over a non-directory
    @note: Since 0.49, the leading "extract" component is removed (unlike unpack_archive).
    @since: 0.28"""
    import stat

    # Validate before creating the temporary directory, so a bad 'extract'
    # doesn't leave an orphaned directory inside destdir.
    assert extract is None or os.sep not in extract, extract

    tmpdir = mkdtemp(dir = destdir)
    try:
        mtimes = []

        unpack_archive(url, data, tmpdir, extract, type, start_offset)

        if extract is None:
            srcdir = tmpdir
        else:
            srcdir = os.path.join(tmpdir, extract)
            assert not os.path.islink(srcdir)

        stem_len = len(srcdir)
        for root, dirs, files in os.walk(srcdir):
            relative_root = root[stem_len + 1:] or '.'
            target_root = os.path.join(destdir, relative_root)
            try:
                info = os.lstat(target_root)
            except OSError as ex:
                if ex.errno != errno.ENOENT:
                    raise  # Some odd error.
                # Doesn't exist. OK.
                os.mkdir(target_root)
            else:
                if stat.S_ISLNK(info.st_mode):
                    raise SafeException(_('Attempt to unpack dir over symlink "%s"!') % relative_root)
                elif not stat.S_ISDIR(info.st_mode):
                    raise SafeException(_('Attempt to unpack dir over non-directory "%s"!') % relative_root)

            # 'root' already starts with srcdir; joining srcdir onto it again
            # only worked by accident when the path was absolute.
            mtimes.append((relative_root, os.lstat(root).st_mtime))

            for s in dirs:  # Symlinks are counted as directories
                src = os.path.join(srcdir, relative_root, s)
                if os.path.islink(src):
                    # Move the link itself, like a file
                    files.append(s)

            for f in files:
                src = os.path.join(srcdir, relative_root, f)
                dest = os.path.join(destdir, relative_root, f)
                if os.path.islink(dest):
                    raise SafeException(_('Attempt to unpack file over symlink "%s"!') %
                                        os.path.join(relative_root, f))
                os.rename(src, dest)

        # Copy directory mtimes last, after the renames above have finished
        # touching them. The first entry is the top-level directory; it is skipped.
        for path, mtime in mtimes[1:]:
            os.utime(os.path.join(destdir, path), (mtime, mtime))
    finally:
        ro_rmtree(tmpdir)
开发者ID:dabrahams,项目名称:0install,代码行数:56,代码来源:unpack.py
示例8: tearDown
def tearDown(self):
    """Re-raise any exception recorded by the config handler, then delete the
    per-test directories and restore the saved PATH.

    Cleanup runs even when an exception is re-raised, so a failing test does
    not leave temporary directories or a modified PATH behind for later tests."""
    try:
        if self.config.handler.ex:
            support.raise_with_traceback(self.config.handler.ex, self.config.handler.tb)
    finally:
        try:
            shutil.rmtree(self.config_home)
            support.ro_rmtree(self.cache_home)
            shutil.rmtree(self.cache_system)
            shutil.rmtree(self.gnupg_home)
        finally:
            # Restore PATH even if one of the removals above failed
            os.environ["PATH"] = self.old_path
开发者ID:pombredanne,项目名称:0install,代码行数:10,代码来源:basetest.py
示例9: cook
def cook(self, required_digest, recipe, stores, force = False, impl_hint = None, dry_run = False, may_use_mirror = True):
    """Follow a Recipe.

    This is a generator: it yields the list of outstanding blockers until
    every step's preparation has finished, then assembles the result.
    @type required_digest: str
    @type recipe: L{Recipe}
    @type stores: L{zeroinstall.zerostore.Stores}
    @param force: not used by this method's body
    @type force: bool
    @param impl_hint: the Implementation this is for (as a hint for the GUI, and to allow local files)
    @type dry_run: bool
    @type may_use_mirror: bool
    @see: L{download_impl} uses this method when appropriate"""
    pending = []
    runners = []
    try:
        # Kick off the preparation of every ingredient
        for step_info in recipe.steps:
            runner_class = StepRunner.class_for(step_info)
            runner = runner_class(step_info, impl_hint = impl_hint, may_use_mirror = may_use_mirror)
            runner.prepare(self, pending)
            runners.append(runner)

        # Wait until nothing is outstanding
        while pending:
            yield pending
            tasks.check(pending)
            pending = [b for b in pending if not b.happened]

        if not self.external_store:
            # Build the implementation in a fresh temporary directory
            target_store = stores.stores[0]
            work_dir = target_store.get_tmp_dir_for(required_digest)
            try:
                # Apply each step to the directory in turn
                for runner in runners:
                    runner.apply(work_dir)
                # Verify the result and move it into the cache
                stores.check_manifest_and_rename(required_digest, work_dir, dry_run=dry_run)
                work_dir = None
            finally:
                # Still set => something above failed; discard the directory
                if work_dir is not None:
                    support.ro_rmtree(work_dir)
        else:
            # Note: external_store will not work with non-<archive> steps.
            collected = [runner.stream for runner in runners]
            self._add_to_external_store(required_digest, recipe.steps, collected)
    finally:
        for runner in runners:
            try:
                runner.close()
            except IOError as ex:
                # Can get "close() called during
                # concurrent operation on the same file
                # object." if we're unlucky (Python problem).
                logger.info("Failed to close: %s", ex)
开发者ID:linuxmidhun,项目名称:0install,代码行数:55,代码来源:fetch.py
示例10: backup_if_exists
def backup_if_exists(name):
    """Rename an existing `name` to `name~`, deleting any previous `name~` first.

    Does nothing if `name` does not exist. Progress messages go to stdout."""
    if not os.path.exists(name):
        return

    backup = name + '~'
    if os.path.exists(backup):
        # print(...) with one parenthesised argument prints the same text on
        # Python 2 and Python 3; the old print statement was Python 2 only.
        print("(deleting old backup %s)" % backup)
        if os.path.isdir(backup):
            ro_rmtree(backup)
        else:
            os.unlink(backup)

    portable_rename(name, backup)
    print("(renamed old %s as %s; will delete on next run)" % (name, backup))
开发者ID:0install,项目名称:0release,代码行数:12,代码来源:support.py
示例11: extract_gem
def extract_gem(stream, destdir, extract = None, start_offset = 0):
    """Unpack a gem archive into destdir.

    The 'data.tar.gz' member is first extracted from the outer (uncompressed)
    tar into a temporary directory inside destdir, and is then itself
    unpacked, as a gzip-compressed tar, into destdir. The temporary
    directory is always removed.
    @since: 0.53"""
    stream.seek(start_offset)
    payload = 'data.tar.gz'
    tmpdir = mkdtemp(dir = destdir)
    try:
        extract_tar(stream, destdir=tmpdir, extract=payload, decompress=None)
        # The with-block closes the payload file; no separate close() is needed.
        with open(os.path.join(tmpdir, payload), 'rb') as payload_stream:
            extract_tar(payload_stream, destdir=destdir, extract=extract, decompress='gzip')
    finally:
        ro_rmtree(tmpdir)
开发者ID:dabrahams,项目名称:0install,代码行数:14,代码来源:unpack.py
示例12: extract_gem
def extract_gem(stream, destdir, extract=None, start_offset=0):
    """Unpack a gem archive into destdir.

    The "data.tar.gz" member is first extracted from the outer (uncompressed)
    tar into a temporary directory inside destdir, and is then itself
    unpacked, as a gzip-compressed tar, into destdir. The temporary
    directory is always removed.
    @type stream: file
    @type destdir: str
    @type start_offset: int
    @since: 0.53"""
    stream.seek(start_offset)
    payload = "data.tar.gz"
    tmpdir = mkdtemp(dir=destdir)
    try:
        extract_tar(stream, destdir=tmpdir, extract=payload, decompress=None)
        # The with-block closes the payload file; no separate close() is needed.
        with open(os.path.join(tmpdir, payload), "rb") as payload_stream:
            extract_tar(payload_stream, destdir=destdir, extract=extract, decompress="gzip")
    finally:
        ro_rmtree(tmpdir)
开发者ID:pombredanne,项目名称:0install,代码行数:17,代码来源:unpack.py
示例13: check_manifest_and_rename
def check_manifest_and_rename(self, required_digest, tmp, extract = None, try_helper = False):
    """Verify that tmp[/extract] matches required_digest and move it into the store.

    On success the verified directory is renamed to the digest inside
    self.dir and its mode is set to 0o555. If the privileged helper stores
    the implementation instead, tmp is deleted and nothing is renamed.
    @param try_helper: attempt to use privileged helper to import to system cache first (since 0.26)
    @type try_helper: bool
    @raise BadDigest: if the input directory doesn't match the given digest
    @raise Exception: if the extract directory is missing, or the item is already stored"""
    if not extract:
        extracted = tmp
    else:
        extracted = os.path.join(tmp, extract)
        if not os.path.isdir(extracted):
            raise Exception(_('Directory %s not found in archive') % extract)

    from . import manifest

    manifest.fixup_permissions(extracted)

    alg, required_value = manifest.splitID(required_digest)
    manifest_digest = manifest.add_manifest_file(extracted, alg)
    actual_digest = alg.getID(manifest_digest)
    if actual_digest != required_digest:
        digests = {'required_digest': required_digest, 'actual_digest': actual_digest}
        raise BadDigest(_('Incorrect manifest -- archive is corrupted.\n'
                          'Required digest: %(required_digest)s\n'
                          'Actual digest: %(actual_digest)s\n') % digests)

    if try_helper:
        stored_by_helper = self._add_with_helper(required_digest, extracted)
        if stored_by_helper:
            support.ro_rmtree(tmp)
            return
        info(_("Can't add to system store. Trying user store instead."))

    final_name = os.path.join(self.dir, required_digest)
    if os.path.isdir(final_name):
        raise Exception(_("Item %s already stored.") % final_name)  # XXX: not really an error

    # If we just want a subdirectory then the rename will change
    # extracted/.. and so we'll need write permission on 'extracted'
    os.chmod(extracted, 0o755)
    os.rename(extracted, final_name)
    os.chmod(final_name, 0o555)

    if extract:
        # Only the emptied wrapper directory remains
        os.rmdir(tmp)
开发者ID:gvsurenderreddy,项目名称:zeroinstall,代码行数:45,代码来源:__init__.py
示例14: cook
def cook(self, required_digest, recipe, stores, force = False, impl_hint = None):
    """Follow a Recipe.

    This is a generator: it yields the list of outstanding blockers until
    every step's preparation has finished, then assembles the result.
    @param force: not used by this method's body
    @param impl_hint: the Implementation this is for (if any) as a hint for the GUI
    @see: L{download_impl} uses this method when appropriate"""
    pending = []
    runners = []
    try:
        # Kick off the preparation of every ingredient
        for step_info in recipe.steps:
            runner_class = StepRunner.class_for(step_info)
            runner = runner_class(step_info, impl_hint=impl_hint)
            runner.prepare(self, pending)
            runners.append(runner)

        # Wait until nothing is outstanding
        while pending:
            yield pending
            tasks.check(pending)
            pending = [b for b in pending if not b.happened]

        if not self.external_store:
            # Build the implementation in a fresh temporary directory
            store = stores.stores[0]
            work_dir = store.get_tmp_dir_for(required_digest)
            try:
                # Apply each step to the directory in turn
                for runner in runners:
                    runner.apply(work_dir)
                # Verify the result and move it into the cache
                store.check_manifest_and_rename(required_digest, work_dir)
                work_dir = None
            finally:
                # Still set => something above failed; discard the directory
                if work_dir is not None:
                    support.ro_rmtree(work_dir)
        else:
            # Note: external_store will not yet work with non-<archive> steps.
            collected = [runner.stream for runner in runners]
            self._add_to_external_store(required_digest, recipe.steps, collected)
    finally:
        for runner in runners:
            runner.close()
开发者ID:gvsurenderreddy,项目名称:0install,代码行数:44,代码来源:fetch.py
示例15: cook
def cook(self, required_digest, recipe, stores, force = False, impl_hint = None):
    """Follow a Recipe.

    This is a generator: it yields the list of outstanding download blockers
    until all of them have happened, then unpacks every archive over a single
    temporary directory and hands it to the first store for verification.
    @param impl_hint: the Implementation this is for (if any) as a hint for the GUI
    @see: L{download_impl} uses this method when appropriate"""
    downloaded = {}  # step -> stream of its downloaded archive
    pending = []

    # Start a download for each ingredient
    for ingredient in recipe.steps:
        blocker, archive = self.download_archive(ingredient, force = force, impl_hint = impl_hint)
        assert archive
        pending.append(blocker)
        downloaded[ingredient] = archive

    # Wait until nothing is outstanding
    while pending:
        yield pending
        tasks.check(pending)
        pending = [b for b in pending if not b.happened]

    from zeroinstall.zerostore import unpack

    # Build the implementation in a fresh temporary directory
    store = stores.stores[0]
    work_dir = store.get_tmp_dir_for(required_digest)
    try:
        # Unpack each archive on top of the previous ones, in recipe order
        for ingredient in recipe.steps:
            archive = downloaded[ingredient]
            archive.seek(0)
            unpack.unpack_archive_over(ingredient.url, archive, work_dir,
                                       extract = ingredient.extract,
                                       type = ingredient.type,
                                       start_offset = ingredient.start_offset or 0)
        # Verify the result and move it into the cache
        store.check_manifest_and_rename(required_digest, work_dir)
        work_dir = None
    finally:
        # Still set => something above failed; discard the directory
        if work_dir is not None:
            from zeroinstall import support
            support.ro_rmtree(work_dir)
开发者ID:dabrahams,项目名称:zeroinstall,代码行数:44,代码来源:fetch.py
示例16: process_archives
def process_archives(parent):
    """Fill in download-related attributes on the <archive>/<file> children of parent.

    For each such child in the XMLNS_IFACE namespace: download href into
    template_dir if no local copy exists, set 'size' from the local copy, and
    for archives without an 'extract' attribute guess one by unpacking to a
    temporary directory. An 'extract' attribute that is present but empty is
    removed. <recipe> children are processed recursively.

    NOTE(review): template_dir is not defined in this block; it is expected
    to come from the enclosing scope."""
    for elem in parent.childNodes:
        if elem.namespaceURI != namespaces.XMLNS_IFACE:
            continue

        if elem.localName in ("archive", "file"):
            # Download the archive if missing
            href = elem.getAttribute("href")
            assert href, "missing href on <archive>"
            local_copy = os.path.join(template_dir, os.path.basename(href))
            if not os.path.exists(local_copy):
                print("Downloading {href} to {local_copy}".format(href=href, local_copy=local_copy))
                part_path = local_copy + ".part"
                req = request.urlopen(href)
                try:
                    with open(part_path, "wb") as local_stream:
                        shutil.copyfileobj(req, local_stream)
                finally:
                    # Release the connection even if the copy fails part-way
                    req.close()
                # Only a completely downloaded file gets the final name
                support.portable_rename(part_path, local_copy)

            # Set the size attribute
            elem.setAttribute("size", str(os.stat(local_copy).st_size))

            if elem.localName == "archive":
                if not elem.hasAttribute("extract"):
                    # Unpack (a rather inefficient way to guess the 'extract' attribute)
                    tmpdir = unpack.unpack_to_tmp(href, local_copy, elem.getAttribute("type"))
                    try:
                        unpack_dir = os.path.join(tmpdir, "unpacked")

                        # Set the extract attribute
                        extract = unpack.guess_extract(unpack_dir)
                        if extract:
                            elem.setAttribute("extract", extract)
                            unpack_dir = os.path.join(unpack_dir, extract)
                            assert os.path.isdir(unpack_dir), "Not a directory: {dir}".format(dir=unpack_dir)
                    finally:
                        support.ro_rmtree(tmpdir)
                else:
                    extract = elem.getAttribute("extract")
                    if extract == "":
                        # Remove empty element
                        elem.removeAttribute("extract")

        elif elem.localName == "recipe":
            process_archives(elem)
开发者ID:0install,项目名称:0template,代码行数:44,代码来源:0template.py
示例17: handle
def handle(config, options, args):
    """Print the manifest and/or digest of a directory or archive.

    args is SOURCE [EXTRACT]. SOURCE is either a directory (EXTRACT is then
    not allowed) or an archive, which is unpacked into a temporary directory
    first. The digest is printed unless only the manifest was requested.
    @type args: [str]
    @raise UsageError: if the number of arguments is not 1 or 2
    @raise SafeException: if the algorithm is unknown, or EXTRACT is used with a directory"""
    if len(args) == 1:
        extract = None
    elif len(args) == 2:
        extract = args[1]
    else:
        raise UsageError()

    source = args[0]

    algorithm_name = options.algorithm or 'sha1new'
    alg = manifest.algorithms.get(algorithm_name, None)
    if alg is None:
        # Report the name that was asked for; 'alg' itself is always None here.
        raise SafeException(_('Unknown algorithm "%s"') % algorithm_name)

    show_manifest = bool(options.manifest)
    show_digest = bool(options.digest) or not show_manifest

    def do_manifest(d):
        # Generate the manifest of d (or d/extract), printing what was asked for
        if extract is not None:
            d = os.path.join(d, extract)
        digest = alg.new_digest()
        for line in alg.generate_manifest(d):
            if show_manifest:
                print(line)
            digest.update((line + '\n').encode('utf-8'))
        if show_digest:
            print(alg.getID(digest))

    if os.path.isdir(source):
        if extract is not None:
            raise SafeException("Can't use extract with a directory")
        do_manifest(source)
    else:
        tmpdir = tempfile.mkdtemp()
        try:
            # The archive is closed as soon as it has been unpacked
            with open(source, 'rb') as data:
                unpack.unpack_archive(source, data, tmpdir, extract)
            do_manifest(tmpdir)
        finally:
            support.ro_rmtree(tmpdir)
开发者ID:AlexanderRyzhko,项目名称:0install-TUF,代码行数:43,代码来源:digest.py
示例18: add_archive_to_cache
def add_archive_to_cache(self, required_digest, data, url, extract = None, type = None, start_offset = 0, try_helper = False):
    """Unpack an archive into a temporary directory, then verify it and move it
    into this store with check_manifest_and_rename().

    Nothing is unpacked if self.lookup() already finds required_digest.
    If unpacking or verification fails, the temporary directory is removed
    and the original exception is re-raised.
    @type required_digest: str
    @param try_helper: passed through to check_manifest_and_rename
    @type try_helper: bool"""
    from . import unpack

    if self.lookup(required_digest):
        info(_("Not adding %s as it already exists!"), required_digest)
        return

    tmp = self.get_tmp_dir_for(required_digest)
    try:
        unpack.unpack_archive(url, data, tmp, extract, type = type, start_offset = start_offset)
    except BaseException:
        # Use the same removal helper as the verification failure path below.
        # A half-unpacked archive may contain read-only directories, and a
        # failure while cleaning up would hide the real unpacking error.
        support.ro_rmtree(tmp)
        raise

    try:
        self.check_manifest_and_rename(required_digest, tmp, extract, try_helper = try_helper)
    except Exception:
        support.ro_rmtree(tmp)
        raise
开发者ID:dabrahams,项目名称:zeroinstall,代码行数:21,代码来源:__init__.py
示例19: add_dir_to_cache
def add_dir_to_cache(self, required_digest, path, try_helper = False):
    """Copy the tree at path into a temporary directory, verify it against
    required_digest and add it to the cache.

    Nothing is copied if self.lookup() already finds required_digest. On any
    failure the temporary copy is deleted and the exception is re-raised.
    @param required_digest: the expected digest
    @type required_digest: str
    @param path: the root of the tree to copy
    @type path: str
    @param try_helper: attempt to use privileged helper before user cache (since 0.26)
    @type try_helper: bool
    @raise BadDigest: if the contents don't match the given digest."""
    already_cached = self.lookup(required_digest)
    if already_cached:
        info(_("Not adding %s as it already exists!"), required_digest)
        return

    staging = self.get_tmp_dir_for(required_digest)
    try:
        _copytree2(path, staging)
        self.check_manifest_and_rename(required_digest, staging, try_helper = try_helper)
    except:
        # Deliberately catches everything: the copy is removed and the
        # exception is re-raised unchanged.
        warn(_("Error importing directory."))
        warn(_("Deleting %s"), staging)
        support.ro_rmtree(staging)
        raise
开发者ID:dabrahams,项目名称:zeroinstall,代码行数:22,代码来源:__init__.py
示例20: tearDown
def tearDown(self):
    """Delete the temporary directory used by the test (self.tmpdir)."""
    scratch_dir = self.tmpdir
    ro_rmtree(scratch_dir)
开发者ID:0install,项目名称:0bootstrap,代码行数:2,代码来源:testbootstrap.py
注：本文中的 zeroinstall.support.ro_rmtree 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论