本文整理汇总了Java中org.apache.hadoop.mapred.TaskTracker类的典型用法代码示例。如果您正苦于以下问题:Java TaskTracker类的具体用法?Java TaskTracker怎么用?Java TaskTracker使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
TaskTracker类属于org.apache.hadoop.mapred包,在下文中一共展示了TaskTracker类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: initializeAttemptDirs
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Creates the task-attempt directory on every configured local disk.
 * This is done up front because, when LinuxTaskController is in use, the
 * child process may want to balance load across disks but cannot create
 * the attempt directory itself: the job directory is writable only by
 * the TaskTracker.
 *
 * @param user owner of the job
 * @param jobId job identifier string
 * @param attemptId task-attempt identifier string
 * @throws IOException if the directory could not be created on any disk
 */
public void initializeAttemptDirs(String user, String jobId,
    String attemptId)
    throws IOException {
  final String relativeAttemptDir =
      TaskTracker.getLocalTaskDir(user, jobId, attemptId);
  boolean createdOnAnyDisk = false;
  for (String diskRoot : localDirs) {
    Path localAttemptDir = new Path(diskRoot, relativeAttemptDir);
    if (fs.mkdirs(localAttemptDir)) {
      createdOnAnyDisk = true;
    } else {
      // A single disk failing is tolerated; we only need one success.
      LOG.warn("localAttemptDir " + localAttemptDir.toString()
          + " couldn't be created.");
    }
  }
  if (!createdOnAnyDisk) {
    throw new IOException("Not able to initialize attempt directories "
        + "in any of the configured local directories for the attempt "
        + attemptId);
  }
}
开发者ID:Nextzero,项目名称:hadoop-2.6.0-cdh5.4.3,代码行数:37,代码来源:Localizer.java
示例2: testFileSystemOtherThanDefault
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Verifies that a file on a non-default file system ("fakefile" scheme,
 * backed by the local FS implementation) can still be localized into the
 * user's private distributed-cache directory.
 */
public void testFileSystemOtherThanDefault() throws Exception {
  if (!canRun()) {
    return;
  }
  TrackerDistributedCacheManager cacheManager =
      new TrackerDistributedCacheManager(conf, taskController);
  // Register the "fakefile" scheme using the local FS implementation class.
  conf.set("fs.fakefile.impl", FileSystem.getFileSystemClass("file", conf).getName());
  String owner = getJobOwnerName();
  conf.set("user.name", owner);
  Path cacheTarget = new Path("fakefile:///"
      + firstCacheFile.toUri().getPath());
  CacheFile cacheEntry = new CacheFile(cacheTarget.toUri(),
      CacheFile.FileType.REGULAR,
      false, 0, false);
  Path localized = cacheManager.getLocalCache(cacheTarget.toUri(), conf,
      TaskTracker.getPrivateDistributedCacheDir(owner),
      fs.getFileStatus(firstCacheFile), false,
      System.currentTimeMillis(),
      false, cacheEntry);
  assertNotNull("DistributedCache cached file on non-default filesystem.",
      localized);
}
开发者ID:Nextzero,项目名称:hadoop-2.6.0-cdh5.4.3,代码行数:23,代码来源:TestTrackerDistributedCacheManager.java
示例3: testFileSystemOtherThanDefault
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Verifies that a file addressed through a non-default file system
 * ("fakefile" scheme, aliased to the local FS) is localized successfully
 * into the user's private distributed-cache directory.
 */
public void testFileSystemOtherThanDefault() throws Exception {
  if (!canRun()) {
    return;
  }
  TrackerDistributedCacheManager cacheManager =
      new TrackerDistributedCacheManager(conf, taskController);
  // Alias the "fakefile" scheme to whatever implements "file".
  conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
  String owner = getJobOwnerName();
  conf.set("user.name", owner);
  Path cacheTarget = new Path("fakefile:///"
      + firstCacheFile.toUri().getPath());
  CacheFile cacheEntry = new CacheFile(cacheTarget.toUri(),
      CacheFile.FileType.REGULAR,
      false, 0, false);
  Path localized = cacheManager.getLocalCache(cacheTarget.toUri(), conf,
      TaskTracker.getPrivateDistributedCacheDir(owner),
      fs.getFileStatus(firstCacheFile), false,
      System.currentTimeMillis(),
      false, cacheEntry);
  assertNotNull("DistributedCache cached file on non-default filesystem.",
      localized);
}
开发者ID:Seagate,项目名称:hadoop-on-lustre,代码行数:23,代码来源:TestTrackerDistributedCacheManager.java
示例4: initializeAttemptDirs
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Creates the task-attempt directory on every configured local disk.
 * This is done up front because, when LinuxTaskController is in use, the
 * child process may want to balance load across disks but cannot create
 * the attempt directory itself: the job directory is writable only by
 * the TaskTracker.
 *
 * @param user owner of the job
 * @param jobId job identifier string
 * @param attemptId task-attempt identifier string
 * @param isCleanupAttempt whether this attempt is a cleanup attempt
 * @throws IOException if the directory could not be created on any disk
 */
public void initializeAttemptDirs(String user, String jobId,
    String attemptId, boolean isCleanupAttempt)
    throws IOException {
  final String relativeAttemptDir =
      TaskTracker.getLocalTaskDir(user, jobId, attemptId, isCleanupAttempt);
  boolean createdOnAnyDisk = false;
  for (String diskRoot : localDirs) {
    Path localAttemptDir = new Path(diskRoot, relativeAttemptDir);
    if (fs.mkdirs(localAttemptDir)) {
      createdOnAnyDisk = true;
    } else {
      // One disk failing is tolerable; only total failure is fatal.
      LOG.warn("localAttemptDir " + localAttemptDir.toString()
          + " couldn't be created.");
    }
  }
  if (!createdOnAnyDisk) {
    throw new IOException("Not able to initialize attempt directories "
        + "in any of the configured local directories for the attempt "
        + attemptId);
  }
}
开发者ID:rekhajoshm,项目名称:mapreduce-fork,代码行数:38,代码来源:Localizer.java
示例5: testFileSystemOtherThanDefault
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Verifies localization of a cache file addressed through a non-default
 * "fakefile" scheme (aliased to the local FS implementation).
 */
public void testFileSystemOtherThanDefault() throws Exception {
  if (!canRun()) {
    return;
  }
  TrackerDistributedCacheManager cacheManager =
      new TrackerDistributedCacheManager(conf, taskController);
  // Alias the "fakefile" scheme to the configured "file" implementation.
  conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
  String owner = getJobOwnerName();
  conf.set(MRJobConfig.USER_NAME, owner);
  Path cacheTarget = new Path("fakefile:///"
      + firstCacheFile.toUri().getPath());
  Path localized = cacheManager.getLocalCache(cacheTarget.toUri(), conf,
      TaskTracker.getPrivateDistributedCacheDir(owner),
      fs.getFileStatus(firstCacheFile), false,
      getFileStamp(firstCacheFile),
      new Path(TEST_ROOT_DIR), false, false);
  assertNotNull("DistributedCache cached file on non-default filesystem.",
      localized);
}
开发者ID:rekhajoshm,项目名称:mapreduce-fork,代码行数:20,代码来源:TestTrackerDistributedCacheManager.java
示例6: TaskMemoryManagerThread
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Builds a memory-manager thread for the given TaskTracker and decides
 * whether physical (RSS) memory monitoring can be enabled, based on the
 * tracker's configured total and reserved physical memory.
 *
 * @param taskTracker tracker whose tasks' memory usage will be monitored
 */
public TaskMemoryManagerThread(TaskTracker taskTracker) {
  this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
      taskTracker.getJobConf().getLong(
          "mapred.tasktracker.taskmemorymanager.monitoring-interval",
          5000L));
  this.taskTracker = taskTracker;
  long reservedRss = taskTracker.getReservedPhysicalMemoryOnTT();
  long totalRss = taskTracker.getTotalPhysicalMemoryOnTT();
  // Either limit being unset disables physical-memory monitoring entirely.
  if (reservedRss == JobConf.DISABLED_MEMORY_LIMIT
      || totalRss == JobConf.DISABLED_MEMORY_LIMIT) {
    maxRssMemoryAllowedForAllTasks = JobConf.DISABLED_MEMORY_LIMIT;
    LOG.info("Physical memory monitoring disabled");
    return;
  }
  maxRssMemoryAllowedForAllTasks = totalRss - reservedRss;
  if (maxRssMemoryAllowedForAllTasks < 0) {
    // Misconfiguration: reservation exceeds the machine's total memory.
    maxRssMemoryAllowedForAllTasks = JobConf.DISABLED_MEMORY_LIMIT;
    LOG.warn("Reserved physical memory exceeds total. Physical memory " +
        "monitoring disabled.");
  } else {
    LOG.info(String.format("Physical memory monitoring enabled. " +
        "System total: %s. Reserved: %s. Maximum: %s.",
        totalRss, reservedRss,
        maxRssMemoryAllowedForAllTasks));
  }
}
开发者ID:Nextzero,项目名称:hadoop-2.6.0-cdh5.4.3,代码行数:30,代码来源:TaskMemoryManagerThread.java
示例7: initializeJobDirs
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Creates and secures the per-job local directories on every configured
 * disk, for a job that is not yet localized.
 *
 * <br>
 * Each job directory is created with 700 permissions so that other users
 * cannot touch it before {@link TaskController#initializeJob} later sets
 * the proper private permissions. <br>
 *
 * @param user owner of the job
 * @param jobId the job being localized
 * @throws IOException if the job directory could not be created on any disk
 */
public void initializeJobDirs(String user, JobID jobId)
    throws IOException {
  String relativeJobDir = TaskTracker.getLocalJobDir(user, jobId.toString());
  boolean createdOnAnyDisk = false;
  for (String diskRoot : localDirs) {
    Path jobDir = new Path(diskRoot, relativeJobDir);
    // A pre-existing directory means an earlier localizeJob attempt died
    // part-way (e.g. job.xml copied but job.jar failed); wipe and retry.
    if (fs.exists(jobDir)) {
      fs.delete(jobDir, true);
    }
    if (fs.mkdirs(jobDir)) {
      createdOnAnyDisk = true;
    } else {
      LOG.warn("Not able to create job directory " + jobDir.toString());
    }
    // job-dir has to be private to the TT
    fs.setPermission(jobDir, new FsPermission((short)0700));
  }
  if (!createdOnAnyDisk) {
    throw new IOException("Not able to initialize job directories "
        + "in any of the configured local directories for job "
        + jobId.toString());
  }
}
开发者ID:Nextzero,项目名称:hadoop-2.6.0-cdh5.4.3,代码行数:45,代码来源:Localizer.java
示例8: TaskMemoryManagerThread
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Builds a memory-manager thread for the given TaskTracker, deriving the
 * memory limit and the monitoring interval from the tracker's job conf.
 *
 * @param taskTracker tracker whose tasks' memory usage will be monitored
 */
public TaskMemoryManagerThread(TaskTracker taskTracker) {
// Delegate to the main constructor: total task memory scaled by
// 1024 * 1024 (presumably MB -> bytes — confirm against the TT API) and
// the monitoring interval in milliseconds (default 5000).
this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
taskTracker.getJobConf().getLong(
"mapred.tasktracker.taskmemorymanager.monitoring-interval",
5000L));
this.taskTracker = taskTracker;
}
开发者ID:Seagate,项目名称:hadoop-on-lustre,代码行数:10,代码来源:TaskMemoryManagerThread.java
示例9: TaskMemoryManagerThread
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Builds a memory-manager thread for the given TaskTracker, deriving the
 * memory limit, the monitoring interval, and the SIGKILL grace period
 * from the tracker's job conf.
 *
 * @param taskTracker tracker whose tasks' memory usage will be monitored
 */
public TaskMemoryManagerThread(TaskTracker taskTracker) {
// Delegate to the main constructor: total task memory scaled by
// 1024 * 1024 (presumably MB -> bytes — confirm against the TT API),
// monitoring interval in ms (default 5000), and how long to sleep after
// SIGTERM before escalating to SIGKILL.
this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
taskTracker.getJobConf().getLong(
"mapred.tasktracker.taskmemorymanager.monitoring-interval",
5000L),
taskTracker.getJobConf().getLong(
"mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill",
ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL));
this.taskTracker = taskTracker;
}
开发者ID:thisisvoa,项目名称:hadoop-0.20,代码行数:13,代码来源:TaskMemoryManagerThread.java
示例10: TaskMemoryManagerThread
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Builds a memory-manager thread for the given TaskTracker and computes
 * the maximum RSS memory allowed across all tasks, or disables the limit
 * when either the total or the reserved physical memory is unset.
 *
 * @param taskTracker tracker whose tasks' memory usage will be monitored
 */
public TaskMemoryManagerThread(TaskTracker taskTracker) {
  this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
      taskTracker.getJobConf().getLong(
          TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL, 5000L));
  this.taskTracker = taskTracker;
  long reservedRss = taskTracker.getReservedPhysicalMemoryOnTT();
  long totalRss = taskTracker.getTotalPhysicalMemoryOnTT();
  // Either limit being unset disables the RSS cap altogether.
  boolean limitsUnavailable = reservedRss == JobConf.DISABLED_MEMORY_LIMIT
      || totalRss == JobConf.DISABLED_MEMORY_LIMIT;
  if (limitsUnavailable) {
    maxRssMemoryAllowedForAllTasks = JobConf.DISABLED_MEMORY_LIMIT;
  } else {
    maxRssMemoryAllowedForAllTasks = totalRss - reservedRss;
  }
}
开发者ID:rekhajoshm,项目名称:mapreduce-fork,代码行数:16,代码来源:TaskMemoryManagerThread.java
示例11: initializeJobDirs
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Creates and secures the per-job local directories on every configured
 * disk, for a job that is not yet localized.
 *
 * <br>
 * Each job directory is created with 700 permissions so that other users
 * cannot touch it before
 * {@link TaskController#initializeJob(JobInitializationContext)} later
 * sets the proper private permissions. <br>
 *
 * @param user owner of the job
 * @param jobId the job being localized
 * @throws IOException if the job directory could not be created on any disk
 */
public void initializeJobDirs(String user, JobID jobId)
    throws IOException {
  String relativeJobDir = TaskTracker.getLocalJobDir(user, jobId.toString());
  boolean createdOnAnyDisk = false;
  for (String diskRoot : localDirs) {
    Path jobDir = new Path(diskRoot, relativeJobDir);
    // A pre-existing directory means an earlier localizeJob attempt died
    // part-way (e.g. job.xml copied but job.jar failed); wipe and retry.
    if (fs.exists(jobDir)) {
      fs.delete(jobDir, true);
    }
    if (fs.mkdirs(jobDir)) {
      createdOnAnyDisk = true;
    } else {
      LOG.warn("Not able to create job directory " + jobDir.toString());
    }
    // job-dir has to be private to the TT
    fs.setPermission(jobDir, new FsPermission((short)0700));
  }
  if (!createdOnAnyDisk) {
    throw new IOException("Not able to initialize job directories "
        + "in any of the configured local directories for job "
        + jobId.toString());
  }
}
开发者ID:rekhajoshm,项目名称:mapreduce-fork,代码行数:45,代码来源:Localizer.java
示例12: MapOutput
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Constructs a disk-backed map output: the shuffled data for this map
 * attempt will be fetched into a temporary file under the job's local
 * output directory and later renamed to its final name.
 *
 * @param mapId map attempt whose output is being fetched
 * @param merger merge manager that owns this output
 * @param size expected size of the output, used for disk allocation
 * @param conf job configuration
 * @param localDirAllocator allocator choosing a local dir with enough space
 * @param fetcher id of the fetcher thread, used to uniquify the temp file
 * @param primaryMapOutput whether this is the primary copy of the output
 * @throws IOException if the temporary output file cannot be created
 */
MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size,
    JobConf conf, LocalDirAllocator localDirAllocator,
    int fetcher, boolean primaryMapOutput) throws IOException {
  this.id = ID.incrementAndGet();
  this.mapId = mapId;
  this.merger = merger;
  this.size = size;
  this.primaryMapOutput = primaryMapOutput;
  // Disk-backed output: no in-memory buffers.
  type = Type.DISK;
  memory = null;
  byteStream = null;
  this.localFS = FileSystem.getLocal(conf);
  String filename = "map_" + mapId.getTaskID().getId() + ".out";
  // Relative path: /<jobcache>/<job>/<reduce>/output/<file>.<fetcher>
  StringBuilder relPath = new StringBuilder();
  relPath.append(Path.SEPARATOR)
      .append(TaskTracker.getJobCacheSubdir(conf.getUser()))
      .append(Path.SEPARATOR).append(mapId.getJobID())
      .append(Path.SEPARATOR).append(merger.getReduceId())
      .append(Path.SEPARATOR).append("output")
      .append(Path.SEPARATOR).append(filename)
      .append(".").append(fetcher);
  tmpOutputPath =
      localDirAllocator.getLocalPathForWrite(relPath.toString(), size, conf);
  // Final path sits next to the temp file, without the fetcher suffix.
  outputPath = new Path(tmpOutputPath.getParent(), filename);
  disk = localFS.create(tmpOutputPath);
}
开发者ID:rekhajoshm,项目名称:mapreduce-fork,代码行数:32,代码来源:MapOutput.java
示例13: testCustomPermissions
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Localize a file. After localization is complete, create a file, "myFile",
 * under the directory where the file is localized and ensure that it has
 * permissions different from what is set by default. Then, localize another
 * file. Verify that "myFile" has the right permissions.
 * @throws Exception
 */
public void testCustomPermissions() throws Exception {
if (!canRun()) {
return;
}
String userName = getJobOwnerName();
conf.set(MRJobConfig.USER_NAME, userName);
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf, taskController);
FileSystem localfs = FileSystem.getLocal(conf);
Path[] localCache = new Path[2];
// Localize the first file into the user's private dist-cache directory.
localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf,
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(firstCacheFile), false,
getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
// Create a sibling file with non-default (0600) permissions.
FsPermission myPermission = new FsPermission((short)0600);
Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
if (FileSystem.create(localfs, myFile, myPermission) == null) {
throw new IOException("Could not create " + myFile);
}
try {
// Localize a second file; this must not disturb myFile's permissions.
localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf,
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(secondCacheFile), false,
getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false,
false);
FileStatus stat = localfs.getFileStatus(myFile);
assertTrue(stat.getPermission().equals(myPermission));
// validate permissions of localized files.
checkFilePermissions(localCache);
} finally {
localfs.delete(myFile, false);
}
}
开发者ID:rekhajoshm,项目名称:mapreduce-fork,代码行数:42,代码来源:TestTrackerDistributedCacheManager.java
示例14: testManagerFlow
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Exercises the typical end-to-end flow for using the DistributedCache
 * classes: JobClient-side setup, then TaskRunner-side localization,
 * verification of the localized files, and cleanup.
 *
 * @throws IOException
 * @throws LoginException
 */
public void testManagerFlow() throws IOException, LoginException {
if (!canRun()) {
return;
}
// ****** Imitate JobClient code
// Configures a task/job with both a regular file and a "classpath" file.
Configuration subConf = new Configuration(conf);
String userName = getJobOwnerName();
subConf.set("user.name", userName);
JobID jobid = new JobID("jt",1);
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
DistributedCache.addFileToClassPath(secondCacheFile, subConf,
FileSystem.get(subConf));
// Resolve timestamps/visibilities; both files should land in the stat cache.
Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
TrackerDistributedCacheManager.determineTimestamps(subConf, statCache);
TrackerDistributedCacheManager.determineCacheVisibilities(subConf, statCache);
assertEquals(2, statCache.size());
// ****** End of imitating JobClient code
// Persist the job configuration the way the framework would.
Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
FileOutputStream os = new FileOutputStream(new File(jobFile.toString()));
subConf.writeXml(os);
os.close();
// ****** Imitate TaskRunner code.
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf, taskController);
TaskDistributedCacheManager handle =
manager.newTaskDistributedCacheManager(jobid, subConf);
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(),
TaskTracker.getPrivateDistributedCacheDir(userName));
JobLocalizer.downloadPrivateCache(subConf);
// DOESN'T ACTUALLY HAPPEN IN THE TaskRunner (THIS IS A TODO)
// handle.setupPrivateCache(localDirAllocator, TaskTracker
// .getPrivateDistributedCacheDir(userName));
// // ****** End of imitating TaskRunner code
// Both files must now be localized; the first must live at a new path,
// and the classpath file must be the sole classpath entry.
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
assertNotNull(null, localCacheFiles);
assertEquals(2, localCacheFiles.length);
Path cachedFirstFile = localCacheFiles[0];
Path cachedSecondFile = localCacheFiles[1];
assertFileLengthEquals(firstCacheFile, cachedFirstFile);
assertFalse("Paths should be different.",
firstCacheFile.equals(cachedFirstFile));
assertEquals(1, handle.getClassPaths().size());
assertEquals(cachedSecondFile.toString(), handle.getClassPaths().get(0));
checkFilePermissions(localCacheFiles);
// Cleanup
handle.release();
manager.purgeCache();
assertFalse(pathToFile(cachedFirstFile).exists());
}
开发者ID:Nextzero,项目名称:hadoop-2.6.0-cdh5.4.3,代码行数:67,代码来源:TestTrackerDistributedCacheManager.java
示例15: testSameNameFileArchiveCache
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Verifies that the same URI registered both as a cache file and as a
 * cache archive resolves to two distinct localized paths, so the file
 * and the unpacked archive cannot clobber each other.
 */
public void testSameNameFileArchiveCache() throws IOException,
InterruptedException {
if (!canRun()) {
return;
}
TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(
conf, taskController);
String userName = getJobOwnerName();
File workDir = new File(TEST_ROOT_DIR, "workdir");
Path cacheFile = new Path(TEST_ROOT_DIR, "fileArchiveCacheFile");
createPublicTempFile(cacheFile);
Configuration conf1 = new Configuration(conf);
conf1.set("user.name", userName);
// Register the SAME URI as both a plain file and an archive.
DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
DistributedCache.addCacheArchive(cacheFile.toUri(), conf1);
TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1);
dumpState(conf1);
TaskDistributedCacheManager handle = manager
.newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(),
TaskTracker.getPrivateDistributedCacheDir(userName));
// Entry 0 is the file registration, entry 1 the archive registration.
TaskDistributedCacheManager.CacheFile cFile = handle.getCacheFiles().get(0);
TaskDistributedCacheManager.CacheFile cArchive = handle.getCacheFiles()
.get(1);
String distCacheDir = TaskTracker.getPublicDistributedCacheDir();
// Localize once as a file (isArchive=false)...
Path localizedPathForFile = manager.getLocalCache(cacheFile.toUri(), conf1,
distCacheDir, fs.getFileStatus(cacheFile), false, cFile.timestamp,
true, cFile);
// ...and once as an archive (isArchive=true).
Path localizedPathForArchive = manager.getLocalCache(cacheFile.toUri(),
conf1, distCacheDir, fs.getFileStatus(cacheFile), true,
cArchive.timestamp, true, cArchive);
assertNotSame("File and Archive resolve to the same path: "
+ localizedPathForFile + ". Should differ.", localizedPathForFile,
localizedPathForArchive);
}
开发者ID:Nextzero,项目名称:hadoop-2.6.0-cdh5.4.3,代码行数:43,代码来源:TestTrackerDistributedCacheManager.java
示例16: checkLocalizedPath
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Localizes a cache file as either public or private (depending on
 * {@code visibility}) and verifies it lands in the expected
 * distributed-cache directory with the expected permissions.
 *
 * @param visibility true to exercise the public cache path, false for the
 *        user-private cache path
 */
private void checkLocalizedPath(boolean visibility)
throws IOException, LoginException, InterruptedException, URISyntaxException {
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf, taskController);
String userName = getJobOwnerName();
File workDir = new File(TEST_ROOT_DIR, "workdir");
Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
// Create the fixture with permissions matching the desired visibility.
if (visibility) {
createPublicTempFile(cacheFile);
} else {
createPrivateTempFile(cacheFile);
}
Configuration conf1 = new Configuration(conf);
conf1.set("user.name", userName);
DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1);
dumpState(conf1);
// Task localizing for job
TaskDistributedCacheManager handle = manager
.newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(),
TaskTracker.getPrivateDistributedCacheDir(userName));
JobLocalizer.downloadPrivateCache(conf1);
TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
// Public files go to the shared dir; private ones to the user's dir.
String distCacheDir;
if (visibility) {
distCacheDir = TaskTracker.getPublicDistributedCacheDir();
} else {
distCacheDir = TaskTracker.getPrivateDistributedCacheDir(userName);
}
Path localizedPath =
manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
fs.getFileStatus(cacheFile), false,
c.timestamp, visibility, c);
assertTrue("Cache file didn't get localized in the expected directory. " +
"Expected localization to happen within " +
ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
", but was localized at " +
localizedPath, localizedPath.toString().contains(distCacheDir));
// Permission checks differ for world-readable vs user-private files.
if (visibility) {
checkPublicFilePermissions(new Path[]{localizedPath});
} else {
checkFilePermissions(new Path[]{localizedPath});
}
}
开发者ID:Nextzero,项目名称:hadoop-2.6.0-cdh5.4.3,代码行数:48,代码来源:TestTrackerDistributedCacheManager.java
示例17: MultiTaskTrackerMetrics
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Registers an aggregate metrics record ("multitasktracker") in the
 * "mapred" metrics context for the given set of trackers, and signs this
 * object up as the periodic updater.
 *
 * @param trackerList trackers whose metrics will be aggregated
 */
public MultiTaskTrackerMetrics(List<TaskTracker> trackerList) {
  this.trackerList = trackerList;
  MetricsContext mapredContext = MetricsUtil.getContext("mapred");
  metricsRecord = MetricsUtil.createRecord(mapredContext, "multitasktracker");
  mapredContext.registerUpdater(this);
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:7,代码来源:MultiTaskTracker.java
示例18: doUpdates
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Periodic metrics callback: sums per-tracker slot, refill, and
 * task-launch statistics across all trackers and publishes them on the
 * shared metrics record.
 *
 * @param context metrics context invoking this update (unused directly;
 *        publishing goes through {@code metricsRecord})
 */
@Override
public void doUpdates(MetricsContext context) {
  LOG.info("Updating metrics");
  int numTrackers = trackerList.size();
  long totalMapRefill = 0;
  long totalReduceRefill = 0;
  int totalRunningMaps = 0;
  int totalRunningReduces = 0;
  int totalMapSlots = 0;
  int totalReduceSlots = 0;
  for (TaskTracker tracker : trackerList) {
    totalMapRefill += tracker.getAveMapSlotRefillMsecs();
    totalReduceRefill += tracker.getAveReduceSlotRefillMsecs();
    totalRunningMaps += tracker.getRunningMaps();
    totalRunningReduces += tracker.getRunningReduces();
    totalMapSlots += tracker.getMaxActualMapTasks();
    totalReduceSlots += tracker.getMaxActualReduceTasks();
    // If the metrics exists, aggregate the task launch msecs for all
    // trackers
    TaskTrackerInstrumentation instrumentation =
        tracker.getTaskTrackerInstrumentation();
    if (instrumentation != null) {
      MetricsTimeVaryingRate taskLaunchMsecs =
          instrumentation.getTaskLaunchMsecs();
      if (taskLaunchMsecs != null) {
        taskLaunchMsecs.pushMetric(null);
        aggTaskLaunchMsecs.inc(
            taskLaunchMsecs.getPreviousIntervalAverageTime());
      }
    }
  }
  // BUGFIX: the original divided unconditionally, throwing
  // ArithmeticException when trackerList is empty; report 0 instead.
  long avgMapRefill = numTrackers == 0 ? 0 : totalMapRefill / numTrackers;
  long avgReduceRefill =
      numTrackers == 0 ? 0 : totalReduceRefill / numTrackers;
  metricsRecord.setMetric("aveMapSlotRefillMsecs", avgMapRefill);
  metricsRecord.setMetric("aveReduceSlotRefillMsecs", avgReduceRefill);
  metricsRecord.setMetric("maps_running", totalRunningMaps);
  metricsRecord.setMetric("reduces_running", totalRunningReduces);
  metricsRecord.setMetric("mapTaskSlots", totalMapSlots);
  metricsRecord.setMetric("reduceTaskSlots", totalReduceSlots);
  for (MetricsBase metricsBase : registry.getMetricsList()) {
    metricsBase.pushMetric(metricsRecord);
  }
  metricsRecord.update();
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:48,代码来源:MultiTaskTracker.java
示例19: TaskTrackerRunner
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Wraps a TaskTracker so it can be driven under a numeric identifier.
 *
 * @param id numeric identifier for this runner
 * @param tt the TaskTracker instance this runner will drive
 */
public TaskTrackerRunner(int id, TaskTracker tt) {
  super();
  this.id = id;
  this.ttToRun = tt;
}
开发者ID:rhli,项目名称:hadoop-EAR,代码行数:6,代码来源:MultiTaskTracker.java
示例20: testManagerFlow
import org.apache.hadoop.mapred.TaskTracker; //导入依赖的package包/类
/**
 * Exercises the typical end-to-end flow for using the DistributedCache
 * classes: JobClient-side setup, then TaskRunner-side localization,
 * verification of the localized files, and cleanup.
 *
 * @throws IOException
 * @throws LoginException
 */
public void testManagerFlow() throws IOException, LoginException {
if (!canRun()) {
return;
}
// ****** Imitate JobClient code
// Configures a task/job with both a regular file and a "classpath" file.
Configuration subConf = new Configuration(conf);
String userName = getJobOwnerName();
subConf.set("user.name", userName);
JobID jobid = new JobID("jt",1);
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
DistributedCache.addFileToClassPath(secondCacheFile, subConf,
FileSystem.get(subConf));
TrackerDistributedCacheManager.determineTimestamps(subConf);
TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
// ****** End of imitating JobClient code
// Persist the job configuration the way the framework would.
Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
FileOutputStream os = new FileOutputStream(new File(jobFile.toString()));
subConf.writeXml(os);
os.close();
// ****** Imitate TaskRunner code.
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf, taskController);
TaskDistributedCacheManager handle =
manager.newTaskDistributedCacheManager(jobid, subConf);
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(),
TaskTracker.getPrivateDistributedCacheDir(userName));
JobLocalizer.downloadPrivateCache(subConf);
// DOESN'T ACTUALLY HAPPEN IN THE TaskRunner (THIS IS A TODO)
// handle.setupPrivateCache(localDirAllocator, TaskTracker
// .getPrivateDistributedCacheDir(userName));
// // ****** End of imitating TaskRunner code
// Both files must now be localized; the first must live at a new path,
// and the classpath file must be the sole classpath entry.
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
assertNotNull(null, localCacheFiles);
assertEquals(2, localCacheFiles.length);
Path cachedFirstFile = localCacheFiles[0];
Path cachedSecondFile = localCacheFiles[1];
assertFileLengthEquals(firstCacheFile, cachedFirstFile);
assertFalse("Paths should be different.",
firstCacheFile.equals(cachedFirstFile));
assertEquals(1, handle.getClassPaths().size());
assertEquals(cachedSecondFile.toString(), handle.getClassPaths().get(0));
checkFilePermissions(localCacheFiles);
// Cleanup
handle.release();
manager.purgeCache();
assertFalse(pathToFile(cachedFirstFile).exists());
}
开发者ID:Seagate,项目名称:hadoop-on-lustre,代码行数:64,代码来源:TestTrackerDistributedCacheManager.java
注:本文中的org.apache.hadoop.mapred.TaskTracker类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论