• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python compat.translate_jobconf函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 mrjob.compat.translate_jobconf 函数的典型用法代码示例。如果您正苦于以下问题:Python translate_jobconf 函数的具体用法?Python translate_jobconf 怎么用?Python translate_jobconf 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了translate_jobconf函数的12个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_translate_jobconf

 def test_translate_jobconf(self):
     # Each row is (variable, Hadoop version, expected translation).
     # Versions below 0.21 should come back as ``user.name``; 0.21 should
     # come back as ``mapreduce.job.user.name``.
     expectations = [
         ('user.name', '0.18', 'user.name'),
         ('mapreduce.job.user.name', '0.18', 'user.name'),
         ('user.name', '0.19', 'user.name'),
         ('mapreduce.job.user.name', '0.19.2', 'user.name'),
         ('user.name', '0.21', 'mapreduce.job.user.name'),
     ]

     for variable, version, expected in expectations:
         assert_equal(translate_jobconf(variable, version), expected)
开发者ID:gimlids,项目名称:LTPM,代码行数:11,代码来源:compat_test.py


示例2: _update_jobconf_for_hadoop_version

    def _update_jobconf_for_hadoop_version(self, jobconf, hadoop_version):
        """Add correctly-named copies of misnamed variables to *jobconf*.

        Every key of *jobconf* (a dict, modified in place) is run through
        ``translate_jobconf()`` for *hadoop_version*. When the translated
        name is not already a key, it is added with the same value.
        Existing entries are never removed or overwritten. If anything was
        added, a single warning listing each original/translated pair is
        logged.

        An empty *hadoop_version* makes this a no-op.
        """
        # this happens for sim runner
        if not hadoop_version:
            return

        added = {}  # original name -> translated name, for the warning

        # sorted() works on a snapshot of the items, so inserting keys into
        # jobconf inside the loop is safe
        for name, value in sorted(jobconf.items()):
            translated = translate_jobconf(name, hadoop_version)
            if translated in jobconf:
                continue

            jobconf[translated] = value
            added[name] = translated

        if not added:
            return

        summary = '\n'.join(
            "%s: %s" % pair for pair in sorted(added.items()))

        log.warning(
            "Detected hadoop configuration property names that"
            " do not match hadoop version %s:"
            "\nThey have been translated as follows\n %s",
            hadoop_version,
            summary)
开发者ID:parastoo-62,项目名称:mrjob,代码行数:26,代码来源:runner.py


示例3: _subprocess_env

    def _subprocess_env(self, step_type, step_num, task_num, input_file=None,
                        input_start=None, input_length=None):
        """Set up environment variables for a subprocess (mapper, etc.)

        This combines, in decreasing order of priority:

        * environment variables set by the **cmdenv** option
        * **jobconf** environment variables set by our job (e.g.
          ``mapreduce.task.ismap``)
        * environment variables from **jobconf** options, translated to
          whatever version of Hadoop we're emulating
        * the current environment
        * ``IRONPYTHONPATH`` set to the current working directory (only
          when ``is_ironpython`` is true)
        * ``PYTHONPATH`` set to the current working directory

        We use :py:func:`~mrjob.conf.combine_local_envs`, so ``PATH``
        environment variables are handled specially.

        *step_type*, *step_num*, *task_num*, *input_file*, *input_start*
        and *input_length* are passed straight through to
        ``_simulate_jobconf_for_step()``.

        Returns whatever ``combine_local_envs()`` produces from the layers
        listed above.
        """
        version = self.get_hadoop_version()

        def to_env(jobconf):
            # Translate each variable name for *version*, then swap "." for
            # "_" to form the environment variable name. Values are
            # stringified. .items() (rather than .iteritems()) keeps this
            # working on both Python 2 and Python 3.
            return dict(
                (translate_jobconf(k, version).replace('.', '_'), str(v))
                for (k, v) in jobconf.items())

        jobconf_env = to_env(self._opts['jobconf'])

        internal_jobconf = self._simulate_jobconf_for_step(
            step_type, step_num, task_num, input_file=input_file,
            input_start=input_start, input_length=input_length)

        internal_jobconf_env = to_env(internal_jobconf)

        if is_ironpython:
            ironpython_env = {'IRONPYTHONPATH': os.getcwd()}
        else:
            ironpython_env = {}

        # keep the current environment because we need PATH to find binaries
        # and make PYTHONPATH work
        return combine_local_envs({'PYTHONPATH': os.getcwd()},
                                  ironpython_env,
                                  os.environ,
                                  jobconf_env,
                                  internal_jobconf_env,
                                  self._get_cmdenv())
开发者ID:nyccto,项目名称:mrjob,代码行数:42,代码来源:local.py


示例4: _simulate_jobconf_for_step

    def _simulate_jobconf_for_step(
            self, task_type, step_num, task_num, map_split=None):
        """Build a dict of simulated ``mapreduce.*`` jobconf variables for
        one task.

        Variables are assembled under their ``mapreduce.*`` names and then
        translated: with ``translate_jobconf()`` when
        ``get_hadoop_version()`` returns a version, otherwise under every
        name ``translate_jobconf_for_all_versions()`` yields.

        If *map_split* is given, each of its items becomes a
        ``mapreduce.map.input.<key>`` variable, with the value stringified.
        """
        # TODO: these are really poor imitations of Hadoop IDs. See #1254
        id_fields = (self._job_key, task_type.lower(), step_num, task_num)

        conf = {
            'mapreduce.job.id': self._job_key,
            'mapreduce.task.id': 'task_%s_%s_%04d%d' % id_fields,
            'mapreduce.task.attempt.id': 'attempt_%s_%s_%04d%d_0' % id_fields,
            'mapreduce.task.ismap': str(task_type == 'mapper').lower(),
            # TODO: is this the correct format?
            'mapreduce.task.partition': str(task_num),
            'mapreduce.task.output.dir': self._output_dir_for_step(step_num),
        }

        working_dir = self._task_working_dir(task_type, step_num, task_num)
        conf['mapreduce.job.local.dir'] = working_dir

        for kind in ('archive', 'file'):
            named_paths = sorted(
                self._working_dir_mgr.name_to_path(kind).items())

            uris = ['%s#%s' % (path, name) for name, path in named_paths]
            local_paths = [join(working_dir, name) for name, _ in named_paths]

            # mapreduce.job.cache.archives
            # mapreduce.job.cache.files
            conf['mapreduce.job.cache.%ss' % kind] = ','.join(uris)

            # mapreduce.job.cache.local.archives
            # mapreduce.job.cache.local.files
            conf['mapreduce.job.cache.local.%ss' % kind] = ','.join(
                local_paths)

        if map_split:
            # mapreduce.map.input.file
            # mapreduce.map.input.start
            # mapreduce.map.input.length
            conf.update(
                ('mapreduce.map.input.' + key, str(value))
                for key, value in map_split.items())

        # translate to correct version

        # don't use translate_jobconf_dict(); that's meant to add keys
        # to user-supplied jobconf
        hadoop_version = self.get_hadoop_version()

        if not hadoop_version:
            # no version to target: emit each variable under every name
            translated = {}
            for name, value in conf.items():
                for alias in translate_jobconf_for_all_versions(name):
                    translated[alias] = value
            return translated

        return {translate_jobconf(name, hadoop_version): value
                for name, value in conf.items()}
开发者ID:Affirm,项目名称:mrjob,代码行数:53,代码来源:sim.py


示例5: test_translate_jobconf

    def test_translate_jobconf(self):
        # Each row is (variable, Hadoop version, expected translation).
        expectations = [
            ("user.name", "0.20", "user.name"),
            ("mapreduce.job.user.name", "0.20", "user.name"),
            ("mapreduce.job.user.name", "0.20.2", "user.name"),
            ("user.name", "0.21", "mapreduce.job.user.name"),
            # 1.x and 2.x version numbers
            ("user.name", "1.0", "user.name"),
            ("user.name", "2.0", "mapreduce.job.user.name"),
            # a name with no known translation comes back unchanged
            ("foo.bar", "2.0", "foo.bar"),
        ]

        for variable, version, expected in expectations:
            self.assertEqual(translate_jobconf(variable, version), expected)
开发者ID:irskep,项目名称:mrjob,代码行数:10,代码来源:test_compat.py


示例6: _args_for_streaming_step

    def _args_for_streaming_step(self, step_num):
        """Build the ``hadoop jar <streaming jar> ...`` argument list for
        streaming step *step_num*.

        Raises :py:class:`Exception` if ``get_hadoop_streaming_jar()``
        returns nothing.
        """
        streaming_jar = self.get_hadoop_streaming_jar()
        if not streaming_jar:
            raise Exception('no Hadoop streaming jar')

        mapper, combiner, reducer = self._hadoop_streaming_commands(step_num)

        args = self.get_hadoop_bin() + ['jar', streaming_jar]

        # set up uploading from HDFS to the working dir
        args += self._upload_args(self._upload_mgr)

        # if no reducer, shut off reducer tasks. This has to come before
        # extra hadoop args, which could contain jar-specific args
        # (e.g. -outputformat). See #1331.
        #
        # might want to just integrate this into _hadoop_args_for_step?
        if not reducer:
            reduces_conf = translate_jobconf(
                'mapreduce.job.reduces', self.get_hadoop_version())
            args += ['-D', '%s=0' % reduces_conf]

        # -libjars (#198)
        libjars = self._opts['libjars']
        if libjars:
            args += ['-libjars', ','.join(libjars)]

        # Add extra hadoop args first as hadoop args could be a hadoop
        # specific argument (e.g. -libjars) which must come before job
        # specific args.
        args += self._hadoop_args_for_step(step_num)

        # set up input
        for input_uri in self._hdfs_step_input_files(step_num):
            args += ['-input', input_uri]

        # set up output
        args += ['-output', self._hdfs_step_output_dir(step_num)]

        args += ['-mapper', mapper]

        # combiner and reducer are optional
        if combiner:
            args += ['-combiner', combiner]

        if reducer:
            args += ['-reducer', reducer]

        return args
开发者ID:kaiyik,项目名称:mrjob,代码行数:52,代码来源:hadoop.py


示例7: _hadoop_streaming_jar_args

    def _hadoop_streaming_jar_args(self, step_num):
        """Return the list of arguments that follow
        ``hadoop jar <streaming jar path>`` for streaming step *step_num*.
        """
        # get command for each part of the job
        mapper, combiner, reducer = self._hadoop_streaming_commands(step_num)

        # set up uploading from HDFS/cloud storage to the working dir
        args = list(self._upload_args())

        # if no reducer, shut off reducer tasks. This has to come before
        # extra hadoop args, which could contain jar-specific args
        # (e.g. -outputformat). See #1331.
        #
        # might want to just integrate this into _hadoop_args_for_step?
        if not reducer:
            reduces_conf = translate_jobconf(
                'mapreduce.job.reduces', self.get_hadoop_version())
            args += ['-D', '%s=0' % reduces_conf]

        # Add extra hadoop args first as hadoop args could be a hadoop
        # specific argument which must come before job
        # specific args.
        args += self._hadoop_args_for_step(step_num)

        # set up input
        for input_uri in self._step_input_uris(step_num):
            args += ['-input', input_uri]

        # set up output
        args += ['-output', self._step_output_uri(step_num)]

        args += ['-mapper', mapper]

        # combiner and reducer are only passed along when present
        for flag, command in (('-combiner', combiner), ('-reducer', reducer)):
            if command:
                args += [flag, command]

        return args
开发者ID:okomestudio,项目名称:mrjob,代码行数:46,代码来源:bin.py


示例8: test_translate_jobconf

    def test_translate_jobconf(self):
        # Each row is (variable, Hadoop version, expected translation).
        expectations = [
            ('user.name', '0.18', 'user.name'),
            ('mapreduce.job.user.name', '0.18', 'user.name'),
            ('user.name', '0.19', 'user.name'),
            ('mapreduce.job.user.name', '0.19.2', 'user.name'),
            ('user.name', '0.21', 'mapreduce.job.user.name'),
            # 1.x and 2.x version numbers
            ('user.name', '1.0', 'user.name'),
            ('user.name', '2.0', 'mapreduce.job.user.name'),
            # a name with no known translation comes back unchanged
            ('foo.bar', '2.0', 'foo.bar'),
        ]

        for variable, version, expected in expectations:
            self.assertEqual(translate_jobconf(variable, version), expected)
开发者ID:kartheek6,项目名称:mrjob,代码行数:19,代码来源:test_compat.py


示例9: _args_for_streaming_step

    def _args_for_streaming_step(self, step_num):
        """Build the ``hadoop jar <streaming jar> ...`` argument list for
        streaming step *step_num*.

        Raises :py:class:`Exception` if ``get_hadoop_streaming_jar()``
        returns nothing.
        """
        streaming_jar = self.get_hadoop_streaming_jar()
        if not streaming_jar:
            raise Exception('no Hadoop streaming jar')

        args = self.get_hadoop_bin() + ['jar', streaming_jar]

        # set up uploading from HDFS to the working dir
        args += self._upload_args(self._upload_mgr)

        # Add extra hadoop args first as hadoop args could be a hadoop
        # specific argument (e.g. -libjar) which must come before job
        # specific args.
        args += self._hadoop_args_for_step(step_num)

        mapper, combiner, reducer = self._hadoop_streaming_commands(step_num)

        # if no reducer, shut off reducer tasks
        if not reducer:
            reduces_conf = translate_jobconf(
                'mapreduce.job.reduces', self.get_hadoop_version())
            args += ['-D', '%s=0' % reduces_conf]

        # set up input
        for input_uri in self._hdfs_step_input_files(step_num):
            args += ['-input', input_uri]

        # set up output
        args += ['-output', self._hdfs_step_output_dir(step_num)]

        args += ['-mapper', mapper]

        # combiner and reducer are optional
        if combiner:
            args += ['-combiner', combiner]

        if reducer:
            args += ['-reducer', reducer]

        return args
开发者ID:mmontagna,项目名称:mrjob,代码行数:44,代码来源:hadoop.py


示例10: _sort_values_jobconf

    def _sort_values_jobconf(self):
        """Return the jobconf dict that enables sorting by value.

        Returns an empty dict when ``self._sort_values`` is false.
        Otherwise the entries of ``_SORT_VALUES_JOBCONF`` are returned with
        their names translated for ``get_hadoop_version()``. If no version
        is known, each entry is returned under every name given by
        ``translate_jobconf_for_all_versions()``.
        """
        if not self._sort_values:
            return {}

        # translate _SORT_VALUES_JOBCONF to the correct Hadoop version,
        # without logging a warning
        hadoop_version = self.get_hadoop_version()

        if hadoop_version:
            return {translate_jobconf(name, hadoop_version): value
                    for name, value in _SORT_VALUES_JOBCONF.items()}

        # no version to target: cover every known spelling of each name
        return {alias: value
                for name, value in _SORT_VALUES_JOBCONF.items()
                for alias in translate_jobconf_for_all_versions(name)}
开发者ID:okomestudio,项目名称:mrjob,代码行数:19,代码来源:runner.py


示例11: _process_jobconf_args

    def _process_jobconf_args(self, jobconf):
        """Apply the jobconf settings this runner acts on.

        Each name in *jobconf* is first translated to its Hadoop 0.21
        spelling with ``translate_jobconf()``, then matched against:

        * ``mapreduce.job.maps``: stored (as an int) in ``self._map_tasks``
        * ``mapreduce.job.reduces``: stored (as an int) in
          ``self._reduce_tasks``
        * ``mapreduce.job.local.dir``: stored in ``self._working_dir``

        Any other variable is ignored here. A false *jobconf* (``None`` or
        empty) is a no-op.

        :raises ValueError: if a task count is less than 1, or if
                            ``int()`` cannot convert the value
        :raises IOError: if the local dir is not an existing directory
        """
        if not jobconf:
            return

        # .items() rather than .iteritems(), so this also runs on Python 3
        for conf_arg, value in jobconf.items():
            # Internally, use one canonical Hadoop version
            canon_arg = translate_jobconf(conf_arg, '0.21')

            if canon_arg == 'mapreduce.job.maps':
                self._map_tasks = int(value)
                if self._map_tasks < 1:
                    # report the name the user actually supplied
                    raise ValueError(
                        '%s should be at least 1' % conf_arg)
            elif canon_arg == 'mapreduce.job.reduces':
                self._reduce_tasks = int(value)
                if self._reduce_tasks < 1:
                    raise ValueError('%s should be at least 1' % conf_arg)
            elif canon_arg == 'mapreduce.job.local.dir':
                # Hadoop supports multiple directories. Sticking with only
                # one here
                if not os.path.isdir(value):
                    raise IOError("Directory %s does not exist" % value)
                self._working_dir = value
开发者ID:BrandonHaynes,项目名称:mrjob,代码行数:21,代码来源:local.py


示例12: _simulate_jobconf_for_step

    def _simulate_jobconf_for_step(
            self, step_num, step_type, task_num, working_dir,
            input_file=None, input_start=None, input_length=None):
        """Simulate jobconf variables set by Hadoop to indicate input
        files, files uploaded, working directory, etc. for a particular step.

        Returns a dictionary mapping jobconf variable name
        (e.g. ``'mapreduce.map.input.file'``) to its value, which is always
        a string.

        The ``mapreduce.map.input.*`` variables are only included for the
        *input_file*, *input_start* and *input_length* arguments that are
        not ``None``. All names are translated with ``translate_jobconf()``
        for ``get_hadoop_version()`` before being returned.
        """
        # By convention, we use the newer (Hadoop 0.21+) jobconf names and
        # translate them at the very end.
        j = {}

        j['mapreduce.job.id'] = self._job_name
        j['mapreduce.task.output.dir'] = self._output_dir
        j['mapreduce.job.local.dir'] = working_dir

        # archives and files for jobconf
        #
        # TODO: could add mtime info here too (e.g.
        # mapreduce.job.cache.archives.timestamps)
        for kind in ('file', 'archive'):
            cache_uris = []
            cache_local_paths = []

            # .items() rather than .iteritems(), so this also runs on
            # Python 3
            named_paths = self._working_dir_mgr.name_to_path(kind)
            for name, path in named_paths.items():
                cache_uris.append('%s#%s' % (path, name))
                cache_local_paths.append(os.path.join(working_dir, name))

            # mapreduce.job.cache.files / mapreduce.job.cache.archives
            j['mapreduce.job.cache.%ss' % kind] = ','.join(cache_uris)
            # mapreduce.job.cache.local.files /
            # mapreduce.job.cache.local.archives
            j['mapreduce.job.cache.local.%ss' % kind] = ','.join(
                cache_local_paths)

        # task and attempt IDs
        id_fields = (self._job_name, step_type.lower(), step_num, task_num)
        j['mapreduce.task.id'] = 'task_%s_%s_%05d%d' % id_fields
        # (we only have one attempt)
        j['mapreduce.task.attempt.id'] = 'attempt_%s_%s_%05d%d_0' % id_fields

        # not actually sure what's correct for combiners here. It'll definitely
        # be true if we're just using pipes to simulate a combiner though
        j['mapreduce.task.ismap'] = str(
            step_type in ('mapper', 'combiner')).lower()

        j['mapreduce.task.partition'] = str(task_num)

        if input_file is not None:
            j['mapreduce.map.input.file'] = input_file
        if input_start is not None:
            j['mapreduce.map.input.start'] = str(input_start)
        if input_length is not None:
            j['mapreduce.map.input.length'] = str(input_length)

        # translate to correct version
        version = self.get_hadoop_version()
        j = dict((translate_jobconf(k, version), v) for k, v in j.items())

        return j
开发者ID:PythonCharmers,项目名称:mrjob,代码行数:67,代码来源:sim.py



注:本文中的mrjob.compat.translate_jobconf函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python compat.uses_yarn函数代码示例发布时间:2022-05-27
下一篇:
Python mrflog.warn函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap