This article collects typical usage examples of the Python function sahara.utils.xmlutils.create_hadoop_xml. If you are wondering what create_hadoop_xml does, how to call it, or what real-world uses of it look like, the curated examples below should help.
Twenty code examples of create_hadoop_xml are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps our system recommend better Python code samples.
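Before the examples, a quick orientation: as the samples below suggest, create_hadoop_xml takes a dict of Hadoop property names and values plus an optional list of known/default property descriptors used as a filter, and returns the text of a Hadoop-style *-site.xml file (Example 11 shows the exact output format). The following minimal sketch is illustrative only: the Hadoop property values are placeholders, and the filtered call reuses the test resource path from Example 11, so it assumes it is run from a sahara source checkout.

from sahara.utils import xmlutils as x

# Without a filter (as in Examples 4, 9, 13 and 17) every entry of the dict
# is rendered into <property> elements. The values here are placeholders.
core_site_xml = x.create_hadoop_xml({
    'fs.defaultFS': 'hdfs://namenode.example:8020',
    'hadoop.tmp.dir': '/mnt/hdfs/tmp',
})
print(core_site_xml)

# With a filter (as in Examples 1, 2, 5, 7, 8 and 11) the second argument is
# a list of known/default property descriptors, which appears to restrict the
# output to the properties listed there. The path below is the sahara test
# resource from Example 11, used here purely as an illustration.
defaults = x.load_hadoop_xml_defaults('tests/unit/resources/test-default.xml')
print(x.create_hadoop_xml({'name1': 'some_val1', 'name2': 2}, defaults))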
Example 1: generate_xml_configs
def generate_xml_configs(configs, storage_path, nn_hostname, hadoop_port):
    if hadoop_port is None:
        hadoop_port = 8020

    cfg = {
        "fs.defaultFS": "hdfs://%s:%s" % (nn_hostname, str(hadoop_port)),
        "dfs.namenode.name.dir": extract_hadoop_path(storage_path, "/dfs/nn"),
        "dfs.datanode.data.dir": extract_hadoop_path(storage_path, "/dfs/dn"),
        "hadoop.tmp.dir": extract_hadoop_path(storage_path, "/dfs"),
    }

    # inserting user-defined configs
    for key, value in extract_hadoop_xml_confs(configs):
        cfg[key] = value

    # invoking applied configs to appropriate xml files
    core_all = CORE_DEFAULT

    if CONF.enable_data_locality:
        cfg.update(topology.TOPOLOGY_CONFIG)
        # applying vm awareness configs
        core_all += topology.vm_awareness_core_config()

    xml_configs = {"core-site": x.create_hadoop_xml(cfg, core_all),
                   "hdfs-site": x.create_hadoop_xml(cfg, HDFS_DEFAULT)}

    return xml_configs
Developer ID: JohannaMW, Project: sahara, Lines: 26, Source: config_helper.py

Example 2: generate_xml_configs
def generate_xml_configs(cluster, node_group, hive_mysql_passwd):
    oozie_hostname = vu.get_instance_hostname(vu.get_oozie(cluster))
    hive_hostname = vu.get_instance_hostname(vu.get_hiveserver(cluster))

    ng_configs = node_group.configuration()

    general_cfg = get_general_configs(hive_hostname, hive_mysql_passwd)

    all_cfg = generate_sahara_configs(cluster, node_group)

    # inserting user-defined configs
    for key, value in extract_xml_confs(ng_configs):
        all_cfg[key] = value

    # applying swift configs if user enabled it
    swift_xml_confs = swift.get_swift_configs()
    all_cfg = generate_cfg_from_general(all_cfg, ng_configs, general_cfg)

    # invoking applied configs to appropriate xml files
    core_all = CORE_DEFAULT + swift_xml_confs
    mapred_all = MAPRED_DEFAULT

    if CONF.enable_data_locality:
        all_cfg.update(topology.TOPOLOGY_CONFIG)
        # applying vm awareness configs
        core_all += topology.vm_awareness_core_config()
        mapred_all += topology.vm_awareness_mapred_config()

    xml_configs = {
        'core-site': x.create_hadoop_xml(all_cfg, core_all),
        'mapred-site': x.create_hadoop_xml(all_cfg, mapred_all),
        'hdfs-site': x.create_hadoop_xml(all_cfg, HDFS_DEFAULT)
    }

    if hive_hostname:
        cfg = all_cfg
        cfg_filter = HIVE_DEFAULT
        proxy_configs = cluster.cluster_configs.get('proxy_configs')
        if CONF.use_identity_api_v3 and proxy_configs:
            cfg, cfg_filter = _inject_swift_trust_info(cfg,
                                                       cfg_filter,
                                                       proxy_configs)
        xml_configs.update({'hive-site':
                            x.create_hadoop_xml(cfg, cfg_filter)})
        LOG.debug('Generated hive-site.xml for hive {host}'.format(
            host=hive_hostname))

    if oozie_hostname:
        xml_configs.update({'oozie-site':
                            x.create_hadoop_xml(all_cfg, o_h.OOZIE_DEFAULT)})
        LOG.debug('Generated oozie-site.xml for oozie {host}'.format(
            host=oozie_hostname))

    return xml_configs
Developer ID: AlexanderYAPPO, Project: sahara, Lines: 55, Source: config_helper.py

Example 3: run_job
def run_job(self, job_execution):
    prepared_job_params = self._prepare_run_job(job_execution)

    path_to_workflow = prepared_job_params['path_to_workflow']
    hdfs_user = prepared_job_params['hdfs_user']
    oozie_params = prepared_job_params['oozie_params']
    use_hbase_lib = prepared_job_params['use_hbase_lib']
    ctx = prepared_job_params['context']
    job_execution = prepared_job_params['job_execution']

    job_params = self._get_oozie_job_params(hdfs_user,
                                             path_to_workflow,
                                             oozie_params,
                                             use_hbase_lib)

    client = self._get_client()
    oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
                                  job_execution)
    job_execution = conductor.job_execution_get(ctx, job_execution.id)
    if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
        return (None, edp.JOB_STATUS_KILLED, None)

    conductor.job_execution_update(
        context.ctx(), job_execution.id,
        {'info': {'status': edp.JOB_STATUS_READYTORUN},
         'engine_job_id': oozie_job_id})

    client.run_job(job_execution, oozie_job_id)
    try:
        status = client.get_job_info(job_execution, oozie_job_id)['status']
    except Exception:
        status = None
    return (oozie_job_id, status, None)
Developer ID: Imperat, Project: sahara, Lines: 32, Source: engine.py

Example 4: _upload_wrapper_xml
def _upload_wrapper_xml(self, where, job_dir, job_configs):
    xml_name = 'spark.xml'
    proxy_configs = job_configs.get('proxy_configs')
    configs = {}
    cfgs = job_configs.get('configs', {})
    if proxy_configs:
        configs[sw.HADOOP_SWIFT_USERNAME] = proxy_configs.get(
            'proxy_username')
        configs[sw.HADOOP_SWIFT_PASSWORD] = key_manager.get_secret(
            proxy_configs.get('proxy_password'))
        configs[sw.HADOOP_SWIFT_TRUST_ID] = proxy_configs.get(
            'proxy_trust_id')
        configs[sw.HADOOP_SWIFT_DOMAIN_NAME] = CONF.proxy_user_domain_name
    else:
        targets = [sw.HADOOP_SWIFT_USERNAME]
        configs = {k: cfgs[k] for k in targets if k in cfgs}
        if sw.HADOOP_SWIFT_PASSWORD in cfgs:
            configs[sw.HADOOP_SWIFT_PASSWORD] = (
                key_manager.get_secret(cfgs[sw.HADOOP_SWIFT_PASSWORD])
            )

    for s3_cfg_key in s3_common.S3_DS_CONFIGS:
        if s3_cfg_key in cfgs:
            if s3_cfg_key == s3_common.S3_SECRET_KEY_CONFIG:
                configs[s3_cfg_key] = (
                    key_manager.get_secret(cfgs[s3_cfg_key])
                )
            else:
                configs[s3_cfg_key] = cfgs[s3_cfg_key]

    content = xmlutils.create_hadoop_xml(configs)

    with remote.get_remote(where) as r:
        dst = os.path.join(job_dir, xml_name)
        r.write_file_to(dst, content)
    return xml_name
Developer ID: openstack, Project: sahara, Lines: 35, Source: engine.py

Example 5: generate_xml_configs
def generate_xml_configs(cluster, node_group, hive_mysql_passwd):
    oozie_hostname = _get_hostname(utils.get_oozie(cluster))
    hive_hostname = _get_hostname(utils.get_hiveserver(cluster))

    ng_configs = node_group.configuration()

    general_cfg = get_general_configs(hive_hostname, hive_mysql_passwd)

    all_cfg = generate_sahara_configs(cluster, node_group)

    # inserting user-defined configs
    for key, value in extract_xml_confs(ng_configs):
        all_cfg[key] = value

    # applying swift configs if user enabled it
    swift_xml_confs = swift.get_swift_configs()
    all_cfg = generate_cfg_from_general(all_cfg, ng_configs, general_cfg)

    # invoking applied configs to appropriate xml files
    core_all = CORE_DEFAULT + swift_xml_confs
    mapred_all = MAPRED_DEFAULT

    if CONF.enable_data_locality:
        all_cfg.update(topology.TOPOLOGY_CONFIG)
        # applying vm awareness configs
        core_all += topology.vm_awareness_core_config()
        mapred_all += topology.vm_awareness_mapred_config()

    xml_configs = {
        'core-site': x.create_hadoop_xml(all_cfg, core_all),
        'mapred-site': x.create_hadoop_xml(all_cfg, mapred_all),
        'hdfs-site': x.create_hadoop_xml(all_cfg, HDFS_DEFAULT)
    }

    if hive_hostname:
        xml_configs.update({'hive-site':
                            x.create_hadoop_xml(all_cfg, HIVE_DEFAULT)})
        LOG.debug('Generated hive-site.xml for hive %s', hive_hostname)

    if oozie_hostname:
        xml_configs.update({'oozie-site':
                            x.create_hadoop_xml(all_cfg, o_h.OOZIE_DEFAULT)})
        LOG.debug('Generated oozie-site.xml for oozie %s', oozie_hostname)

    return xml_configs
Developer ID: qinweiwei, Project: sahara, Lines: 46, Source: config_helper.py

Example 6: run_job
def run_job(job_execution):
    ctx = context.ctx()

    cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
    if cluster.status != 'Active':
        return job_execution

    job = conductor.job_get(ctx, job_execution.job_id)
    if not edp.compare_job_type(job.type, edp.JOB_TYPE_JAVA):
        input_source = conductor.data_source_get(ctx, job_execution.input_id)
        output_source = conductor.data_source_get(ctx, job_execution.output_id)
    else:
        input_source = None
        output_source = None
    #TODO(nprivalova): should be removed after all features implemented
    validate(input_source, output_source, job)

    for data_source in [input_source, output_source]:
        if data_source and data_source.type == 'hdfs':
            h.configure_cluster_for_hdfs(cluster, data_source)

    hdfs_user = _get_hdfs_user(cluster)
    oozie_server = _get_oozie_server(cluster)
    wf_dir = create_workflow_dir(oozie_server, job, hdfs_user)
    upload_job_files(oozie_server, wf_dir, job, hdfs_user)

    creator = workflow_factory.get_creator(job)

    wf_xml = creator.get_workflow_xml(cluster, job_execution,
                                      input_source, output_source)

    path_to_workflow = upload_workflow_file(oozie_server,
                                            wf_dir, wf_xml, hdfs_user)

    rm_path = _get_resource_manager_path(cluster)
    nn_path = cluster['info']['HDFS']['NameNode']

    client = o.OozieClient(cluster['info']['JobFlow']['Oozie'] + "/oozie/",
                           _get_oozie_server(cluster))
    job_parameters = {"jobTracker": rm_path,
                      "nameNode": nn_path,
                      "user.name": hdfs_user,
                      "oozie.wf.application.path":
                      "%s%s" % (nn_path, path_to_workflow),
                      "oozie.use.system.libpath": "true"}

    oozie_job_id = client.add_job(x.create_hadoop_xml(job_parameters),
                                  job_execution)
    job_execution = conductor.job_execution_update(ctx, job_execution,
                                                   {'oozie_job_id':
                                                    oozie_job_id,
                                                    'start_time':
                                                    datetime.datetime.now()})
    client.run_job(job_execution, oozie_job_id)

    return job_execution
Developer ID: esala116, Project: sahara, Lines: 56, Source: job_manager.py

Example 7: generate_xml_configs
def generate_xml_configs(configs, storage_path, nn_hostname, hadoop_port):
    if hadoop_port is None:
        hadoop_port = 8020

    cfg = {
        'fs.defaultFS': 'hdfs://%s:%s' % (nn_hostname, str(hadoop_port)),
        'dfs.namenode.name.dir': extract_hadoop_path(storage_path,
                                                     '/dfs/nn'),
        'dfs.datanode.data.dir': extract_hadoop_path(storage_path,
                                                     '/dfs/dn'),
        'hadoop.tmp.dir': extract_hadoop_path(storage_path,
                                              '/dfs'),
        'dfs.hosts': '/etc/hadoop/dn.incl',
        'dfs.hosts.exclude': '/etc/hadoop/dn.excl'
    }

    # inserting user-defined configs
    for key, value in extract_hadoop_xml_confs(configs):
        cfg[key] = value

    # Add the swift defaults if they have not been set by the user
    swft_def = []
    if is_swift_enabled(configs):
        swft_def = SWIFT_DEFAULTS
        swift_configs = extract_name_values(swift.get_swift_configs())
        for key, value in six.iteritems(swift_configs):
            if key not in cfg:
                cfg[key] = value

    # invoking applied configs to appropriate xml files
    core_all = CORE_DEFAULT + swft_def

    if CONF.enable_data_locality:
        cfg.update(topology.TOPOLOGY_CONFIG)
        # applying vm awareness configs
        core_all += topology.vm_awareness_core_config()

    xml_configs = {
        'core-site': x.create_hadoop_xml(cfg, core_all),
        'hdfs-site': x.create_hadoop_xml(cfg, HDFS_DEFAULT)
    }

    return xml_configs
Developer ID: AllenFromMinneapolis, Project: sahara, Lines: 43, Source: config_helper.py

Example 8: generate_xml_configs
def generate_xml_configs(configs, storage_path, nn_hostname, hadoop_port):
    """dfs.name.dir': extract_hadoop_path(storage_path,
    '/lib/hadoop/hdfs/namenode'),
    'dfs.data.dir': extract_hadoop_path(storage_path,
    '/lib/hadoop/hdfs/datanode'),
    'dfs.name.dir': storage_path + 'hdfs/name',
    'dfs.data.dir': storage_path + 'hdfs/data',
    'dfs.hosts': '/etc/hadoop/dn.incl',
    'dfs.hosts.exclude': '/etc/hadoop/dn.excl',
    """
    if hadoop_port is None:
        hadoop_port = 8020

    cfg = {
        'fs.defaultFS': 'hdfs://%s:%s' % (nn_hostname, str(hadoop_port)),
        'dfs.namenode.name.dir': extract_hadoop_path(storage_path,
                                                     '/dfs/nn'),
        'dfs.datanode.data.dir': extract_hadoop_path(storage_path,
                                                     '/dfs/dn'),
        'hadoop.tmp.dir': extract_hadoop_path(storage_path,
                                              '/dfs'),
    }

    # inserting user-defined configs
    for key, value in extract_hadoop_xml_confs(configs):
        cfg[key] = value

    # invoking applied configs to appropriate xml files
    core_all = CORE_DEFAULT

    if CONF.enable_data_locality:
        cfg.update(topology.TOPOLOGY_CONFIG)
        # applying vm awareness configs
        core_all += topology.vm_awareness_core_config()

    xml_configs = {
        'core-site': x.create_hadoop_xml(cfg, core_all),
        'hdfs-site': x.create_hadoop_xml(cfg, HDFS_DEFAULT)
    }

    return xml_configs
Developer ID: hongbin, Project: sahara, Lines: 42, Source: config_helper.py

Example 9: _configure_swift_to_inst
def _configure_swift_to_inst(instance):
    cluster = instance.node_group.cluster
    with instance.remote() as r:
        r.execute_command('sudo curl %s -o %s/hadoop-openstack.jar' % (
            c_helper.get_swift_lib_url(cluster), HADOOP_LIB_DIR))
        core_site = r.read_file_from(PATH_TO_CORE_SITE_XML)
        configs = xmlutils.parse_hadoop_xml_with_name_and_value(core_site)
        configs.extend(swift_helper.get_swift_configs())
        confs = dict((c['name'], c['value']) for c in configs)
        new_core_site = xmlutils.create_hadoop_xml(confs)
        r.write_file_to(PATH_TO_CORE_SITE_XML, new_core_site, run_as_root=True)
Developer ID: viplav, Project: sahara, Lines: 11, Source: deploy.py

Example 10: run_job
def run_job(self, job_execution):
    ctx = context.ctx()

    job = conductor.job_get(ctx, job_execution.job_id)
    input_source, output_source = job_utils.get_data_sources(job_execution,
                                                             job)

    proxy_configs = job_execution.job_configs.get('proxy_configs')

    for data_source in [input_source, output_source]:
        if data_source and data_source.type == 'hdfs':
            h.configure_cluster_for_hdfs(self.cluster, data_source)
            break

    hdfs_user = self.get_hdfs_user()

    # TODO(tmckay): this should probably be "get_namenode"
    # but that call does not exist in the oozie engine api now.
    oozie_server = self.get_oozie_server(self.cluster)

    wf_dir = self._create_hdfs_workflow_dir(oozie_server, job)
    self._upload_job_files_to_hdfs(oozie_server, wf_dir, job,
                                   proxy_configs)

    wf_xml = workflow_factory.get_workflow_xml(
        job, self.cluster, job_execution, input_source, output_source,
        hdfs_user)

    path_to_workflow = self._upload_workflow_file(oozie_server, wf_dir,
                                                  wf_xml, hdfs_user)

    job_params = self._get_oozie_job_params(hdfs_user,
                                            path_to_workflow)

    client = self._get_client()
    oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
                                  job_execution)

    job_execution = conductor.job_execution_get(ctx, job_execution.id)
    if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
        return (None, edp.JOB_STATUS_KILLED, None)

    client.run_job(job_execution, oozie_job_id)
    try:
        status = client.get_job_status(job_execution,
                                       oozie_job_id)['status']
    except Exception:
        status = None
    return (oozie_job_id, status, None)
Developer ID: a9261, Project: sahara, Lines: 46, Source: engine.py

Example 11: test_create_hadoop_xml
def test_create_hadoop_xml(self):
    conf = x.load_hadoop_xml_defaults(
        'tests/unit/resources/test-default.xml')
    self.assertEqual(x.create_hadoop_xml({'name1': 'some_val1',
                                          'name2': 2}, conf),
                     """<?xml version="1.0" ?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>name2</name>
    <value>2</value>
  </property>
  <property>
    <name>name1</name>
    <value>some_val1</value>
  </property>
</configuration>
""")
Developer ID: B-Rich, Project: sahara, Lines: 18, Source: test_xml_utils.py

Example 12: run_job
def run_job(self, job_execution):
    ctx = context.ctx()

    job = conductor.job_get(ctx, job_execution.job_id)
    input_source, output_source = job_utils.get_data_sources(job_execution,
                                                             job)

    for data_source in [input_source, output_source]:
        if data_source and data_source.type == 'hdfs':
            h.configure_cluster_for_hdfs(self.cluster, data_source)
            break

    hdfs_user = self.plugin.get_hdfs_user()

    # TODO(tmckay): this should probably be "get_namenode"
    # but that call does not exist in the plugin api now.
    # However, other engines may need it.
    oozie_server = self.plugin.get_oozie_server(self.cluster)

    wf_dir = job_utils.create_hdfs_workflow_dir(oozie_server,
                                                job, hdfs_user)
    job_utils.upload_job_files_to_hdfs(oozie_server, wf_dir,
                                       job, hdfs_user)

    wf_xml = workflow_factory.get_workflow_xml(
        job, self.cluster, job_execution, input_source, output_source)

    path_to_workflow = self._upload_workflow_file(oozie_server, wf_dir,
                                                  wf_xml, hdfs_user)

    job_params = self._get_oozie_job_params(hdfs_user,
                                            path_to_workflow)

    client = self._get_client()
    oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
                                  job_execution)
    client.run_job(job_execution, oozie_job_id)
    try:
        status = client.get_job_status(job_execution,
                                       oozie_job_id)['status']
    except Exception:
        status = None
    return (oozie_job_id, status, None)
Developer ID: COSHPC, Project: sahara, Lines: 43, Source: engine.py

Example 13: _upload_wrapper_xml
def _upload_wrapper_xml(self, where, job_dir, job_configs):
    xml_name = 'spark.xml'
    proxy_configs = job_configs.get('proxy_configs')
    configs = {}
    if proxy_configs:
        configs[sw.HADOOP_SWIFT_USERNAME] = proxy_configs.get(
            'proxy_username')
        configs[sw.HADOOP_SWIFT_PASSWORD] = proxy_configs.get(
            'proxy_password')
        configs[sw.HADOOP_SWIFT_TRUST_ID] = proxy_configs.get(
            'proxy_trust_id')
        configs[sw.HADOOP_SWIFT_DOMAIN_NAME] = CONF.proxy_user_domain_name
    else:
        cfgs = job_configs.get('configs', {})
        targets = [sw.HADOOP_SWIFT_USERNAME, sw.HADOOP_SWIFT_PASSWORD]
        configs = {k: cfgs[k] for k in targets if k in cfgs}

    content = xmlutils.create_hadoop_xml(configs)
    with remote.get_remote(where) as r:
        dst = os.path.join(job_dir, xml_name)
        r.write_file_to(dst, content)
    return xml_name
Developer ID: openstacking, Project: sahara, Lines: 22, Source: engine.py

Example 14: _run_job
def _run_job(job_execution_id):
    ctx = context.ctx()

    job_execution = conductor.job_execution_get(ctx, job_execution_id)
    cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
    if cluster.status != 'Active':
        return

    job_execution = _update_job_execution_extra(job_execution, cluster)

    job = conductor.job_get(ctx, job_execution.job_id)

    input_source, output_source = _get_data_sources(job_execution, job)

    for data_source in [input_source, output_source]:
        if data_source and data_source.type == 'hdfs':
            h.configure_cluster_for_hdfs(cluster, data_source)

    plugin = _get_plugin(cluster)
    hdfs_user = plugin.get_hdfs_user()
    oozie_server = plugin.get_oozie_server(cluster)

    wf_dir = create_workflow_dir(oozie_server, job, hdfs_user)
    upload_job_files(oozie_server, wf_dir, job, hdfs_user)

    wf_xml = workflow_factory.get_workflow_xml(
        job, cluster, job_execution, input_source, output_source)

    path_to_workflow = upload_workflow_file(oozie_server,
                                            wf_dir, wf_xml, hdfs_user)

    client = _create_oozie_client(cluster)
    job_params = _get_oozie_job_params(cluster, hdfs_user, path_to_workflow)
    oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
                                  job_execution)

    job_execution = conductor.job_execution_update(
        ctx, job_execution, {'oozie_job_id': oozie_job_id,
                             'start_time': datetime.datetime.now()})
    client.run_job(job_execution, oozie_job_id)
Developer ID: B-Rich, Project: sahara, Lines: 39, Source: job_manager.py

Example 15: run_scheduled_job
def run_scheduled_job(self, job_execution):
    prepared_job_params = self._prepare_run_job(job_execution)
    oozie_server = prepared_job_params['oozie_server']
    wf_dir = prepared_job_params['wf_dir']
    hdfs_user = prepared_job_params['hdfs_user']
    oozie_params = prepared_job_params['oozie_params']
    use_hbase_lib = prepared_job_params['use_hbase_lib']
    ctx = prepared_job_params['context']
    job_execution = prepared_job_params['job_execution']

    coord_configs = {"jobTracker": "${jobTracker}",
                     "nameNode": "${nameNode}"}
    coord_xml = self._create_coordinator_xml(coord_configs)
    self._upload_coordinator_file(oozie_server, wf_dir, coord_xml,
                                  hdfs_user)

    job_params = self._get_oozie_job_params(
        hdfs_user, None, oozie_params, use_hbase_lib,
        job_execution.job_configs.job_execution_info, wf_dir,
        "scheduled")

    client = self._get_client()
    oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
                                  job_execution)
    job_execution = conductor.job_execution_get(ctx, job_execution.id)
    if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
        return (None, edp.JOB_STATUS_KILLED, None)

    try:
        status = client.get_job_status(job_execution,
                                       oozie_job_id)['status']
    except Exception:
        status = None
    return (oozie_job_id, status, None)
Developer ID: Imperat, Project: sahara, Lines: 36, Source: engine.py

Example 16: run_job
def run_job(self, job_execution):
    ctx = context.ctx()

    # This will be a dictionary of tuples, (native_url, runtime_url)
    # keyed by data_source id
    data_source_urls = {}

    job = conductor.job_get(ctx, job_execution.job_id)
    input_source, output_source = job_utils.get_data_sources(
        job_execution, job, data_source_urls, self.cluster)

    # Updated_job_configs will be a copy of job_execution.job_configs with
    # any name or uuid references to data_sources resolved to paths
    # assuming substitution is enabled.
    # If substitution is not enabled then updated_job_configs will
    # just be a reference to job_execution.job_configs to avoid a copy.
    # Additional_sources will be a list of any data_sources found.
    additional_sources, updated_job_configs = (
        job_utils.resolve_data_source_references(job_execution.job_configs,
                                                 job_execution.id,
                                                 data_source_urls,
                                                 self.cluster)
    )

    job_execution = conductor.job_execution_update(
        ctx, job_execution,
        {"data_source_urls": job_utils.to_url_dict(data_source_urls)})

    # Now that we've recorded the native urls, we can switch to the
    # runtime urls
    data_source_urls = job_utils.to_url_dict(data_source_urls,
                                             runtime=True)

    proxy_configs = updated_job_configs.get('proxy_configs')
    configs = updated_job_configs.get('configs', {})
    use_hbase_lib = configs.get('edp.hbase_common_lib', {})

    # Extract all the 'oozie.' configs so that they can be set in the
    # job properties file. These are config values for Oozie itself,
    # not the job code
    oozie_params = {}
    for k in list(configs):
        if k.startswith('oozie.'):
            oozie_params[k] = configs[k]

    for data_source in [input_source, output_source] + additional_sources:
        if data_source and data_source.type == 'hdfs':
            h.configure_cluster_for_hdfs(
                self.cluster, data_source_urls[data_source.id])
            break

    external_hdfs_urls = self._resolve_external_hdfs_urls(
        job_execution.job_configs)
    for url in external_hdfs_urls:
        h.configure_cluster_for_hdfs(self.cluster, url)

    hdfs_user = self.get_hdfs_user()

    # TODO(tmckay): this should probably be "get_namenode"
    # but that call does not exist in the oozie engine api now.
    oozie_server = self.get_oozie_server(self.cluster)

    wf_dir = self._create_hdfs_workflow_dir(oozie_server, job)
    self._upload_job_files_to_hdfs(oozie_server, wf_dir, job, configs,
                                   proxy_configs)

    wf_xml = workflow_factory.get_workflow_xml(
        job, self.cluster, updated_job_configs,
        input_source, output_source,
        hdfs_user, data_source_urls)

    path_to_workflow = self._upload_workflow_file(oozie_server, wf_dir,
                                                  wf_xml, hdfs_user)

    job_params = self._get_oozie_job_params(hdfs_user,
                                            path_to_workflow,
                                            oozie_params,
                                            use_hbase_lib)

    client = self._get_client()
    oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
                                  job_execution)
    job_execution = conductor.job_execution_get(ctx, job_execution.id)
    if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
        return (None, edp.JOB_STATUS_KILLED, None)

    conductor.job_execution_update(
        context.ctx(), job_execution.id,
        {'info': {'status': edp.JOB_STATUS_READYTORUN},
         'engine_job_id': oozie_job_id})

    client.run_job(job_execution, oozie_job_id)
    try:
        status = client.get_job_status(job_execution,
                                       oozie_job_id)['status']
    except Exception:
        status = None
    return (oozie_job_id, status, None)
Developer ID: rogeryu27, Project: sahara, Lines: 98, Source: engine.py

Example 17: to_xml_file_content
def to_xml_file_content(data):
    return x.create_hadoop_xml(data)
Developer ID: a9261, Project: sahara, Lines: 2, Source: config_file_utils.py

Example 18: _generate_xml
def _generate_xml(configs):
    xml_confs = {}
    for service, confs in six.iteritems(configs):
        xml_confs[service] = x.create_hadoop_xml(confs)

    return xml_confs
Developer ID: B-Rich, Project: sahara, Lines: 6, Source: config.py

Example 19: run_job
def run_job(self, job_execution):
    ctx = context.ctx()

    data_source_urls = {}

    job = conductor.job_get(ctx, job_execution.job_id)
    input_source, output_source = job_utils.get_data_sources(job_execution, job, data_source_urls)

    # Updated_job_configs will be a copy of job_execution.job_configs with
    # any name or uuid references to data_sources resolved to paths
    # assuming substitution is enabled.
    # If substitution is not enabled then updated_job_configs will
    # just be a reference to job_execution.job_configs to avoid a copy.
    # Additional_sources will be a list of any data_sources found.
    additional_sources, updated_job_configs = job_utils.resolve_data_source_references(
        job_execution.job_configs, job_execution.id, data_source_urls
    )

    job_execution = conductor.job_execution_update(ctx, job_execution, {"data_source_urls": data_source_urls})

    proxy_configs = updated_job_configs.get("proxy_configs")
    configs = updated_job_configs.get("configs", {})
    use_hbase_lib = configs.get("edp.hbase_common_lib", {})

    # Extract all the 'oozie.' configs so that they can be set in the
    # job properties file. These are config values for Oozie itself,
    # not the job code
    oozie_params = {}
    for k in list(configs):
        if k.startswith("oozie."):
            oozie_params[k] = configs[k]

    for data_source in [input_source, output_source] + additional_sources:
        if data_source and data_source.type == "hdfs":
            h.configure_cluster_for_hdfs(self.cluster, data_source_urls[data_source.id])
            break

    hdfs_user = self.get_hdfs_user()

    # TODO(tmckay): this should probably be "get_namenode"
    # but that call does not exist in the oozie engine api now.
    oozie_server = self.get_oozie_server(self.cluster)

    wf_dir = self._create_hdfs_workflow_dir(oozie_server, job)
    self._upload_job_files_to_hdfs(oozie_server, wf_dir, job, configs, proxy_configs)

    wf_xml = workflow_factory.get_workflow_xml(
        job, self.cluster, updated_job_configs, input_source, output_source, hdfs_user, data_source_urls
    )

    path_to_workflow = self._upload_workflow_file(oozie_server, wf_dir, wf_xml, hdfs_user)

    job_params = self._get_oozie_job_params(hdfs_user, path_to_workflow, oozie_params, use_hbase_lib)

    client = self._get_client()
    oozie_job_id = client.add_job(x.create_hadoop_xml(job_params), job_execution)

    job_execution = conductor.job_execution_get(ctx, job_execution.id)
    if job_execution.info["status"] == edp.JOB_STATUS_TOBEKILLED:
        return (None, edp.JOB_STATUS_KILLED, None)

    client.run_job(job_execution, oozie_job_id)
    try:
        status = client.get_job_status(job_execution, oozie_job_id)["status"]
    except Exception:
        status = None
    return (oozie_job_id, status, None)
Developer ID: snowind, Project: sahara, Lines: 65, Source: engine.py

Example 20: run_job
def run_job(job_execution_id):
    ctx = context.ctx()

    job_execution = conductor.job_execution_get(ctx,
                                                job_execution_id)
    cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
    if cluster.status != 'Active':
        return

    if CONF.use_namespaces and not CONF.use_floating_ips:
        plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
        oozie = plugin.get_oozie_server(cluster)

        info = oozie.remote().get_neutron_info()
        extra = job_execution.extra.copy()
        extra['neutron'] = info

        job_execution = conductor.job_execution_update(ctx,
                                                       job_execution_id,
                                                       {'extra': extra})

    job = conductor.job_get(ctx, job_execution.job_id)
    if not edp.compare_job_type(job.type, edp.JOB_TYPE_JAVA):
        input_source = conductor.data_source_get(ctx, job_execution.input_id)
        output_source = conductor.data_source_get(ctx, job_execution.output_id)
    else:
        input_source = None
        output_source = None

    for data_source in [input_source, output_source]:
        if data_source and data_source.type == 'hdfs':
            h.configure_cluster_for_hdfs(cluster, data_source)

    hdfs_user = _get_hdfs_user(cluster)
    oozie_server = _get_oozie_server(cluster)
    wf_dir = create_workflow_dir(oozie_server, job, hdfs_user)
    upload_job_files(oozie_server, wf_dir, job, hdfs_user)

    creator = workflow_factory.get_creator(job)

    wf_xml = creator.get_workflow_xml(cluster, job_execution,
                                      input_source, output_source)

    path_to_workflow = upload_workflow_file(oozie_server,
                                            wf_dir, wf_xml, hdfs_user)

    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
    rm_path = plugin.get_resource_manager_uri(cluster)
    nn_path = plugin.get_name_node_uri(cluster)

    client = _create_oozie_client(cluster)
    job_parameters = {"jobTracker": rm_path,
                      "nameNode": nn_path,
                      "user.name": hdfs_user,
                      "oozie.wf.application.path":
                      "%s%s" % (nn_path, path_to_workflow),
                      "oozie.use.system.libpath": "true"}

    oozie_job_id = client.add_job(x.create_hadoop_xml(job_parameters),
                                  job_execution)
    job_execution = conductor.job_execution_update(ctx, job_execution,
                                                   {'oozie_job_id':
                                                    oozie_job_id,
                                                    'start_time':
                                                    datetime.datetime.now()})
    client.run_job(job_execution, oozie_job_id)
Developer ID: hongbin, Project: sahara, Lines: 67, Source: job_manager.py
Note: The sahara.utils.xmlutils.create_hadoop_xml examples in this article were compiled by 纯净天空 from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects contributed by their respective developers; copyright remains with the original authors, and any use or redistribution must follow the corresponding project's license. Please do not repost without permission.