• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python osutils.pjoin函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中snakeoil.osutils.pjoin函数的典型用法代码示例。如果您正苦于以下问题:Python pjoin函数的具体用法?Python pjoin怎么用?Python pjoin使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了pjoin函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_parents

 def test_parents(self):
     path = pjoin(self.dir, self.profile)
     os.mkdir(pjoin(path, "child"))
     self.write_file("parent", "..", profile="%s/child" % self.profile)
     p = self.klass(pjoin(path, "child"))
     self.assertEqual(1, len(p.parents))
     self.assertEqual(p.parents[0].path, path)
开发者ID:veelai,项目名称:pkgcore,代码行数:7,代码来源:test_profiles.py


示例2: test_packages

    def test_packages(self):
        p = self.klass(pjoin(self.dir, self.profile))
        self.assertEqual(p.system, empty)
        self.assertEqual(p.visibility, empty)
        self.parsing_checks("packages", "system")
        self.write_file("packages", "#foo\n")
        p = self.klass(pjoin(self.dir, self.profile))
        self.assertEqual(p.visibility, empty)
        self.assertEqual(p.system, empty)
        self.write_file("packages", "#foo\ndev-util/diffball\n")
        p = self.klass(pjoin(self.dir, self.profile))
        self.assertEqual(p.system, empty)
        self.assertEqual(list(p.visibility), [(), (atom("dev-util/diffball", negate_vers=True),)])

        self.write_file("packages", "-dev-util/diffball\ndev-foo/bar\n*dev-sys/atom\n" "-*dev-sys/atom2\nlock-foo/dar")
        p = self.klass(pjoin(self.dir, self.profile))
        self.assertEqual(p.system, ((atom("dev-sys/atom2"),), (atom("dev-sys/atom"),)))
        self.assertEqual(
            [set(x) for x in p.visibility],
            [
                set([atom("dev-util/diffball", negate_vers=True)]),
                set([atom("dev-foo/bar", negate_vers=True), atom("lock-foo/dar", negate_vers=True)]),
            ],
        )
        self.simple_eapi_awareness_check("packages", "system")
开发者ID:veelai,项目名称:pkgcore,代码行数:25,代码来源:test_profiles.py


示例3: parsing_checks

 def parsing_checks(self, filename, attr, data="", line_negation=True):
     path = pjoin(self.dir, self.profile)
     self.write_file(filename, data)
     getattr(self.klass(path), attr)
     self.write_file(filename, "-")
     self.assertRaises(profiles.ProfileError, getattr, self.klass(path), attr)
     self.wipe_path(pjoin(path, filename))
开发者ID:veelai,项目名称:pkgcore,代码行数:7,代码来源:test_profiles.py


示例4: fetch_one

 def fetch_one(self, fetchable, observer, retry=False):
     if fetchable.filename in self._basenames:
         return True
     # fetching files without uri won't fly
     # XXX hack atm, could use better logic but works for now
     try:
         fp = self.fetcher(fetchable)
     except fetch_errors.ChksumFailure as e:
         # checksum failed, rename file and try refetching
         path = pjoin(self.fetcher.distdir, fetchable.filename)
         failed_filename = f'{fetchable.filename}._failed_chksum_'
         failed_path = pjoin(self.fetcher.distdir, failed_filename)
         os.rename(path, failed_path)
         if retry:
             raise
         observer.error(str(e))
         observer.error(f'renaming to {failed_filename!r} and refetching from upstream')
         observer.flush()
         # refetch directly from upstream
         return self.fetch_one(fetchable.upstream, observer, retry=True)
     except fetch_errors.FetchFailed as e:
         fp = None
     if fp is None:
         return False
     self.verified_files[fp] = fetchable
     self._basenames.add(fetchable.filename)
     return True
开发者ID:radhermit,项目名称:pkgcore,代码行数:27,代码来源:format.py


示例5: test_from_abspath

 def test_from_abspath(self):
     self.mk_profiles({"name": "profiles"}, {"name": "profiles/1"})
     base = pjoin(self.dir, "profiles")
     p = self.kls.from_abspath(pjoin(base, "1"))
     self.assertNotEqual(p, None)
     self.assertEqual(normpath(p.basepath), normpath(base))
     self.assertEqual(normpath(p.profile), normpath(pjoin(base, "1")))
开发者ID:veelai,项目名称:pkgcore,代码行数:7,代码来源:test_profiles.py


示例6: run_check

 def run_check(*args):
     # create a fresh tree for the profile work everytime.
     # do this, so that it's always a unique pathway- this sidesteps
     # any potential issues of ProfileNode instance caching.
     path = pjoin(self.dir, 'foo', str(counter.next()))
     shutil.copytree(pjoin(self.dir, 'foo'), path, symlinks=True)
     return self.process_check(path, list(args))
开发者ID:den4ix,项目名称:pkgcheck,代码行数:7,代码来源:test_addons.py


示例7: config_from_make_conf

def config_from_make_conf(location="/etc/"):
    """
    generate a config from a file location

    :param location: location the portage configuration is based in,
        defaults to /etc
    """

    # this actually differs from portage parsing- we allow
    # make.globals to provide vars used in make.conf, portage keeps
    # them seperate (kind of annoying)

    config_root = os.environ.get("PORTAGE_CONFIGROOT", "/")
    base_path = pjoin(config_root, location.strip("/"))
    portage_base = pjoin(base_path, "portage")

    # this isn't preserving incremental behaviour for features/use
    # unfortunately

    conf_dict = {}
    try:
        load_make_config(conf_dict, pjoin(base_path, 'make.globals'))
    except errors.ParsingError, e:
        if not getattr(getattr(e, 'exc', None), 'errno', None) == errno.ENOENT:
           raise
        try:
            load_make_config(conf_dict,
                pjoin(config_root, 'usr/share/portage/config/make.globals'))
        except compatibility.IGNORED_EXCEPTIONS:
            raise
        except:
            compatibility.raise_from(errors.ParsingError(
                "failed to find a usable make.globals"))
开发者ID:veelai,项目名称:pkgcore,代码行数:33,代码来源:portage_conf.py


示例8: trigger

    def trigger(self, engine, existing_cset, install_cset):
        # hackish, but it works.
        protected_filter = gen_config_protect_filter(
            engine.offset, self.extra_protects, self.extra_disables).match
        ignore_filter = gen_collision_ignore_filter(engine.offset).match
        protected = {}

        for x in existing_cset.iterfiles():
            if not ignore_filter(x.location) and protected_filter(x.location):
                replacement = install_cset[x]
                if not simple_chksum_compare(replacement, x):
                    protected.setdefault(
                        pjoin(engine.offset,
                              os.path.dirname(x.location).lstrip(os.path.sep)),
                        []).append((os.path.basename(replacement.location),
                                    replacement))

        for dir_loc, entries in protected.iteritems():
            updates = {x[0]: [] for x in entries}
            try:
                existing = sorted(x for x in listdir_files(dir_loc)
                                  if x.startswith("._cfg"))
            except OSError as oe:
                if oe.errno != errno.ENOENT:
                    raise
                # this shouldn't occur.
                continue

            for x in existing:
                try:
                    # ._cfg0000_filename
                    count = int(x[5:9])
                    if x[9] != "_":
                        raise ValueError
                    fn = x[10:]
                except (ValueError, IndexError):
                    continue
                if fn in updates:
                    updates[fn].append((count, fn))

            # now we rename.
            for fname, entry in entries:
                # check for any updates with the same chksums.
                count = 0
                for cfg_count, cfg_fname in updates[fname]:
                    if simple_chksum_compare(livefs.gen_obj(
                            pjoin(dir_loc, cfg_fname)), entry):
                        count = cfg_count
                        break
                    count = max(count, cfg_count + 1)
                try:
                    install_cset.remove(entry)
                except KeyError:
                    # this shouldn't occur...
                    continue
                new_fn = pjoin(dir_loc, "._cfg%04i_%s" % (count, fname))
                new_entry = entry.change_attributes(location=new_fn)
                install_cset.add(new_entry)
                self.renames[new_entry] = entry
            del updates
开发者ID:vapier,项目名称:pkgcore,代码行数:60,代码来源:triggers.py


示例9: _visibility_limiters

 def _visibility_limiters(self):
     path = pjoin(self.base, 'profiles', 'package.mask')
     pos, neg = [], []
     try:
         if (self.config.eapi.options['has_profile_data_dirs'] or
                 self.config.profile_formats.intersection(['portage-1', 'portage-2'])):
             paths = sorted_scan(path)
         else:
             paths = [path]
         for path in paths:
             for line in iter_read_bash(path):
                 line = line.strip()
                 if line in ('-', ''):
                     raise profiles.ProfileError(
                         pjoin(self.base, 'profiles'),
                         'package.mask', "encountered empty negation: -")
                 if line.startswith('-'):
                     neg.append(atom.atom(line[1:]))
                 else:
                     pos.append(atom.atom(line))
     except FileNotFoundError:
         pass
     except ebuild_errors.MalformedAtom as e:
         raise profiles.ProfileError(
             pjoin(self.base, 'profiles'), 'package.mask', e) from e
     return tuple(neg), tuple(pos)
开发者ID:radhermit,项目名称:pkgcore,代码行数:26,代码来源:repository.py


示例10: _add_sets

    def _add_sets(self):
        self["world"] = basics.AutoConfigSection({
            "class": "pkgcore.pkgsets.filelist.WorldFile",
            "location": pjoin(self.root, econst.WORLD_FILE.lstrip('/'))})
        self["system"] = basics.AutoConfigSection({
            "class": "pkgcore.pkgsets.system.SystemSet",
            "profile": "profile"})
        self["installed"] = basics.AutoConfigSection({
            "class": "pkgcore.pkgsets.installed.Installed",
            "vdb": "vdb"})
        self["versioned-installed"] = basics.AutoConfigSection({
            "class": "pkgcore.pkgsets.installed.VersionedInstalled",
            "vdb": "vdb"})

        set_fp = pjoin(self.dir, "sets")
        try:
            for setname in listdir_files(set_fp):
                # Potential for name clashes here, those will just make
                # the set not show up in config.
                if setname in ("system", "world"):
                    logger.warning(
                        "user defined set %r is disallowed; ignoring",
                        pjoin(set_fp, setname))
                    continue
                self[setname] = basics.AutoConfigSection({
                    "class": "pkgcore.pkgsets.filelist.FileList",
                    "location": pjoin(set_fp, setname)})
        except FileNotFoundError:
            pass
开发者ID:radhermit,项目名称:pkgcore,代码行数:29,代码来源:portage_conf.py


示例11: _internal_offset_iter_scan

def _internal_offset_iter_scan(path, chksum_handlers, offset, stat_func=os.lstat,
                               hidden=True, backup=True):
    offset = normpath(offset)
    path = normpath(path)
    dirs = collections.deque([path[len(offset):]])
    if dirs[0]:
        yield gen_obj(dirs[0], chksum_handlers=chksum_handlers,
            stat_func=stat_func)

    sep = os.path.sep
    while dirs:
        base = dirs.popleft()
        real_base = pjoin(offset, base.lstrip(sep))
        base = base.rstrip(sep) + sep
        for x in listdir(real_base):
            if not hidden and x.startswith('.'):
                continue
            if not backup and x.endswith('~'):
                continue
            path = pjoin(base, x)
            obj = gen_obj(path, chksum_handlers=chksum_handlers,
                        real_location=pjoin(real_base, x),
                        stat_func=os.lstat)
            yield obj
            if obj.is_dir:
                dirs.append(path)
开发者ID:radhermit,项目名称:pkgcore,代码行数:26,代码来源:livefs.py


示例12: _caching_grab_virtuals

def _caching_grab_virtuals(repo, cache_basedir):
    virtuals = {}
    update = False
    cache = _read_mtime_cache(pjoin(cache_basedir, 'virtuals.cache'))

    existing = _get_mtimes(repo.location)
    for cat, mtime in existing.iteritems():
        d = cache.pop(cat, None)
        if d is not None and long(d[0]) == long(mtime):
            d = _convert_cached_virtuals(d)
            if d is not None:
                _merge_virtuals(virtuals, d)
                continue

        update = True
        _collect_virtuals(virtuals, repo.itermatch(
            packages.PackageRestriction("category",
                values.StrExactMatch(cat))))

    if update or cache:
        _write_mtime_cache(existing, virtuals,
            pjoin(cache_basedir, 'virtuals.cache'))

    defaults = _collect_default_providers(virtuals)
#    _finalize_virtuals(virtuals)
    return defaults, virtuals
开发者ID:vapier,项目名称:pkgcore,代码行数:26,代码来源:virtuals.py


示例13: _clean_old_caches

def _clean_old_caches(path):
    for name in ('plugincache2',):
        try:
            osutils.unlink_if_exists(pjoin(path, name))
        except EnvironmentError, e:
            logger.error("attempting to clean old plugin cache %r failed with %s",
                pjoin(path, name), e)
开发者ID:veelai,项目名称:pkgcore,代码行数:7,代码来源:plugin.py


示例14: process_subcommands

    def process_subcommands(self, parser, name, action_group):
        l = []
        h = self._get_formatter(parser, name)
        h.add_arguments(action_group._group_actions)
        data = h.format_help().strip()
        if data:
            assert len(action_group._group_actions) == 1
            l.extend(_rst_header("=", action_group.title))
            if action_group.description:
                l.extend(action_group.description.split("\n"))

            for subcommand, parser in action_group._group_actions[0].choices.iteritems():
                subdir_path = self.name.split()[1:]
                base = pjoin(self.base_path, *subdir_path)
                self.__class__(base, "%s %s" % (
                    self.name, subcommand), parser, mtime=self.mtime, out_name=subcommand).run()

                toc_path = self.name.split()
                if subdir_path:
                    toc_path = subdir_path

            l.append('')
            l.append(".. toctree::")
            l.append("    :maxdepth: 2")
            l.append('')
            l.extend("    %s %s <%s>" %
                     (name, subcommand, pjoin(*list(toc_path + [subcommand])))
                     for subcommand in action_group._group_actions[0].choices)
            l.append('')
        return l
开发者ID:vapier,项目名称:pkgcore,代码行数:30,代码来源:generate_man_rsts.py


示例15: _split

    def _split(self, iterable, observer, engine, cset):
        debug_store = pjoin(engine.offset, self._debug_storage.lstrip('/'))

        objcopy_args = [self.objcopy_binary, '--only-keep-debug']
        if self._compress:
            objcopy_args.append('--compress-debug-sections')

        for fs_objs, ftype in iterable:
            if 'ar archive' in ftype:
                continue
            if 'relocatable' in ftype:
                if not any(x.basename.endswith(".ko") for x in fs_objs):
                    continue
            fs_obj = fs_objs[0]
            debug_loc = pjoin(debug_store, fs_obj.location.lstrip('/') + ".debug")
            if debug_loc in cset:
                continue
            fpath = fs_obj.data.path
            debug_ondisk = pjoin(os.path.dirname(fpath),
                os.path.basename(fpath) + ".debug")

            # note that we tell the UI the final pathway- not the intermediate one.
            observer.info("splitdebug'ing %s into %s" %
                (fs_obj.location, debug_loc))

            ret = spawn.spawn(objcopy_args + [fpath, debug_ondisk])
            if ret != 0:
                observer.warn("splitdebug'ing %s failed w/ exitcode %s" %
                    (fs_obj.location, ret))
                continue

            # note that the given pathway to the debug file /must/ be relative to ${D};
            # it must exist at the time of invocation.
            ret = spawn.spawn([self.objcopy_binary,
                '--add-gnu-debuglink', debug_ondisk, fpath])
            if ret != 0:
                observer.warn("splitdebug created debug file %r, but "
                    "failed adding links to %r (%r)" % (debug_ondisk, fpath, ret))
                observer.debug("failed splitdebug command was %r",
                    (self.objcopy_binary, '--add-gnu-debuglink', debug_ondisk, fpath))
                continue


            debug_obj = gen_obj(debug_loc, real_location=debug_ondisk,
                uid=os_data.root_uid, gid=os_data.root_gid)

            stripped_fsobj = self._strip_fsobj(fs_obj, ftype, observer, quiet=True)

            self._modified.add(stripped_fsobj)
            self._modified.add(debug_obj)

            for fs_obj in fs_objs[1:]:
                debug_loc = pjoin(debug_store,
                    fs_obj.location.lstrip('/') + ".debug")
                linked_debug_obj = debug_obj.change_attributes(location=debug_loc)
                observer.info("splitdebug hardlinking %s to %s" %
                    (debug_obj.location, debug_loc))
                self._modified.add(linked_debug_obj)
                self._modified.add(stripped_fsobj.change_attributes(
                    location=fs_obj.location))
开发者ID:veelai,项目名称:pkgcore,代码行数:60,代码来源:triggers.py


示例16: test_licenses

 def test_licenses(self):
     licenses = ('GPL-2', 'GPL-3+', 'BSD')
     ensure_dirs(pjoin(self.dir, 'licenses'))
     for license in licenses:
         touch(pjoin(self.dir, 'licenses', license))
     repo = self.mk_tree(self.dir)
     self.assertEqual(sorted(repo.licenses), sorted(licenses))
开发者ID:radhermit,项目名称:pkgcore,代码行数:7,代码来源:test_repository.py


示例17: trigger

    def trigger(self, engine):
        bin_path = self.get_binary_path()
        if bin_path is None:
            return

        offset = engine.offset

        locs = [pjoin(offset, x.lstrip(os.path.sep)) for x in self.locations]

        if engine.phase.startswith('pre_'):
            self.saved_mtimes.set_state(locs)
            return
        elif engine.phase == 'post_merge' and \
            engine.mode == const.REPLACE_MODE:
            # skip post_merge for replace.
            # we catch it on unmerge...
            return

        regens = set(x.location for x in self.saved_mtimes.get_changes(locs))
        # force regeneration of any directory lacking the info index.
        regens.update(x for x in locs if not os.path.isfile(pjoin(x, 'dir')))

        bad = []
        for x in regens:
            bad.extend(self.regen(bin_path, x))

        if bad and engine.observer is not None:
            engine.observer.warn("bad info files: %r" % sorted(bad))
开发者ID:chutz,项目名称:pkgcore,代码行数:28,代码来源:triggers.py


示例18: regen

    def regen(self, binary, basepath):
        ignores = ("dir", "dir.old")
        try:
            files = listdir_files(basepath)
        except OSError as oe:
            if oe.errno == errno.ENOENT:
                return
            raise

        if self.should_skip_directory(basepath, files):
            return

        # wipe old indexes.
        for x in set(ignores).intersection(files):
            os.remove(pjoin(basepath, x))

        index = pjoin(basepath, 'dir')
        for x in files:
            if x in ignores or x.startswith("."):
                continue

            ret, data = spawn.spawn_get_output(
                [binary, '--quiet', pjoin(basepath, x), '--dir-file', index],
                collect_fds=(1,2), split_lines=False)

            if not data or "already exists" in data or \
                    "warning: no info dir entry" in data:
                continue
            yield pjoin(basepath, x)
开发者ID:chutz,项目名称:pkgcore,代码行数:29,代码来源:triggers.py


示例19: test_load_repos_conf_dir

    def test_load_repos_conf_dir(self):
        # repo priority sorting and dir/symlink scanning

        repos_conf_dir = pjoin(self.dir, 'repos.conf')
        os.mkdir(repos_conf_dir)
        repos_conf_sym = pjoin(self.dir, 'repos.conf.sym')
        os.symlink(repos_conf_dir, repos_conf_sym)

        # add global repos.conf
        shutil.copyfile(
            pjoin(const.CONFIG_PATH, 'repos.conf'),
            pjoin(repos_conf_dir, 'repos.conf'))

        with open(pjoin(repos_conf_dir, 'z'), 'w') as f:
            f.write(textwrap.dedent('''\
                [bar]
                location = /var/gentoo/repos/bar

                [foo]
                location = /var/gentoo/repos/foo
                priority = 10'''))
            f.flush()

        defaults, repos = load_repos_conf(repos_conf_dir)
        sym_defaults, sym_repos = load_repos_conf(repos_conf_sym)

        self.assertEqual(defaults, sym_defaults)
        self.assertEqual(repos, sym_repos)
        self.assertEqual('gentoo', defaults['main-repo'])
        self.assertEqual(['foo', 'bar', 'gentoo'], repos.keys())
开发者ID:den4ix,项目名称:pkgcore,代码行数:30,代码来源:test_portage_conf.py


示例20: test_load_make_conf

    def test_load_make_conf(self):
        self.assertIn('PORTAGE_TMPDIR', self.make_globals)

        # nonexistent file
        d = {}
        # by default files are required
        self.assertRaises(
            errors.ParsingError, load_make_conf,
            d, pjoin(self.dir, 'make.globals'))
        # should return empty dict when not required
        load_make_conf(d, pjoin(self.dir, 'make.conf'), required=False)
        self.assertEqual({}, d)

        # unreadable file
        d = {}
        with NamedTemporaryFile() as f:
            os.chmod(f.name, stat.S_IWUSR)
            self.assertRaises(
                errors.PermissionDeniedError, load_make_conf, d, f.name)

        # overrides and incrementals
        with NamedTemporaryFile() as f:
            f.write(b'DISTDIR=foo\nACCEPT_LICENSE=foo\n')
            f.flush()
            d = {}
            load_make_conf(d, pjoin(const.CONFIG_PATH, 'make.globals'))
            load_make_conf(d, f.name, allow_sourcing=True, incrementals=True)
            self.assertEqual('foo', d['DISTDIR'])
            self.assertEqual(
                ' '.join([self.make_globals['ACCEPT_LICENSE'], 'foo']),
                d['ACCEPT_LICENSE'])
开发者ID:den4ix,项目名称:pkgcore,代码行数:31,代码来源:test_portage_conf.py



注:本文中的snakeoil.osutils.pjoin函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python common.check_error函数代码示例发布时间:2022-05-27
下一篇:
Python osutils.normpath函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap