This page collects typical usage examples of the Python function sahara.service.edp.oozie.workflow_creator.workflow_factory.get_workflow_xml. If you are unsure what get_workflow_xml does or how to call it, the curated code examples below may help.
Shown below are 20 code examples of get_workflow_xml, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python examples.
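Taken together, the examples show two typical ways of calling get_workflow_xml. As a quick orientation, here is a minimal sketch of the newer form used in Examples 2, 16 and 17; the build_workflow wrapper and its argument names are placeholders chosen for illustration, not part of Sahara's API:

from sahara.service.edp.oozie.workflow_creator import workflow_factory

def build_workflow(job, cluster, job_configs, input_source, output_source,
                   hdfs_user, data_source_urls):
    # Render the Oozie workflow.xml for an EDP job. Every argument here is
    # assumed to be the same kind of object the examples below construct
    # (conductor job/cluster objects, data sources, and an {id: url} mapping).
    return workflow_factory.get_workflow_xml(
        job, cluster, job_configs,
        input_source, output_source,
        hdfs_user, data_source_urls)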
Example 1: test_build_workflow_swift_configs
def test_build_workflow_swift_configs(self, job_binary):
    # Test that swift configs come from either input or output data sources
    job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG, configs={})
    job_binary.return_value = {"name": "script.pig"}

    input_data = u.create_data_source('swift://ex/i')
    output_data = u.create_data_source('hdfs://user/hadoop/out')

    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec, input_data, output_data,
        'hadoop')

    self.assertIn("""
      <configuration>
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>admin1</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>admin</value>
        </property>
      </configuration>""", res)

    input_data = u.create_data_source('hdfs://user/hadoop/in')
    output_data = u.create_data_source('swift://ex/o')

    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec, input_data, output_data,
        'hadoop')

    self.assertIn("""
      <configuration>
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>admin1</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>admin</value>
        </property>
      </configuration>""", res)

    job, job_exec = u.create_job_exec(
        edp.JOB_TYPE_PIG, configs={'configs': {'dummy': 'value'}})
    input_data = u.create_data_source('hdfs://user/hadoop/in')
    output_data = u.create_data_source('hdfs://user/hadoop/out')

    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec, input_data, output_data,
        'hadoop')

    self.assertIn("""
      <configuration>
        <property>
          <name>dummy</name>
          <value>value</value>
        </property>
      </configuration>""", res)
Author: degorenko, Project: sahara, Lines: 60, Source: test_job_manager.py
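Example 1 calls get_workflow_xml with the job execution object and six positional arguments, while several later examples (for instance Examples 2 and 16) pass job_exec.job_configs plus an extra data_source_urls mapping; the difference reflects the Sahara release each snippet was taken from. A hedged side-by-side sketch, with placeholder variables standing in for the test fixtures:

# Older call form, as in Example 1 and several others below.
res = workflow_factory.get_workflow_xml(
    job, cluster, job_exec, input_data, output_data, 'hadoop')

# Newer call form, as in Examples 2 and 16 among others: the job configs and
# a {data_source.id: url} mapping are passed explicitly.
data_source_urls = {input_data.id: input_data.url,
                    output_data.id: output_data.url}
res = workflow_factory.get_workflow_xml(
    job, cluster, job_exec.job_configs,
    input_data, output_data, 'hadoop', data_source_urls)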
Example 2: test_build_workflow_for_job_pig
def test_build_workflow_for_job_pig(self, job_binary):
    job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG, configs={})
    job_binary.return_value = {"name": "script.pig"}

    input_data = u.create_data_source('swift://ex/i')
    output_data = u.create_data_source('swift://ex/o')
    data_source_urls = {input_data.id: input_data.url,
                        output_data.id: output_data.url}

    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec.job_configs,
        input_data, output_data, 'hadoop', data_source_urls)

    self.assertIn("""
      <param>INPUT=swift://ex.sahara/i</param>
      <param>OUTPUT=swift://ex.sahara/o</param>""", res)

    self.assertIn("""
      <configuration>
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>admin1</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>admin</value>
        </property>
      </configuration>""", res)

    self.assertIn("<script>script.pig</script>", res)

    # testing workflow creation with a proxy domain
    self.override_config('use_domain_for_proxy_users', True)
    self.override_config("proxy_user_domain_name", 'sahara_proxy_domain')
    job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG, proxy=True)

    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec.job_configs,
        input_data, output_data, 'hadoop', data_source_urls)

    self.assertIn("""
      <configuration>
        <property>
          <name>fs.swift.service.sahara.domain.name</name>
          <value>sahara_proxy_domain</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>55555555-6666-7777-8888-999999999999</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.trust.id</name>
          <value>0123456789abcdef0123456789abcdef</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>job_00000000-1111-2222-3333-4444444444444444</value>
        </property>
      </configuration>""", res)
Author: rogeryu27, Project: sahara, Lines: 60, Source: test_job_manager.py
Example 3: test_build_workflow_for_job_hive
def test_build_workflow_for_job_hive(self, job_binary):
    job, job_exec = u.create_job_exec(edp.JOB_TYPE_HIVE, configs={})
    job_binary.return_value = {"name": "script.q"}

    input_data = u.create_data_source('swift://ex/i')
    output_data = u.create_data_source('swift://ex/o')
    data_source_urls = {input_data.id: input_data.url,
                        output_data.id: output_data.url}

    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec.job_configs,
        input_data, output_data, 'hadoop', data_source_urls)

    doc = xml.parseString(res)
    hive = doc.getElementsByTagName('hive')[0]
    self.assertEqual('/user/hadoop/conf/hive-site.xml',
                     xmlutils.get_text_from_node(hive, 'job-xml'))

    configuration = hive.getElementsByTagName('configuration')
    properties = xmlutils.get_property_dict(configuration[0])
    self.assertEqual({'fs.swift.service.sahara.password': 'admin1',
                      'fs.swift.service.sahara.username': 'admin'},
                     properties)

    self.assertEqual('script.q',
                     xmlutils.get_text_from_node(hive, 'script'))

    params = xmlutils.get_param_dict(hive)
    self.assertEqual({'INPUT': 'swift://ex.sahara/i',
                      'OUTPUT': 'swift://ex.sahara/o'}, params)

    # testing workflow creation with a proxy domain
    self.override_config('use_domain_for_proxy_users', True)
    self.override_config("proxy_user_domain_name", 'sahara_proxy_domain')
    job, job_exec = u.create_job_exec(edp.JOB_TYPE_HIVE, proxy=True)

    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec.job_configs,
        input_data, output_data, 'hadoop', data_source_urls)

    doc = xml.parseString(res)
    hive = doc.getElementsByTagName('hive')[0]
    configuration = hive.getElementsByTagName('configuration')
    properties = xmlutils.get_property_dict(configuration[0])
    self.assertEqual({
        'fs.swift.service.sahara.domain.name':
            'sahara_proxy_domain',
        'fs.swift.service.sahara.trust.id':
            '0123456789abcdef0123456789abcdef',
        'fs.swift.service.sahara.password':
            '55555555-6666-7777-8888-999999999999',
        'fs.swift.service.sahara.username':
            'job_00000000-1111-2222-3333-4444444444444444'}, properties)
Author: rogeryu27, Project: sahara, Lines: 58, Source: test_job_manager.py
Example 4: test_build_workflow_for_job_hive
def test_build_workflow_for_job_hive(self, job_binary):
    job, job_exec = u.create_job_exec(edp.JOB_TYPE_HIVE, configs={})
    job_binary.return_value = {"name": "script.q"}

    input_data = u.create_data_source("swift://ex/i")
    output_data = u.create_data_source("swift://ex/o")
    data_source_urls = {input_data.id: input_data.url, output_data.id: output_data.url}

    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec.job_configs, input_data, output_data, "hadoop", data_source_urls
    )

    doc = xml.parseString(res)
    hive = doc.getElementsByTagName("hive")[0]
    self.assertEqual("/user/hadoop/conf/hive-site.xml", xmlutils.get_text_from_node(hive, "job-xml"))

    configuration = hive.getElementsByTagName("configuration")
    properties = xmlutils.get_property_dict(configuration[0])
    self.assertEqual(
        {"fs.swift.service.sahara.password": "admin1", "fs.swift.service.sahara.username": "admin"}, properties
    )

    self.assertEqual("script.q", xmlutils.get_text_from_node(hive, "script"))

    params = xmlutils.get_param_dict(hive)
    self.assertEqual({"INPUT": "swift://ex.sahara/i", "OUTPUT": "swift://ex.sahara/o"}, params)

    # testing workflow creation with a proxy domain
    self.override_config("use_domain_for_proxy_users", True)
    self.override_config("proxy_user_domain_name", "sahara_proxy_domain")
    job, job_exec = u.create_job_exec(edp.JOB_TYPE_HIVE, proxy=True)

    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec.job_configs, input_data, output_data, "hadoop", data_source_urls
    )

    doc = xml.parseString(res)
    hive = doc.getElementsByTagName("hive")[0]
    configuration = hive.getElementsByTagName("configuration")
    properties = xmlutils.get_property_dict(configuration[0])
    self.assertEqual(
        {
            "fs.swift.service.sahara.domain.name": "sahara_proxy_domain",
            "fs.swift.service.sahara.trust.id": "0123456789abcdef0123456789abcdef",
            "fs.swift.service.sahara.password": "55555555-6666-7777-8888-999999999999",
            "fs.swift.service.sahara.username": "job_00000000-1111-2222-3333-4444444444444444",
        },
        properties,
    )
Author: thefuyang, Project: sahara, Lines: 51, Source: test_job_manager.py
Example 5: _build_workflow_with_conf_common
def _build_workflow_with_conf_common(self, job_type):
    input_data = u.create_data_source('swift://ex/i')
    output_data = u.create_data_source('swift://ex/o')

    job, job_exec = u.create_job_exec(job_type,
                                      configs={"configs": {'c': 'f'}})

    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec, input_data, output_data,
        'hadoop')

    self.assertIn("""
        <property>
          <name>c</name>
          <value>f</value>
        </property>""", res)

    self.assertIn("""
        <property>
          <name>mapred.input.dir</name>
          <value>swift://ex.sahara/i</value>
        </property>""", res)

    self.assertIn("""
        <property>
          <name>mapred.output.dir</name>
          <value>swift://ex.sahara/o</value>
        </property>""", res)
Author: stannie42, Project: sahara, Lines: 29, Source: test_job_manager.py
Example 6: test_build_workflow_for_job_pig
def test_build_workflow_for_job_pig(self, job_binary):
    job, job_exec = _create_all_stack(edp.JOB_TYPE_PIG)
    job_binary.return_value = {"name": "script.pig"}

    input_data = _create_data_source('swift://ex/i')
    output_data = _create_data_source('swift://ex/o')

    res = workflow_factory.get_workflow_xml(
        job, _create_cluster(), job_exec, input_data, output_data)

    self.assertIn("""
      <param>INPUT=swift://ex.sahara/i</param>
      <param>OUTPUT=swift://ex.sahara/o</param>""", res)

    self.assertIn("""
      <configuration>
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>admin1</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>admin</value>
        </property>
      </configuration>""", res)

    self.assertIn("<script>script.pig</script>", res)
Author: COSHPC, Project: sahara, Lines: 28, Source: test_job_manager.py
Example 7: test_build_workflow_for_job_hive
def test_build_workflow_for_job_hive(self, job_binary):
    job, job_exec = u.create_job_exec(edp.JOB_TYPE_HIVE)
    job_binary.return_value = {"name": "script.q"}

    input_data = u.create_data_source('swift://ex/i')
    output_data = u.create_data_source('swift://ex/o')

    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec, input_data, output_data,
        'hadoop')

    self.assertIn("""
      <job-xml>/user/hadoop/conf/hive-site.xml</job-xml>
      <configuration>
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>admin1</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>admin</value>
        </property>
      </configuration>
      <script>script.q</script>
      <param>INPUT=swift://ex.sahara/i</param>
      <param>OUTPUT=swift://ex.sahara/o</param>""", res)
Author: stannie42, Project: sahara, Lines: 27, Source: test_job_manager.py
Example 8: test_build_workflow_for_job_java
def test_build_workflow_for_job_java(self):
    # If args include swift paths, user and password values
    # will have to be supplied via configs instead of being
    # lifted from input or output data sources
    configs = {sw.HADOOP_SWIFT_USERNAME: 'admin',
               sw.HADOOP_SWIFT_PASSWORD: 'admin1'}

    configs = {
        'configs': configs,
        'args': ['swift://ex/i',
                 'output_path']
    }

    job, job_exec = u.create_job_exec(edp.JOB_TYPE_JAVA, configs)
    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec)

    self.assertIn("""
      <configuration>
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>admin1</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>admin</value>
        </property>
      </configuration>
      <main-class>%s</main-class>
      <java-opts>%s</java-opts>
      <arg>swift://ex.sahara/i</arg>
      <arg>output_path</arg>""" % (_java_main_class, _java_opts), res)
Author: stannie42, Project: sahara, Lines: 32, Source: test_job_manager.py
Example 9: test_build_workflow_for_job_java_with_adapter
def test_build_workflow_for_job_java_with_adapter(self, edp_conf_mock):
    edp_conf_mock.return_value = True
    configs = {"configs": {"edp.java.main_class": "some_main"}}
    job, job_exec = u.create_job_exec(edp.JOB_TYPE_JAVA, configs)

    res = workflow_factory.get_workflow_xml(job, u.create_cluster(), job_exec.job_configs)

    self.assertIn("<main-class>org.openstack.sahara.edp.MainWrapper</main-class>", res)
    self.assertNotIn("some_main", res)
Author: thefuyang, Project: sahara, Lines: 9, Source: test_job_manager.py
Example 10: test_build_workflow_for_job_shell
def test_build_workflow_for_job_shell(self):
    configs = {"configs": {"k1": "v1"}, "params": {"p1": "v1"}, "args": ["a1", "a2"]}
    job, job_exec = u.create_job_exec(edp.JOB_TYPE_SHELL, configs)

    res = workflow_factory.get_workflow_xml(job, u.create_cluster(), job_exec.job_configs)

    self.assertIn("<name>k1</name>", res)
    self.assertIn("<value>v1</value>", res)
    self.assertIn("<env-var>p1=v1</env-var>", res)
    self.assertIn("<argument>a1</argument>", res)
    self.assertIn("<argument>a2</argument>", res)
Author: thefuyang, Project: sahara, Lines: 12, Source: test_job_manager.py
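Example 10 suggests a direct mapping for shell jobs: entries under "configs" surface as <name>/<value> property pairs, "params" become <env-var> entries, and "args" become <argument> elements in the generated workflow. A small illustrative dict, annotated with the fragments the test above asserts (the comments describe those assertions, not the full generated XML):

shell_job_configs = {
    "configs": {"k1": "v1"},  # asserted as <name>k1</name> and <value>v1</value>
    "params": {"p1": "v1"},   # asserted as <env-var>p1=v1</env-var>
    "args": ["a1", "a2"],     # asserted as <argument>a1</argument>, <argument>a2</argument>
}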
Example 11: _build_workflow_common
def _build_workflow_common(self, job_type, streaming=False):
    if streaming:
        configs = {'edp.streaming.mapper': '/usr/bin/cat',
                   'edp.streaming.reducer': '/usr/bin/wc'}
        configs = {'configs': configs}
    else:
        configs = {}

    job, job_exec = u.create_job_exec(job_type, configs)

    input_data = u.create_data_source('swift://ex/i')
    output_data = u.create_data_source('swift://ex/o')

    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec, input_data, output_data,
        'hadoop')

    if streaming:
        self.assertIn("""
      <streaming>
        <mapper>/usr/bin/cat</mapper>
        <reducer>/usr/bin/wc</reducer>
      </streaming>""", res)

    self.assertIn("""
        <property>
          <name>mapred.output.dir</name>
          <value>swift://ex.sahara/o</value>
        </property>""", res)

    self.assertIn("""
        <property>
          <name>mapred.input.dir</name>
          <value>swift://ex.sahara/i</value>
        </property>""", res)

    self.assertIn("""
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>admin1</value>
        </property>""", res)

    self.assertIn("""
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>admin</value>
        </property>""", res)
Author: stannie42, Project: sahara, Lines: 47, Source: test_job_manager.py
Example 12: run_job
def run_job(self, job_execution):
    ctx = context.ctx()
    job = conductor.job_get(ctx, job_execution.job_id)

    input_source, output_source = job_utils.get_data_sources(job_execution,
                                                             job)

    proxy_configs = job_execution.job_configs.get('proxy_configs')

    for data_source in [input_source, output_source]:
        if data_source and data_source.type == 'hdfs':
            h.configure_cluster_for_hdfs(self.cluster, data_source)
            break

    hdfs_user = self.get_hdfs_user()

    # TODO(tmckay): this should probably be "get_namenode"
    # but that call does not exist in the oozie engine api now.
    oozie_server = self.get_oozie_server(self.cluster)

    wf_dir = self._create_hdfs_workflow_dir(oozie_server, job)
    self._upload_job_files_to_hdfs(oozie_server, wf_dir, job,
                                   proxy_configs)

    wf_xml = workflow_factory.get_workflow_xml(
        job, self.cluster, job_execution, input_source, output_source,
        hdfs_user)

    path_to_workflow = self._upload_workflow_file(oozie_server, wf_dir,
                                                  wf_xml, hdfs_user)

    job_params = self._get_oozie_job_params(hdfs_user,
                                            path_to_workflow)

    client = self._get_client()
    oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
                                  job_execution)

    job_execution = conductor.job_execution_get(ctx, job_execution.id)
    if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
        return (None, edp.JOB_STATUS_KILLED, None)

    client.run_job(job_execution, oozie_job_id)
    try:
        status = client.get_job_status(job_execution,
                                       oozie_job_id)['status']
    except Exception:
        status = None
    return (oozie_job_id, status, None)
Author: a9261, Project: sahara, Lines: 46, Source: engine.py
Example 13: run_job
def run_job(self, job_execution):
    ctx = context.ctx()
    job = conductor.job_get(ctx, job_execution.job_id)

    input_source, output_source = job_utils.get_data_sources(job_execution,
                                                             job)

    for data_source in [input_source, output_source]:
        if data_source and data_source.type == 'hdfs':
            h.configure_cluster_for_hdfs(self.cluster, data_source)
            break

    hdfs_user = self.plugin.get_hdfs_user()

    # TODO(tmckay): this should probably be "get_namenode"
    # but that call does not exist in the plugin api now.
    # However, other engines may need it.
    oozie_server = self.plugin.get_oozie_server(self.cluster)

    wf_dir = job_utils.create_hdfs_workflow_dir(oozie_server,
                                                job, hdfs_user)
    job_utils.upload_job_files_to_hdfs(oozie_server, wf_dir,
                                       job, hdfs_user)

    wf_xml = workflow_factory.get_workflow_xml(
        job, self.cluster, job_execution, input_source, output_source)

    path_to_workflow = self._upload_workflow_file(oozie_server, wf_dir,
                                                  wf_xml, hdfs_user)

    job_params = self._get_oozie_job_params(hdfs_user,
                                            path_to_workflow)

    client = self._get_client()
    oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
                                  job_execution)
    client.run_job(job_execution, oozie_job_id)
    try:
        status = client.get_job_status(job_execution,
                                       oozie_job_id)['status']
    except Exception:
        status = None
    return (oozie_job_id, status, None)
Author: COSHPC, Project: sahara, Lines: 43, Source: engine.py
Example 14: test_build_workflow_for_job_java
def test_build_workflow_for_job_java(self):
    # If args include swift paths, user and password values
    # will have to be supplied via configs instead of being
    # lifted from input or output data sources
    configs = {sw.HADOOP_SWIFT_USERNAME: 'admin',
               sw.HADOOP_SWIFT_PASSWORD: 'admin1'}

    configs = {
        'configs': configs,
        'args': ['swift://ex/i',
                 'output_path']
    }

    job, job_exec = u.create_job_exec(edp.JOB_TYPE_JAVA, configs)
    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec)

    self.assertIn("""
      <configuration>
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>admin1</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>admin</value>
        </property>
      </configuration>
      <main-class>%s</main-class>
      <java-opts>%s</java-opts>
      <arg>swift://ex.sahara/i</arg>
      <arg>output_path</arg>""" % (_java_main_class, _java_opts), res)

    # testing workflow creation with a proxy domain
    self.override_config('use_domain_for_proxy_users', True)
    self.override_config("proxy_user_domain_name", 'sahara_proxy_domain')

    configs = {
        'configs': {},
        'args': ['swift://ex/i',
                 'output_path']
    }

    job, job_exec = u.create_job_exec(edp.JOB_TYPE_JAVA, configs,
                                      proxy=True)
    res = workflow_factory.get_workflow_xml(job, u.create_cluster(),
                                            job_exec)

    self.assertIn("""
      <configuration>
        <property>
          <name>fs.swift.service.sahara.domain.name</name>
          <value>sahara_proxy_domain</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>55555555-6666-7777-8888-999999999999</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.trust.id</name>
          <value>0123456789abcdef0123456789abcdef</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>job_00000000-1111-2222-3333-4444444444444444</value>
        </property>
      </configuration>
      <main-class>%s</main-class>
      <java-opts>%s</java-opts>
      <arg>swift://ex.sahara/i</arg>
      <arg>output_path</arg>""" % (_java_main_class, _java_opts), res)
Author: degorenko, Project: sahara, Lines: 70, Source: test_job_manager.py
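Examples 2, 3, 14 and 15 all exercise the same proxy behaviour: once use_domain_for_proxy_users is enabled and the job execution carries a proxy user, the generated configuration no longer contains the Swift username and password taken from the data sources or job configs, but a per-job proxy username, a generated password, a Keystone trust id and the proxy domain name. For comparison, the two property sets as they appear in the test fixtures above:

# Without a proxy domain: credentials come from the data source / job configs.
swift_props_plain = {
    'fs.swift.service.sahara.username': 'admin',
    'fs.swift.service.sahara.password': 'admin1',
}

# With use_domain_for_proxy_users=True and proxy_user_domain_name set,
# the fixtures show per-job proxy credentials and a trust id instead.
swift_props_proxy = {
    'fs.swift.service.sahara.domain.name': 'sahara_proxy_domain',
    'fs.swift.service.sahara.username':
        'job_00000000-1111-2222-3333-4444444444444444',
    'fs.swift.service.sahara.password':
        '55555555-6666-7777-8888-999999999999',
    'fs.swift.service.sahara.trust.id': '0123456789abcdef0123456789abcdef',
}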
Example 15: _build_workflow_common
def _build_workflow_common(self, job_type, streaming=False, proxy=False):
    if streaming:
        configs = {'edp.streaming.mapper': '/usr/bin/cat',
                   'edp.streaming.reducer': '/usr/bin/wc'}
        configs = {'configs': configs}
    else:
        configs = {}

    job, job_exec = u.create_job_exec(job_type, configs)

    input_data = u.create_data_source('swift://ex/i')
    output_data = u.create_data_source('swift://ex/o')

    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec, input_data, output_data,
        'hadoop')

    if streaming:
        self.assertIn("""
      <streaming>
        <mapper>/usr/bin/cat</mapper>
        <reducer>/usr/bin/wc</reducer>
      </streaming>""", res)

    self.assertIn("""
        <property>
          <name>mapred.output.dir</name>
          <value>swift://ex.sahara/o</value>
        </property>""", res)

    self.assertIn("""
        <property>
          <name>mapred.input.dir</name>
          <value>swift://ex.sahara/i</value>
        </property>""", res)

    if not proxy:
        self.assertIn("""
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>admin1</value>
        </property>""", res)

        self.assertIn("""
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>admin</value>
        </property>""", res)
    else:
        # testing workflow creation with a proxy domain
        self.override_config('use_domain_for_proxy_users', True)
        self.override_config("proxy_user_domain_name",
                             'sahara_proxy_domain')
        job, job_exec = u.create_job_exec(job_type, proxy=True)
        res = workflow_factory.get_workflow_xml(
            job, u.create_cluster(), job_exec, input_data, output_data,
            'hadoop')

        self.assertIn("""
        <property>
          <name>fs.swift.service.sahara.domain.name</name>
          <value>sahara_proxy_domain</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>55555555-6666-7777-8888-999999999999</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.trust.id</name>
          <value>0123456789abcdef0123456789abcdef</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>job_00000000-1111-2222-3333-4444444444444444</value>
        </property>""", res)
Author: degorenko, Project: sahara, Lines: 76, Source: test_job_manager.py
Example 16: _build_workflow_common
def _build_workflow_common(self, job_type, streaming=False, proxy=False):
    if streaming:
        configs = {"edp.streaming.mapper": "/usr/bin/cat", "edp.streaming.reducer": "/usr/bin/wc"}
        configs = {"configs": configs}
    else:
        configs = {}

    job, job_exec = u.create_job_exec(job_type, configs)

    input_data = u.create_data_source("swift://ex/i")
    output_data = u.create_data_source("swift://ex/o")
    data_source_urls = {input_data.id: input_data.url, output_data.id: output_data.url}

    res = workflow_factory.get_workflow_xml(
        job, u.create_cluster(), job_exec.job_configs, input_data, output_data, "hadoop", data_source_urls
    )

    if streaming:
        self.assertIn(
            """
      <streaming>
        <mapper>/usr/bin/cat</mapper>
        <reducer>/usr/bin/wc</reducer>
      </streaming>""",
            res,
        )

    self.assertIn(
        """
        <property>
          <name>mapred.output.dir</name>
          <value>swift://ex.sahara/o</value>
        </property>""",
        res,
    )

    self.assertIn(
        """
        <property>
          <name>mapred.input.dir</name>
          <value>swift://ex.sahara/i</value>
        </property>""",
        res,
    )

    if not proxy:
        self.assertIn(
            """
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>admin1</value>
        </property>""",
            res,
        )

        self.assertIn(
            """
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>admin</value>
        </property>""",
            res,
        )
    else:
        # testing workflow creation with a proxy domain
        self.override_config("use_domain_for_proxy_users", True)
        self.override_config("proxy_user_domain_name", "sahara_proxy_domain")
        job, job_exec = u.create_job_exec(job_type, proxy=True)
        res = workflow_factory.get_workflow_xml(
            job, u.create_cluster(), job_exec.job_configs, input_data, output_data, "hadoop", data_source_urls
        )

        self.assertIn(
            """
        <property>
          <name>fs.swift.service.sahara.domain.name</name>
          <value>sahara_proxy_domain</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>55555555-6666-7777-8888-999999999999</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.trust.id</name>
          <value>0123456789abcdef0123456789abcdef</value>
        </property>
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>job_00000000-1111-2222-3333-4444444444444444</value>
        </property>""",
            res,
        )
Author: thefuyang, Project: sahara, Lines: 93, Source: test_job_manager.py
Example 17: _prepare_run_job
def _prepare_run_job(self, job_execution):
    ctx = context.ctx()

    # This will be a dictionary of tuples, (native_url, runtime_url)
    # keyed by data_source id
    data_source_urls = {}

    prepared_job_params = {}

    job = conductor.job_get(ctx, job_execution.job_id)

    input_source, output_source = job_utils.get_data_sources(
        job_execution, job, data_source_urls, self.cluster)

    # Updated_job_configs will be a copy of job_execution.job_configs with
    # any name or uuid references to data_sources resolved to paths
    # assuming substitution is enabled.
    # If substitution is not enabled then updated_job_configs will
    # just be a reference to job_execution.job_configs to avoid a copy.
    # Additional_sources will be a list of any data_sources found.
    additional_sources, updated_job_configs = (
        job_utils.resolve_data_source_references(job_execution.job_configs,
                                                 job_execution.id,
                                                 data_source_urls,
                                                 self.cluster)
    )

    job_execution = conductor.job_execution_update(
        ctx, job_execution,
        {"data_source_urls": job_utils.to_url_dict(data_source_urls)})

    # Now that we've recorded the native urls, we can switch to the
    # runtime urls
    data_source_urls = job_utils.to_url_dict(data_source_urls,
                                             runtime=True)

    proxy_configs = updated_job_configs.get('proxy_configs')
    configs = updated_job_configs.get('configs', {})
    use_hbase_lib = configs.get('edp.hbase_common_lib', {})

    # Extract all the 'oozie.' configs so that they can be set in the
    # job properties file. These are config values for Oozie itself,
    # not the job code
    oozie_params = {}
    for k in list(configs):
        if k.startswith('oozie.'):
            oozie_params[k] = configs[k]

    for data_source in [input_source, output_source] + additional_sources:
        if data_source and data_source.type == 'hdfs':
            h.configure_cluster_for_hdfs(
                self.cluster, data_source_urls[data_source.id])
            break

    external_hdfs_urls = self._resolve_external_hdfs_urls(
        job_execution.job_configs)
    for url in external_hdfs_urls:
        h.configure_cluster_for_hdfs(self.cluster, url)

    hdfs_user = self.get_hdfs_user()

    # TODO(tmckay): this should probably be "get_namenode"
    # but that call does not exist in the oozie engine api now.
    oozie_server = self.get_oozie_server(self.cluster)

    wf_dir = self._create_hdfs_workflow_dir(oozie_server, job)
    self._upload_job_files_to_hdfs(oozie_server, wf_dir, job, configs,
                                   proxy_configs)

    wf_xml = workflow_factory.get_workflow_xml(
        job, self.cluster, updated_job_configs,
        input_source, output_source,
        hdfs_user, data_source_urls)

    path_to_workflow = self._upload_workflow_file(oozie_server, wf_dir,
                                                  wf_xml, hdfs_user)

    prepared_job_params['context'] = ctx
    prepared_job_params['hdfs_user'] = hdfs_user
    prepared_job_params['path_to_workflow'] = path_to_workflow
    prepared_job_params['use_hbase_lib'] = use_hbase_lib
    prepared_job_params['job_execution'] = job_execution
    prepared_job_params['oozie_params'] = oozie_params
    prepared_job_params['wf_dir'] = wf_dir
    prepared_job_params['oozie_server'] = oozie_server

    return prepared_job_params
Author: Imperat, Project: sahara, Lines: 86, Source: engine.py
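The comments in Examples 17 and 18 describe data_source_urls as a dictionary of (native_url, runtime_url) tuples keyed by data source id, which job_utils.to_url_dict flattens before the runtime URLs are handed to get_workflow_xml. A hypothetical sketch of that flattening, written only to illustrate the data shape the comments describe (it is not Sahara's actual implementation):

def to_url_dict_sketch(data_source_urls, runtime=False):
    # Pick the native (index 0) or runtime (index 1) URL for each data source.
    index = 1 if runtime else 0
    return {ds_id: urls[index] for ds_id, urls in data_source_urls.items()}

# Usage: record the native URLs on the job execution first, then switch to the
# runtime URLs that the generated workflow will actually reference.
urls = {"ds-1": ("swift://ex/i", "swift://ex.sahara/i")}
native_urls = to_url_dict_sketch(urls)                  # {"ds-1": "swift://ex/i"}
runtime_urls = to_url_dict_sketch(urls, runtime=True)   # {"ds-1": "swift://ex.sahara/i"}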
Example 18: run_job
def run_job(self, job_execution):
    ctx = context.ctx()

    # This will be a dictionary of tuples, (native_url, runtime_url)
    # keyed by data_source id
    data_source_urls = {}

    job = conductor.job_get(ctx, job_execution.job_id)
    input_source, output_source = job_utils.get_data_sources(
        job_execution, job, data_source_urls, self.cluster)

    # Updated_job_configs will be a copy of job_execution.job_configs with
    # any name or uuid references to data_sources resolved to paths
    # assuming substitution is enabled.
    # If substitution is not enabled then updated_job_configs will
    # just be a reference to job_execution.job_configs to avoid a copy.
    # Additional_sources will be a list of any data_sources found.
    additional_sources, updated_job_configs = (
        job_utils.resolve_data_source_references(job_execution.job_configs,
                                                 job_execution.id,
                                                 data_source_urls,
                                                 self.cluster)
    )

    job_execution = conductor.job_execution_update(
        ctx, job_execution,
        {"data_source_urls": job_utils.to_url_dict(data_source_urls)})

    # Now that we've recorded the native urls, we can switch to the
    # runtime urls
    data_source_urls = job_utils.to_url_dict(data_source_urls,
                                             runtime=True)

    proxy_configs = updated_job_configs.get('proxy_configs')
    configs = updated_job_configs.get('configs', {})
    use_hbase_lib = configs.get('edp.hbase_common_lib', {})

    # Extract all the 'oozie.' configs so that they can be set in the
    # job properties file. These are config values for Oozie itself,
    # not the job code
    oozie_params = {}
    for k in list(configs):
        if k.startswith('oozie.'):
            oozie_params[k] = configs[k]

    for data_source in [input_source, output_source] + additional_sources:
        if data_source and data_source.type == 'hdfs':
            h.configure_cluster_for_hdfs(
                self.cluster, data_source_urls[data_source.id])
            break

    external_hdfs_urls = self._resolve_external_hdfs_urls(
        job_execution.job_configs)
    for url in external_hdfs_urls:
        h.configure_cluster_for_hdfs(self.cluster, url)

    hdfs_user = self.get_hdfs_user()

    # TODO(tmckay): this should probably be "get_namenode"
    # but that call does not exist in the oozie engine api now.
    oozie_server = self.get_oozie_server(self.cluster)

    wf_dir = self._create_hdfs_workflow_dir(oozie_server, job)
    self._upload_job_files_to_hdfs(oozie_server, wf_dir, job, configs,
                                   proxy_configs)

    wf_xml = workflow_factory.get_workflow_xml(
        job, self.cluster, updated_job_configs,
        input_source, output_source,
        hdfs_user, data_source_urls)

    path_to_workflow = self._upload_workflow_file(oozie_server, wf_dir,
                                                  wf_xml, hdfs_user)

    job_params = self._get_oozie_job_params(hdfs_user,
                                            path_to_workflow,
                                            oozi