Merge -r 1222694:1222695 from trunk to branch. FIXES: MAPREDUCE-3597
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1234086 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
71ac65859e
commit
5ddfc8b6b5
|
@ -18,7 +18,10 @@ Release 0.23.1 - Unreleased
|
||||||
MAPREDUCE-778. Rumen Anonymizer. (Amar Kamat and Chris Douglas via amarrk)
|
MAPREDUCE-778. Rumen Anonymizer. (Amar Kamat and Chris Douglas via amarrk)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
MAPREDUCE-3375. [Gridmix] Memory Emulation system tests.
|
MAPREDUCE-3597. [Rumen] Rumen should provide APIs to access all the
|
||||||
|
job-history related information.
|
||||||
|
|
||||||
|
MAPREDUCE-3375. [Gridmix] Memory Emulation system tests.
|
||||||
(Vinay Thota via amarrk)
|
(Vinay Thota via amarrk)
|
||||||
|
|
||||||
MAPREDUCE-2733. [Gridmix] Gridmix3 cpu emulation system tests.
|
MAPREDUCE-2733. [Gridmix] Gridmix3 cpu emulation system tests.
|
||||||
|
|
|
@ -26,6 +26,8 @@ import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -61,6 +63,8 @@ import org.junit.Test;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
public class TestRumenJobTraces {
|
public class TestRumenJobTraces {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestRumenJobTraces.class);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSmallTrace() throws Exception {
|
public void testSmallTrace() throws Exception {
|
||||||
performSingleTest("sample-job-tracker-logs.gz",
|
performSingleTest("sample-job-tracker-logs.gz",
|
||||||
|
@ -236,11 +240,21 @@ public class TestRumenJobTraces {
|
||||||
parser = new Hadoop20JHParser(ris);
|
parser = new Hadoop20JHParser(ris);
|
||||||
ArrayList<String> seenEvents = new ArrayList<String>(150);
|
ArrayList<String> seenEvents = new ArrayList<String>(150);
|
||||||
|
|
||||||
getHistoryEvents(parser, seenEvents, null); // get events into seenEvents
|
// this is same as the one in input history file
|
||||||
|
String jobId = "job_200904211745_0002";
|
||||||
|
JobBuilder builder = new JobBuilder(jobId);
|
||||||
|
|
||||||
|
// get events into seenEvents
|
||||||
|
getHistoryEvents(parser, seenEvents, builder);
|
||||||
|
|
||||||
// Validate the events seen by history parser from
|
// Validate the events seen by history parser from
|
||||||
// history file v20-single-input-log.gz
|
// history file v20-single-input-log.gz
|
||||||
validateSeenHistoryEvents(seenEvents, goldLines);
|
validateSeenHistoryEvents(seenEvents, goldLines);
|
||||||
|
|
||||||
|
ParsedJob parsedJob = builder.build();
|
||||||
|
// validate the obtainXXX api of ParsedJob, ParsedTask and
|
||||||
|
// ParsedTaskAttempt
|
||||||
|
validateParsedJob(parsedJob, 20, 1, true);
|
||||||
} finally {
|
} finally {
|
||||||
if (parser != null) {
|
if (parser != null) {
|
||||||
parser.close();
|
parser.close();
|
||||||
|
@ -584,9 +598,11 @@ public class TestRumenJobTraces {
|
||||||
// validate resource usage metrics
|
// validate resource usage metrics
|
||||||
// get the job counters
|
// get the job counters
|
||||||
Counters counters = job.getTaskReports(TaskType.MAP)[0].getTaskCounters();
|
Counters counters = job.getTaskReports(TaskType.MAP)[0].getTaskCounters();
|
||||||
|
|
||||||
|
// get the parsed job
|
||||||
|
ParsedJob parsedJob = builder.build();
|
||||||
// get the logged job
|
// get the logged job
|
||||||
LoggedJob loggedJob = builder.build();
|
LoggedJob loggedJob = parsedJob;
|
||||||
// get the logged attempts
|
// get the logged attempts
|
||||||
LoggedTaskAttempt attempt =
|
LoggedTaskAttempt attempt =
|
||||||
loggedJob.getMapTasks().get(0).getAttempts().get(0);
|
loggedJob.getMapTasks().get(0).getAttempts().get(0);
|
||||||
|
@ -600,6 +616,10 @@ public class TestRumenJobTraces {
|
||||||
counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).getValue(),
|
counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).getValue(),
|
||||||
counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES).getValue(),
|
counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES).getValue(),
|
||||||
true);
|
true);
|
||||||
|
|
||||||
|
// validate the obtainXXX api of ParsedJob, ParsedTask and
|
||||||
|
// ParsedTaskAttempt
|
||||||
|
validateParsedJob(parsedJob, 1, 1, false);
|
||||||
} finally {
|
} finally {
|
||||||
// stop the MR cluster
|
// stop the MR cluster
|
||||||
mrCluster.shutdown();
|
mrCluster.shutdown();
|
||||||
|
@ -616,6 +636,142 @@ public class TestRumenJobTraces {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify if the obtainXXX methods of {@link ParsedJob}, {@link ParsedTask}
|
||||||
|
* and {@link ParsedTaskAttempt} give valid info
|
||||||
|
*/
|
||||||
|
private void validateParsedJob(ParsedJob parsedJob, int numMaps,
|
||||||
|
int numReduces, boolean pre21JobHistory) {
|
||||||
|
validateParsedJobAPI(parsedJob, numMaps, numReduces, pre21JobHistory);
|
||||||
|
|
||||||
|
List<ParsedTask> maps = parsedJob.obtainMapTasks();
|
||||||
|
for (ParsedTask task : maps) {
|
||||||
|
validateParsedTask(task);
|
||||||
|
}
|
||||||
|
List<ParsedTask> reduces = parsedJob.obtainReduceTasks();
|
||||||
|
for (ParsedTask task : reduces) {
|
||||||
|
validateParsedTask(task);
|
||||||
|
}
|
||||||
|
List<ParsedTask> others = parsedJob.obtainOtherTasks();
|
||||||
|
for (ParsedTask task : others) {
|
||||||
|
validateParsedTask(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Verify if the obtainXXX methods of {@link ParsedJob} give valid info */
|
||||||
|
private void validateParsedJobAPI(ParsedJob parsedJob, int numMaps,
|
||||||
|
int numReduces, boolean pre21JobHistory) {
|
||||||
|
LOG.info("Validating ParsedJob.obtainXXX api... for "
|
||||||
|
+ parsedJob.getJobID());
|
||||||
|
assertNotNull("Job acls in ParsedJob is null",
|
||||||
|
parsedJob.obtainJobAcls());
|
||||||
|
assertNotNull("Job conf path in ParsedJob is null",
|
||||||
|
parsedJob.obtainJobConfpath());
|
||||||
|
|
||||||
|
assertNotNull("Map Counters in ParsedJob is null",
|
||||||
|
parsedJob.obtainMapCounters());
|
||||||
|
assertNotNull("Reduce Counters in ParsedJob is null",
|
||||||
|
parsedJob.obtainReduceCounters());
|
||||||
|
assertNotNull("Total Counters in ParsedJob is null",
|
||||||
|
parsedJob.obtainTotalCounters());
|
||||||
|
|
||||||
|
assertNotNull("Map Tasks List in ParsedJob is null",
|
||||||
|
parsedJob.obtainMapTasks());
|
||||||
|
assertNotNull("Reduce Tasks List in ParsedJob is null",
|
||||||
|
parsedJob.obtainReduceTasks());
|
||||||
|
assertNotNull("Other Tasks List in ParsedJob is null",
|
||||||
|
parsedJob.obtainOtherTasks());
|
||||||
|
|
||||||
|
// 1 map and 1 reduce task should be there
|
||||||
|
assertEquals("Number of map tasks in ParsedJob is wrong",
|
||||||
|
numMaps, parsedJob.obtainMapTasks().size());
|
||||||
|
assertEquals("Number of reduce tasks in ParsedJob is wrong",
|
||||||
|
numReduces, parsedJob.obtainReduceTasks().size(), 1);
|
||||||
|
|
||||||
|
// old hadoop20 version history files don't have job-level-map-counters and
|
||||||
|
// job-level-reduce-counters. Only total counters exist there.
|
||||||
|
assertTrue("Total Counters in ParsedJob is empty",
|
||||||
|
parsedJob.obtainTotalCounters().size() > 0);
|
||||||
|
if (!pre21JobHistory) {
|
||||||
|
assertTrue("Map Counters in ParsedJob is empty",
|
||||||
|
parsedJob.obtainMapCounters().size() > 0);
|
||||||
|
assertTrue("Reduce Counters in ParsedJob is empty",
|
||||||
|
parsedJob.obtainReduceCounters().size() > 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify if the obtainXXX methods of {@link ParsedTask} and
|
||||||
|
* {@link ParsedTaskAttempt} give valid info
|
||||||
|
*/
|
||||||
|
private void validateParsedTask(ParsedTask parsedTask) {
|
||||||
|
validateParsedTaskAPI(parsedTask);
|
||||||
|
|
||||||
|
List<ParsedTaskAttempt> attempts = parsedTask.obtainTaskAttempts();
|
||||||
|
for (ParsedTaskAttempt attempt : attempts) {
|
||||||
|
validateParsedTaskAttemptAPI(attempt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Verify if the obtainXXX methods of {@link ParsedTask} give valid info */
|
||||||
|
private void validateParsedTaskAPI(ParsedTask parsedTask) {
|
||||||
|
LOG.info("Validating ParsedTask.obtainXXX api... for "
|
||||||
|
+ parsedTask.getTaskID());
|
||||||
|
assertNotNull("Task counters in ParsedTask is null",
|
||||||
|
parsedTask.obtainCounters());
|
||||||
|
|
||||||
|
if (parsedTask.getTaskStatus()
|
||||||
|
== Pre21JobHistoryConstants.Values.SUCCESS) {
|
||||||
|
// task counters should not be empty
|
||||||
|
assertTrue("Task counters in ParsedTask is empty",
|
||||||
|
parsedTask.obtainCounters().size() > 0);
|
||||||
|
assertNull("Diagnostic-info is non-null for a succeeded task",
|
||||||
|
parsedTask.obtainDiagnosticInfo());
|
||||||
|
assertNull("Failed-due-to-attemptId is non-null for a succeeded task",
|
||||||
|
parsedTask.obtainFailedDueToAttemptId());
|
||||||
|
} else {
|
||||||
|
assertNotNull("Diagnostic-info is non-null for a succeeded task",
|
||||||
|
parsedTask.obtainDiagnosticInfo());
|
||||||
|
assertNotNull("Failed-due-to-attemptId is non-null for a succeeded task",
|
||||||
|
parsedTask.obtainFailedDueToAttemptId());
|
||||||
|
}
|
||||||
|
|
||||||
|
List<ParsedTaskAttempt> attempts = parsedTask.obtainTaskAttempts();
|
||||||
|
assertNotNull("TaskAttempts list in ParsedTask is null", attempts);
|
||||||
|
assertTrue("TaskAttempts list in ParsedTask is empty",
|
||||||
|
attempts.size() > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify if the obtainXXX methods of {@link ParsedTaskAttempt} give
|
||||||
|
* valid info
|
||||||
|
*/
|
||||||
|
private void validateParsedTaskAttemptAPI(
|
||||||
|
ParsedTaskAttempt parsedTaskAttempt) {
|
||||||
|
LOG.info("Validating ParsedTaskAttempt.obtainXXX api... for "
|
||||||
|
+ parsedTaskAttempt.getAttemptID());
|
||||||
|
assertNotNull("Counters in ParsedTaskAttempt is null",
|
||||||
|
parsedTaskAttempt.obtainCounters());
|
||||||
|
|
||||||
|
if (parsedTaskAttempt.getResult()
|
||||||
|
== Pre21JobHistoryConstants.Values.SUCCESS) {
|
||||||
|
assertTrue("Counters in ParsedTaskAttempt is empty",
|
||||||
|
parsedTaskAttempt.obtainCounters().size() > 0);
|
||||||
|
assertNull("Diagnostic-info is non-null for a succeeded taskAttempt",
|
||||||
|
parsedTaskAttempt.obtainDiagnosticInfo());
|
||||||
|
} else {
|
||||||
|
assertNotNull("Diagnostic-info is non-null for a succeeded taskAttempt",
|
||||||
|
parsedTaskAttempt.obtainDiagnosticInfo());
|
||||||
|
}
|
||||||
|
assertNotNull("TrackerName in ParsedTaskAttempt is null",
|
||||||
|
parsedTaskAttempt.obtainTrackerName());
|
||||||
|
|
||||||
|
assertNotNull("http-port info in ParsedTaskAttempt is null",
|
||||||
|
parsedTaskAttempt.obtainHttpPort());
|
||||||
|
assertNotNull("Shuffle-port info in ParsedTaskAttempt is null",
|
||||||
|
parsedTaskAttempt.obtainShufflePort());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testJobConfigurationParser() throws Exception {
|
public void testJobConfigurationParser() throws Exception {
|
||||||
|
|
||||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
import org.apache.hadoop.mapreduce.TaskType;
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.JobFinished;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
|
||||||
|
@ -45,14 +46,15 @@ import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinished;
|
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinished;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.TaskFailed;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.TaskFinished;
|
import org.apache.hadoop.mapreduce.jobhistory.TaskFinished;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
|
||||||
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
|
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
|
||||||
import org.apache.hadoop.tools.rumen.datatypes.JobProperties;
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -67,16 +69,16 @@ public class JobBuilder {
|
||||||
|
|
||||||
private boolean finalized = false;
|
private boolean finalized = false;
|
||||||
|
|
||||||
private LoggedJob result = new LoggedJob();
|
private ParsedJob result = new ParsedJob();
|
||||||
|
|
||||||
private Map<String, LoggedTask> mapTasks = new HashMap<String, LoggedTask>();
|
private Map<String, ParsedTask> mapTasks = new HashMap<String, ParsedTask>();
|
||||||
private Map<String, LoggedTask> reduceTasks =
|
private Map<String, ParsedTask> reduceTasks =
|
||||||
new HashMap<String, LoggedTask>();
|
new HashMap<String, ParsedTask>();
|
||||||
private Map<String, LoggedTask> otherTasks =
|
private Map<String, ParsedTask> otherTasks =
|
||||||
new HashMap<String, LoggedTask>();
|
new HashMap<String, ParsedTask>();
|
||||||
|
|
||||||
private Map<String, LoggedTaskAttempt> attempts =
|
private Map<String, ParsedTaskAttempt> attempts =
|
||||||
new HashMap<String, LoggedTaskAttempt>();
|
new HashMap<String, ParsedTaskAttempt>();
|
||||||
|
|
||||||
private Map<ParsedHost, ParsedHost> allHosts =
|
private Map<ParsedHost, ParsedHost> allHosts =
|
||||||
new HashMap<ParsedHost, ParsedHost>();
|
new HashMap<ParsedHost, ParsedHost>();
|
||||||
|
@ -123,7 +125,7 @@ public class JobBuilder {
|
||||||
public void process(HistoryEvent event) {
|
public void process(HistoryEvent event) {
|
||||||
if (finalized) {
|
if (finalized) {
|
||||||
throw new IllegalStateException(
|
throw new IllegalStateException(
|
||||||
"JobBuilder.process(HistoryEvent event) called after LoggedJob built");
|
"JobBuilder.process(HistoryEvent event) called after ParsedJob built");
|
||||||
}
|
}
|
||||||
|
|
||||||
// these are in lexicographical order by class name.
|
// these are in lexicographical order by class name.
|
||||||
|
@ -229,12 +231,16 @@ public class JobBuilder {
|
||||||
public void process(Properties conf) {
|
public void process(Properties conf) {
|
||||||
if (finalized) {
|
if (finalized) {
|
||||||
throw new IllegalStateException(
|
throw new IllegalStateException(
|
||||||
"JobBuilder.process(Properties conf) called after LoggedJob built");
|
"JobBuilder.process(Properties conf) called after ParsedJob built");
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO remove this once the deprecate APIs in LoggedJob are removed
|
//TODO remove this once the deprecate APIs in LoggedJob are removed
|
||||||
result.setQueue(extract(conf, JobConfPropertyNames.QUEUE_NAMES
|
String queue = extract(conf, JobConfPropertyNames.QUEUE_NAMES
|
||||||
.getCandidates(), "default"));
|
.getCandidates(), null);
|
||||||
|
// set the queue name if existing
|
||||||
|
if (queue != null) {
|
||||||
|
result.setQueue(queue);
|
||||||
|
}
|
||||||
result.setJobName(extract(conf, JobConfPropertyNames.JOB_NAMES
|
result.setJobName(extract(conf, JobConfPropertyNames.JOB_NAMES
|
||||||
.getCandidates(), null));
|
.getCandidates(), null));
|
||||||
|
|
||||||
|
@ -252,9 +258,9 @@ public class JobBuilder {
|
||||||
* Request the builder to build the final object. Once called, the
|
* Request the builder to build the final object. Once called, the
|
||||||
* {@link JobBuilder} would accept no more events or job-conf properties.
|
* {@link JobBuilder} would accept no more events or job-conf properties.
|
||||||
*
|
*
|
||||||
* @return Parsed {@link LoggedJob} object.
|
* @return Parsed {@link ParsedJob} object.
|
||||||
*/
|
*/
|
||||||
public LoggedJob build() {
|
public ParsedJob build() {
|
||||||
// The main job here is to build CDFs and manage the conf
|
// The main job here is to build CDFs and manage the conf
|
||||||
finalized = true;
|
finalized = true;
|
||||||
|
|
||||||
|
@ -416,7 +422,7 @@ public class JobBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processTaskUpdatedEvent(TaskUpdatedEvent event) {
|
private void processTaskUpdatedEvent(TaskUpdatedEvent event) {
|
||||||
LoggedTask task = getTask(event.getTaskId().toString());
|
ParsedTask task = getTask(event.getTaskId().toString());
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -424,7 +430,7 @@ public class JobBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processTaskStartedEvent(TaskStartedEvent event) {
|
private void processTaskStartedEvent(TaskStartedEvent event) {
|
||||||
LoggedTask task =
|
ParsedTask task =
|
||||||
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), true);
|
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), true);
|
||||||
task.setStartTime(event.getStartTime());
|
task.setStartTime(event.getStartTime());
|
||||||
task.setPreferredLocations(preferredLocationForSplits(event
|
task.setPreferredLocations(preferredLocationForSplits(event
|
||||||
|
@ -432,7 +438,7 @@ public class JobBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processTaskFinishedEvent(TaskFinishedEvent event) {
|
private void processTaskFinishedEvent(TaskFinishedEvent event) {
|
||||||
LoggedTask task =
|
ParsedTask task =
|
||||||
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
|
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
return;
|
return;
|
||||||
|
@ -443,18 +449,22 @@ public class JobBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processTaskFailedEvent(TaskFailedEvent event) {
|
private void processTaskFailedEvent(TaskFailedEvent event) {
|
||||||
LoggedTask task =
|
ParsedTask task =
|
||||||
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
|
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
task.setFinishTime(event.getFinishTime());
|
task.setFinishTime(event.getFinishTime());
|
||||||
task.setTaskStatus(getPre21Value(event.getTaskStatus()));
|
task.setTaskStatus(getPre21Value(event.getTaskStatus()));
|
||||||
|
TaskFailed t = (TaskFailed)(event.getDatum());
|
||||||
|
task.putDiagnosticInfo(t.error.toString());
|
||||||
|
task.putFailedDueToAttemptId(t.failedDueToAttempt.toString());
|
||||||
|
// No counters in TaskFailedEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processTaskAttemptUnsuccessfulCompletionEvent(
|
private void processTaskAttemptUnsuccessfulCompletionEvent(
|
||||||
TaskAttemptUnsuccessfulCompletionEvent event) {
|
TaskAttemptUnsuccessfulCompletionEvent event) {
|
||||||
LoggedTaskAttempt attempt =
|
ParsedTaskAttempt attempt =
|
||||||
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
||||||
event.getTaskAttemptId().toString());
|
event.getTaskAttemptId().toString());
|
||||||
|
|
||||||
|
@ -476,20 +486,27 @@ public class JobBuilder {
|
||||||
attempt.arraySetCpuUsages(event.getCpuUsages());
|
attempt.arraySetCpuUsages(event.getCpuUsages());
|
||||||
attempt.arraySetVMemKbytes(event.getVMemKbytes());
|
attempt.arraySetVMemKbytes(event.getVMemKbytes());
|
||||||
attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes());
|
attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes());
|
||||||
|
TaskAttemptUnsuccessfulCompletion t =
|
||||||
|
(TaskAttemptUnsuccessfulCompletion) (event.getDatum());
|
||||||
|
attempt.putDiagnosticInfo(t.error.toString());
|
||||||
|
// No counters in TaskAttemptUnsuccessfulCompletionEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
|
private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
|
||||||
LoggedTaskAttempt attempt =
|
ParsedTaskAttempt attempt =
|
||||||
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
||||||
event.getTaskAttemptId().toString());
|
event.getTaskAttemptId().toString());
|
||||||
if (attempt == null) {
|
if (attempt == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
attempt.setStartTime(event.getStartTime());
|
attempt.setStartTime(event.getStartTime());
|
||||||
|
attempt.putTrackerName(event.getTrackerName());
|
||||||
|
attempt.putHttpPort(event.getHttpPort());
|
||||||
|
attempt.putShufflePort(event.getShufflePort());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
|
private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
|
||||||
LoggedTaskAttempt attempt =
|
ParsedTaskAttempt attempt =
|
||||||
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
||||||
event.getAttemptId().toString());
|
event.getAttemptId().toString());
|
||||||
if (attempt == null) {
|
if (attempt == null) {
|
||||||
|
@ -507,7 +524,7 @@ public class JobBuilder {
|
||||||
|
|
||||||
private void processReduceAttemptFinishedEvent(
|
private void processReduceAttemptFinishedEvent(
|
||||||
ReduceAttemptFinishedEvent event) {
|
ReduceAttemptFinishedEvent event) {
|
||||||
LoggedTaskAttempt attempt =
|
ParsedTaskAttempt attempt =
|
||||||
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
||||||
event.getAttemptId().toString());
|
event.getAttemptId().toString());
|
||||||
if (attempt == null) {
|
if (attempt == null) {
|
||||||
|
@ -536,7 +553,7 @@ public class JobBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
|
private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
|
||||||
LoggedTaskAttempt attempt =
|
ParsedTaskAttempt attempt =
|
||||||
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
|
||||||
event.getAttemptId().toString());
|
event.getAttemptId().toString());
|
||||||
if (attempt == null) {
|
if (attempt == null) {
|
||||||
|
@ -568,6 +585,7 @@ public class JobBuilder {
|
||||||
result.setOutcome(Pre21JobHistoryConstants.Values
|
result.setOutcome(Pre21JobHistoryConstants.Values
|
||||||
.valueOf(event.getStatus()));
|
.valueOf(event.getStatus()));
|
||||||
result.setFinishTime(event.getFinishTime());
|
result.setFinishTime(event.getFinishTime());
|
||||||
|
// No counters in JobUnsuccessfulCompletionEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processJobSubmittedEvent(JobSubmittedEvent event) {
|
private void processJobSubmittedEvent(JobSubmittedEvent event) {
|
||||||
|
@ -575,8 +593,14 @@ public class JobBuilder {
|
||||||
result.setJobName(event.getJobName());
|
result.setJobName(event.getJobName());
|
||||||
result.setUser(event.getUserName());
|
result.setUser(event.getUserName());
|
||||||
result.setSubmitTime(event.getSubmitTime());
|
result.setSubmitTime(event.getSubmitTime());
|
||||||
// job queue name is set when conf file is processed.
|
result.putJobConfPath(event.getJobConfPath());
|
||||||
// See JobBuilder.process(Properties) method for details.
|
result.putJobAcls(event.getJobAcls());
|
||||||
|
|
||||||
|
// set the queue name if existing
|
||||||
|
String queue = event.getJobQueueName();
|
||||||
|
if (queue != null) {
|
||||||
|
result.setQueue(queue);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processJobStatusChangedEvent(JobStatusChangedEvent event) {
|
private void processJobStatusChangedEvent(JobStatusChangedEvent event) {
|
||||||
|
@ -603,10 +627,19 @@ public class JobBuilder {
|
||||||
result.setFinishTime(event.getFinishTime());
|
result.setFinishTime(event.getFinishTime());
|
||||||
result.setJobID(jobID);
|
result.setJobID(jobID);
|
||||||
result.setOutcome(Values.SUCCESS);
|
result.setOutcome(Values.SUCCESS);
|
||||||
|
|
||||||
|
JobFinished job = (JobFinished)event.getDatum();
|
||||||
|
Map<String, Long> countersMap =
|
||||||
|
JobHistoryUtils.extractCounters(job.totalCounters);
|
||||||
|
result.putTotalCounters(countersMap);
|
||||||
|
countersMap = JobHistoryUtils.extractCounters(job.mapCounters);
|
||||||
|
result.putMapCounters(countersMap);
|
||||||
|
countersMap = JobHistoryUtils.extractCounters(job.reduceCounters);
|
||||||
|
result.putReduceCounters(countersMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
private LoggedTask getTask(String taskIDname) {
|
private ParsedTask getTask(String taskIDname) {
|
||||||
LoggedTask result = mapTasks.get(taskIDname);
|
ParsedTask result = mapTasks.get(taskIDname);
|
||||||
|
|
||||||
if (result != null) {
|
if (result != null) {
|
||||||
return result;
|
return result;
|
||||||
|
@ -630,9 +663,9 @@ public class JobBuilder {
|
||||||
* if true, we can create a task.
|
* if true, we can create a task.
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
private LoggedTask getOrMakeTask(TaskType type, String taskIDname,
|
private ParsedTask getOrMakeTask(TaskType type, String taskIDname,
|
||||||
boolean allowCreate) {
|
boolean allowCreate) {
|
||||||
Map<String, LoggedTask> taskMap = otherTasks;
|
Map<String, ParsedTask> taskMap = otherTasks;
|
||||||
List<LoggedTask> tasks = this.result.getOtherTasks();
|
List<LoggedTask> tasks = this.result.getOtherTasks();
|
||||||
|
|
||||||
switch (type) {
|
switch (type) {
|
||||||
|
@ -650,10 +683,10 @@ public class JobBuilder {
|
||||||
// no code
|
// no code
|
||||||
}
|
}
|
||||||
|
|
||||||
LoggedTask result = taskMap.get(taskIDname);
|
ParsedTask result = taskMap.get(taskIDname);
|
||||||
|
|
||||||
if (result == null && allowCreate) {
|
if (result == null && allowCreate) {
|
||||||
result = new LoggedTask();
|
result = new ParsedTask();
|
||||||
result.setTaskType(getPre21Value(type.toString()));
|
result.setTaskType(getPre21Value(type.toString()));
|
||||||
result.setTaskID(taskIDname);
|
result.setTaskID(taskIDname);
|
||||||
taskMap.put(taskIDname, result);
|
taskMap.put(taskIDname, result);
|
||||||
|
@ -663,13 +696,13 @@ public class JobBuilder {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private LoggedTaskAttempt getOrMakeTaskAttempt(TaskType type,
|
private ParsedTaskAttempt getOrMakeTaskAttempt(TaskType type,
|
||||||
String taskIDName, String taskAttemptName) {
|
String taskIDName, String taskAttemptName) {
|
||||||
LoggedTask task = getOrMakeTask(type, taskIDName, false);
|
ParsedTask task = getOrMakeTask(type, taskIDName, false);
|
||||||
LoggedTaskAttempt result = attempts.get(taskAttemptName);
|
ParsedTaskAttempt result = attempts.get(taskAttemptName);
|
||||||
|
|
||||||
if (result == null && task != null) {
|
if (result == null && task != null) {
|
||||||
result = new LoggedTaskAttempt();
|
result = new ParsedTaskAttempt();
|
||||||
result.setAttemptID(taskAttemptName);
|
result.setAttemptID(taskAttemptName);
|
||||||
attempts.put(taskAttemptName, result);
|
attempts.put(taskAttemptName, result);
|
||||||
task.getAttempts().add(result);
|
task.getAttempts().add(result);
|
||||||
|
|
|
@ -18,10 +18,15 @@
|
||||||
package org.apache.hadoop.tools.rumen;
|
package org.apache.hadoop.tools.rumen;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -143,4 +148,21 @@ public class JobHistoryUtils {
|
||||||
String jobId = extractJobIDFromConfFileName(fileName);
|
String jobId = extractJobIDFromConfFileName(fileName);
|
||||||
return jobId != null;
|
return jobId != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract/Add counters into the Map from the given JhCounters object.
|
||||||
|
* @param counters the counters to be extracted from
|
||||||
|
* @return the map of counters
|
||||||
|
*/
|
||||||
|
static Map<String, Long> extractCounters(JhCounters counters) {
|
||||||
|
Map<String, Long> countersMap = new HashMap<String, Long>();
|
||||||
|
if (counters != null) {
|
||||||
|
for (JhCounterGroup group : counters.groups) {
|
||||||
|
for (JhCounter counter : group.counts) {
|
||||||
|
countersMap.put(counter.name.toString(), counter.value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return countersMap;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -360,6 +360,10 @@ public class LoggedJob implements DeepCompare {
|
||||||
this.relativeTime = relativeTime;
|
this.relativeTime = relativeTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return job queue name if it is available in job history file or
|
||||||
|
* job history conf file. Returns null otherwise.
|
||||||
|
*/
|
||||||
public QueueName getQueue() {
|
public QueueName getQueue() {
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,179 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.tools.rumen;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.mapreduce.JobACL;
|
||||||
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a wrapper class around {@link LoggedJob}. This provides also the
|
||||||
|
* extra information about the job obtained from job history which is not
|
||||||
|
* written to the JSON trace file.
|
||||||
|
*/
|
||||||
|
public class ParsedJob extends LoggedJob {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(ParsedJob.class);
|
||||||
|
|
||||||
|
private Map<String, Long> totalCountersMap = new HashMap<String, Long>();
|
||||||
|
private Map<String, Long> mapCountersMap = new HashMap<String, Long>();
|
||||||
|
private Map<String, Long> reduceCountersMap = new HashMap<String, Long>();
|
||||||
|
|
||||||
|
private String jobConfPath;
|
||||||
|
private Map<JobACL, AccessControlList> jobAcls;
|
||||||
|
|
||||||
|
ParsedJob() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
ParsedJob(String jobID) {
|
||||||
|
super();
|
||||||
|
|
||||||
|
setJobID(jobID);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Set the job total counters */
|
||||||
|
void putTotalCounters(Map<String, Long> totalCounters) {
|
||||||
|
this.totalCountersMap = totalCounters;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the job total counters
|
||||||
|
*/
|
||||||
|
public Map<String, Long> obtainTotalCounters() {
|
||||||
|
return totalCountersMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Set the job level map tasks' counters */
|
||||||
|
void putMapCounters(Map<String, Long> mapCounters) {
|
||||||
|
this.mapCountersMap = mapCounters;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the job level map tasks' counters
|
||||||
|
*/
|
||||||
|
public Map<String, Long> obtainMapCounters() {
|
||||||
|
return mapCountersMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Set the job level reduce tasks' counters */
|
||||||
|
void putReduceCounters(Map<String, Long> reduceCounters) {
|
||||||
|
this.reduceCountersMap = reduceCounters;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the job level reduce tasks' counters
|
||||||
|
*/
|
||||||
|
public Map<String, Long> obtainReduceCounters() {
|
||||||
|
return reduceCountersMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Set the job conf path in staging dir on hdfs */
|
||||||
|
void putJobConfPath(String confPath) {
|
||||||
|
jobConfPath = confPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the job conf path in staging dir on hdfs
|
||||||
|
*/
|
||||||
|
public String obtainJobConfpath() {
|
||||||
|
return jobConfPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Set the job acls */
|
||||||
|
void putJobAcls(Map<JobACL, AccessControlList> acls) {
|
||||||
|
jobAcls = acls;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the job acls
|
||||||
|
*/
|
||||||
|
public Map<JobACL, AccessControlList> obtainJobAcls() {
|
||||||
|
return jobAcls;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the list of map tasks of this job
|
||||||
|
*/
|
||||||
|
public List<ParsedTask> obtainMapTasks() {
|
||||||
|
List<LoggedTask> tasks = super.getMapTasks();
|
||||||
|
return convertTasks(tasks);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the list of reduce tasks of this job
|
||||||
|
*/
|
||||||
|
public List<ParsedTask> obtainReduceTasks() {
|
||||||
|
List<LoggedTask> tasks = super.getReduceTasks();
|
||||||
|
return convertTasks(tasks);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the list of other tasks of this job
|
||||||
|
*/
|
||||||
|
public List<ParsedTask> obtainOtherTasks() {
|
||||||
|
List<LoggedTask> tasks = super.getOtherTasks();
|
||||||
|
return convertTasks(tasks);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** As we know that this list of {@link LoggedTask} objects is actually a list
|
||||||
|
* of {@link ParsedTask} objects, we go ahead and cast them.
|
||||||
|
* @return the list of {@link ParsedTask} objects
|
||||||
|
*/
|
||||||
|
private List<ParsedTask> convertTasks(List<LoggedTask> tasks) {
|
||||||
|
List<ParsedTask> result = new ArrayList<ParsedTask>();
|
||||||
|
|
||||||
|
for (LoggedTask t : tasks) {
|
||||||
|
if (t instanceof ParsedTask) {
|
||||||
|
result.add((ParsedTask)t);
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("Unexpected type of tasks in the list...");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Dump the extra info of ParsedJob */
|
||||||
|
void dumpParsedJob() {
|
||||||
|
LOG.info("ParsedJob details:" + obtainTotalCounters() + ";"
|
||||||
|
+ obtainMapCounters() + ";" + obtainReduceCounters()
|
||||||
|
+ "\n" + obtainJobConfpath() + "\n" + obtainJobAcls()
|
||||||
|
+ ";Q=" + (getQueue() == null ? "null" : getQueue().getValue()));
|
||||||
|
List<ParsedTask> maps = obtainMapTasks();
|
||||||
|
for (ParsedTask task : maps) {
|
||||||
|
task.dumpParsedTask();
|
||||||
|
}
|
||||||
|
List<ParsedTask> reduces = obtainReduceTasks();
|
||||||
|
for (ParsedTask task : reduces) {
|
||||||
|
task.dumpParsedTask();
|
||||||
|
}
|
||||||
|
List<ParsedTask> others = obtainOtherTasks();
|
||||||
|
for (ParsedTask task : others) {
|
||||||
|
task.dumpParsedTask();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,128 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.tools.rumen;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a wrapper class around {@link LoggedTask}. This provides also the
|
||||||
|
* extra information about the task obtained from job history which is not
|
||||||
|
* written to the JSON trace file.
|
||||||
|
*/
|
||||||
|
public class ParsedTask extends LoggedTask {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(ParsedTask.class);
|
||||||
|
|
||||||
|
private String diagnosticInfo;
|
||||||
|
private String failedDueToAttempt;
|
||||||
|
private Map<String, Long> countersMap = new HashMap<String, Long>();
|
||||||
|
|
||||||
|
ParsedTask() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incorporateCounters(JhCounters counters) {
|
||||||
|
Map<String, Long> countersMap =
|
||||||
|
JobHistoryUtils.extractCounters(counters);
|
||||||
|
putCounters(countersMap);
|
||||||
|
|
||||||
|
super.incorporateCounters(counters);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Set the task counters */
|
||||||
|
public void putCounters(Map<String, Long> counters) {
|
||||||
|
this.countersMap = counters;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the task counters
|
||||||
|
*/
|
||||||
|
public Map<String, Long> obtainCounters() {
|
||||||
|
return countersMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Set the task diagnostic-info */
|
||||||
|
public void putDiagnosticInfo(String msg) {
|
||||||
|
diagnosticInfo = msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the diagnostic-info of this task.
|
||||||
|
* If the task is successful, returns null.
|
||||||
|
*/
|
||||||
|
public String obtainDiagnosticInfo() {
|
||||||
|
return diagnosticInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the failed-due-to-attemptId info of this task.
|
||||||
|
*/
|
||||||
|
public void putFailedDueToAttemptId(String attempt) {
|
||||||
|
failedDueToAttempt = attempt;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the failed-due-to-attemptId info of this task.
|
||||||
|
* If the task is successful, returns null.
|
||||||
|
*/
|
||||||
|
public String obtainFailedDueToAttemptId() {
|
||||||
|
return failedDueToAttempt;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<ParsedTaskAttempt> obtainTaskAttempts() {
|
||||||
|
List<LoggedTaskAttempt> attempts = getAttempts();
|
||||||
|
return convertTaskAttempts(attempts);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<ParsedTaskAttempt> convertTaskAttempts(
|
||||||
|
List<LoggedTaskAttempt> attempts) {
|
||||||
|
List<ParsedTaskAttempt> result = new ArrayList<ParsedTaskAttempt>();
|
||||||
|
|
||||||
|
for (LoggedTaskAttempt t : attempts) {
|
||||||
|
if (t instanceof ParsedTaskAttempt) {
|
||||||
|
result.add((ParsedTaskAttempt)t);
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException(
|
||||||
|
"Unexpected type of taskAttempts in the list...");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Dump the extra info of ParsedTask */
|
||||||
|
void dumpParsedTask() {
|
||||||
|
LOG.info("ParsedTask details:" + obtainCounters()
|
||||||
|
+ "\n" + obtainFailedDueToAttemptId()
|
||||||
|
+ "\nPreferred Locations are:");
|
||||||
|
List<LoggedLocation> loc = getPreferredLocations();
|
||||||
|
for (LoggedLocation l : loc) {
|
||||||
|
LOG.info(l.getLayers() + ";" + l.toString());
|
||||||
|
}
|
||||||
|
List<ParsedTaskAttempt> attempts = obtainTaskAttempts();
|
||||||
|
for (ParsedTaskAttempt attempt : attempts) {
|
||||||
|
attempt.dumpParsedTaskAttempt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,119 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.tools.rumen;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a wrapper class around {@link LoggedTaskAttempt}. This provides
|
||||||
|
* also the extra information about the task attempt obtained from
|
||||||
|
* job history which is not written to the JSON trace file.
|
||||||
|
*/
|
||||||
|
public class ParsedTaskAttempt extends LoggedTaskAttempt {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(ParsedTaskAttempt.class);
|
||||||
|
|
||||||
|
private String diagnosticInfo;
|
||||||
|
private String trackerName;
|
||||||
|
private Integer httpPort, shufflePort;
|
||||||
|
private Map<String, Long> countersMap = new HashMap<String, Long>();
|
||||||
|
|
||||||
|
ParsedTaskAttempt() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** incorporate event counters */
|
||||||
|
public void incorporateCounters(JhCounters counters) {
|
||||||
|
|
||||||
|
Map<String, Long> countersMap =
|
||||||
|
JobHistoryUtils.extractCounters(counters);
|
||||||
|
putCounters(countersMap);
|
||||||
|
|
||||||
|
super.incorporateCounters(counters);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Set the task attempt counters */
|
||||||
|
public void putCounters(Map<String, Long> counters) {
|
||||||
|
this.countersMap = counters;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the task attempt counters
|
||||||
|
*/
|
||||||
|
public Map<String, Long> obtainCounters() {
|
||||||
|
return countersMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Set the task attempt diagnostic-info */
|
||||||
|
public void putDiagnosticInfo(String msg) {
|
||||||
|
diagnosticInfo = msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the diagnostic-info of this task attempt.
|
||||||
|
* If the attempt is successful, returns null.
|
||||||
|
*/
|
||||||
|
public String obtainDiagnosticInfo() {
|
||||||
|
return diagnosticInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
void putTrackerName(String trackerName) {
|
||||||
|
this.trackerName = trackerName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String obtainTrackerName() {
|
||||||
|
return trackerName;
|
||||||
|
}
|
||||||
|
|
||||||
|
void putHttpPort(int port) {
|
||||||
|
httpPort = port;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return http port if set. Returns null otherwise.
|
||||||
|
*/
|
||||||
|
public Integer obtainHttpPort() {
|
||||||
|
return httpPort;
|
||||||
|
}
|
||||||
|
|
||||||
|
void putShufflePort(int port) {
|
||||||
|
shufflePort = port;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return shuffle port if set. Returns null otherwise.
|
||||||
|
*/
|
||||||
|
public Integer obtainShufflePort() {
|
||||||
|
return shufflePort;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Dump the extra info of ParsedTaskAttempt */
|
||||||
|
void dumpParsedTaskAttempt() {
|
||||||
|
LOG.info("ParsedTaskAttempt details:" + obtainCounters()
|
||||||
|
+ ";DiagnosticInfo=" + obtainDiagnosticInfo() + "\n"
|
||||||
|
+ obtainTrackerName() + ";" + obtainHttpPort() + ";"
|
||||||
|
+ obtainShufflePort() + ";rack=" + getHostName().getRackName()
|
||||||
|
+ ";host=" + getHostName().getHostName());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue