MAPREDUCE-5933. Enabled MR AM to post history events to the timeline server. Contributed by Robert Kanter.
This commit is contained in:
parent
f6b963fdfc
commit
6b2f11b54b
|
@ -242,6 +242,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
|
MAPREDUCE-5933. Enabled MR AM to post history events to the timeline server.
|
||||||
|
(Robert Kanter via zjshen)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
MAPREDUCE-5971. Move the default options for distcp -p to
|
MAPREDUCE-5971. Move the default options for distcp -p to
|
||||||
|
|
|
@ -79,6 +79,12 @@
|
||||||
<type>test-jar</type>
|
<type>test-jar</type>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-server-tests</artifactId>
|
||||||
|
<type>test-jar</type>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -27,7 +27,12 @@ public class JobHistoryEvent extends AbstractEvent<EventType>{
|
||||||
private final HistoryEvent historyEvent;
|
private final HistoryEvent historyEvent;
|
||||||
|
|
||||||
public JobHistoryEvent(JobId jobID, HistoryEvent historyEvent) {
|
public JobHistoryEvent(JobId jobID, HistoryEvent historyEvent) {
|
||||||
super(historyEvent.getEventType());
|
this(jobID, historyEvent, System.currentTimeMillis());
|
||||||
|
}
|
||||||
|
|
||||||
|
public JobHistoryEvent(JobId jobID, HistoryEvent historyEvent,
|
||||||
|
long timestamp) {
|
||||||
|
super(historyEvent.getEventType(), timestamp);
|
||||||
this.jobID = jobID;
|
this.jobID = jobID;
|
||||||
this.historyEvent = historyEvent;
|
this.historyEvent = historyEvent;
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,9 @@ import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.hadoop.mapred.TaskStatus;
|
||||||
import org.apache.hadoop.mapreduce.Counter;
|
import org.apache.hadoop.mapreduce.Counter;
|
||||||
|
import org.apache.hadoop.mapreduce.CounterGroup;
|
||||||
import org.apache.hadoop.mapreduce.Counters;
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
import org.apache.hadoop.mapreduce.JobCounter;
|
import org.apache.hadoop.mapreduce.JobCounter;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
@ -57,8 +59,16 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||||
|
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.codehaus.jackson.JsonNode;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
import org.codehaus.jackson.node.ArrayNode;
|
||||||
|
import org.codehaus.jackson.node.ObjectNode;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The job history events get routed to this class. This class writes the Job
|
* The job history events get routed to this class. This class writes the Job
|
||||||
|
@ -108,6 +118,11 @@ public class JobHistoryEventHandler extends AbstractService
|
||||||
// should job completion be force when the AM shuts down?
|
// should job completion be force when the AM shuts down?
|
||||||
protected volatile boolean forceJobCompletion = false;
|
protected volatile boolean forceJobCompletion = false;
|
||||||
|
|
||||||
|
protected TimelineClient timelineClient;
|
||||||
|
|
||||||
|
private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
|
||||||
|
private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
|
||||||
|
|
||||||
public JobHistoryEventHandler(AppContext context, int startCount) {
|
public JobHistoryEventHandler(AppContext context, int startCount) {
|
||||||
super("JobHistoryEventHandler");
|
super("JobHistoryEventHandler");
|
||||||
this.context = context;
|
this.context = context;
|
||||||
|
@ -226,6 +241,9 @@ public class JobHistoryEventHandler extends AbstractService
|
||||||
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
|
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
|
||||||
MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
|
MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
|
||||||
|
|
||||||
|
timelineClient = TimelineClient.createTimelineClient();
|
||||||
|
timelineClient.init(conf);
|
||||||
|
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -250,6 +268,7 @@ public class JobHistoryEventHandler extends AbstractService
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
|
timelineClient.start();
|
||||||
eventHandlingThread = new Thread(new Runnable() {
|
eventHandlingThread = new Thread(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -372,6 +391,9 @@ public class JobHistoryEventHandler extends AbstractService
|
||||||
LOG.info("Exception while closing file " + e.getMessage());
|
LOG.info("Exception while closing file " + e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (timelineClient != null) {
|
||||||
|
timelineClient.stop();
|
||||||
|
}
|
||||||
LOG.info("Stopped JobHistoryEventHandler. super.stop()");
|
LOG.info("Stopped JobHistoryEventHandler. super.stop()");
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
@ -515,6 +537,7 @@ public class JobHistoryEventHandler extends AbstractService
|
||||||
// For all events
|
// For all events
|
||||||
// (1) Write it out
|
// (1) Write it out
|
||||||
// (2) Process it for JobSummary
|
// (2) Process it for JobSummary
|
||||||
|
// (3) Process it for ATS
|
||||||
MetaInfo mi = fileMap.get(event.getJobID());
|
MetaInfo mi = fileMap.get(event.getJobID());
|
||||||
try {
|
try {
|
||||||
HistoryEvent historyEvent = event.getHistoryEvent();
|
HistoryEvent historyEvent = event.getHistoryEvent();
|
||||||
|
@ -523,6 +546,8 @@ public class JobHistoryEventHandler extends AbstractService
|
||||||
}
|
}
|
||||||
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
|
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
|
||||||
event.getJobID());
|
event.getJobID());
|
||||||
|
processEventForTimelineServer(historyEvent, event.getJobID(),
|
||||||
|
event.getTimestamp());
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("In HistoryEventHandler "
|
LOG.debug("In HistoryEventHandler "
|
||||||
+ event.getHistoryEvent().getEventType());
|
+ event.getHistoryEvent().getEventType());
|
||||||
|
@ -667,6 +692,319 @@ public class JobHistoryEventHandler extends AbstractService
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
|
||||||
|
long timestamp) {
|
||||||
|
TimelineEvent tEvent = new TimelineEvent();
|
||||||
|
tEvent.setEventType(event.getEventType().name().toUpperCase());
|
||||||
|
tEvent.setTimestamp(timestamp);
|
||||||
|
TimelineEntity tEntity = new TimelineEntity();
|
||||||
|
|
||||||
|
switch (event.getEventType()) {
|
||||||
|
case JOB_SUBMITTED:
|
||||||
|
JobSubmittedEvent jse =
|
||||||
|
(JobSubmittedEvent) event;
|
||||||
|
tEvent.addEventInfo("SUBMIT_TIME", jse.getSubmitTime());
|
||||||
|
tEvent.addEventInfo("QUEUE_NAME", jse.getJobQueueName());
|
||||||
|
tEvent.addEventInfo("JOB_NAME", jse.getJobName());
|
||||||
|
tEvent.addEventInfo("USER_NAME", jse.getUserName());
|
||||||
|
tEvent.addEventInfo("JOB_CONF_PATH", jse.getJobConfPath());
|
||||||
|
tEvent.addEventInfo("ACLS", jse.getJobAcls());
|
||||||
|
tEvent.addEventInfo("JOB_QUEUE_NAME", jse.getJobQueueName());
|
||||||
|
tEvent.addEventInfo("WORKLFOW_ID", jse.getWorkflowId());
|
||||||
|
tEvent.addEventInfo("WORKFLOW_NAME", jse.getWorkflowName());
|
||||||
|
tEvent.addEventInfo("WORKFLOW_NAME_NAME", jse.getWorkflowNodeName());
|
||||||
|
tEvent.addEventInfo("WORKFLOW_ADJACENCIES",
|
||||||
|
jse.getWorkflowAdjacencies());
|
||||||
|
tEvent.addEventInfo("WORKFLOW_TAGS", jse.getWorkflowTags());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(jobId.toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
|
||||||
|
break;
|
||||||
|
case JOB_STATUS_CHANGED:
|
||||||
|
JobStatusChangedEvent jsce = (JobStatusChangedEvent) event;
|
||||||
|
tEvent.addEventInfo("STATUS", jsce.getStatus());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(jobId.toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
|
||||||
|
break;
|
||||||
|
case JOB_INFO_CHANGED:
|
||||||
|
JobInfoChangeEvent jice = (JobInfoChangeEvent) event;
|
||||||
|
tEvent.addEventInfo("SUBMIT_TIME", jice.getSubmitTime());
|
||||||
|
tEvent.addEventInfo("LAUNCH_TIME", jice.getLaunchTime());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(jobId.toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
|
||||||
|
break;
|
||||||
|
case JOB_INITED:
|
||||||
|
JobInitedEvent jie = (JobInitedEvent) event;
|
||||||
|
tEvent.addEventInfo("START_TIME", jie.getLaunchTime());
|
||||||
|
tEvent.addEventInfo("STATUS", jie.getStatus());
|
||||||
|
tEvent.addEventInfo("TOTAL_MAPS", jie.getTotalMaps());
|
||||||
|
tEvent.addEventInfo("TOTAL_REDUCES", jie.getTotalReduces());
|
||||||
|
tEvent.addEventInfo("UBERIZED", jie.getUberized());
|
||||||
|
tEntity.setStartTime(jie.getLaunchTime());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(jobId.toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
|
||||||
|
break;
|
||||||
|
case JOB_PRIORITY_CHANGED:
|
||||||
|
JobPriorityChangeEvent jpce = (JobPriorityChangeEvent) event;
|
||||||
|
tEvent.addEventInfo("PRIORITY", jpce.getPriority().toString());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(jobId.toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
|
||||||
|
break;
|
||||||
|
case JOB_QUEUE_CHANGED:
|
||||||
|
JobQueueChangeEvent jqe = (JobQueueChangeEvent) event;
|
||||||
|
tEvent.addEventInfo("QUEUE_NAMES", jqe.getJobQueueName());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(jobId.toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
|
||||||
|
break;
|
||||||
|
case JOB_FAILED:
|
||||||
|
case JOB_KILLED:
|
||||||
|
case JOB_ERROR:
|
||||||
|
JobUnsuccessfulCompletionEvent juce =
|
||||||
|
(JobUnsuccessfulCompletionEvent) event;
|
||||||
|
tEvent.addEventInfo("FINISH_TIME", juce.getFinishTime());
|
||||||
|
tEvent.addEventInfo("NUM_MAPS", juce.getFinishedMaps());
|
||||||
|
tEvent.addEventInfo("NUM_REDUCES", juce.getFinishedReduces());
|
||||||
|
tEvent.addEventInfo("JOB_STATUS", juce.getStatus());
|
||||||
|
tEvent.addEventInfo("DIAGNOSTICS", juce.getDiagnostics());
|
||||||
|
tEvent.addEventInfo("FINISHED_MAPS", juce.getFinishedMaps());
|
||||||
|
tEvent.addEventInfo("FINISHED_REDUCES", juce.getFinishedReduces());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(jobId.toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
|
||||||
|
break;
|
||||||
|
case JOB_FINISHED:
|
||||||
|
JobFinishedEvent jfe = (JobFinishedEvent) event;
|
||||||
|
tEvent.addEventInfo("FINISH_TIME", jfe.getFinishTime());
|
||||||
|
tEvent.addEventInfo("NUM_MAPS", jfe.getFinishedMaps());
|
||||||
|
tEvent.addEventInfo("NUM_REDUCES", jfe.getFinishedReduces());
|
||||||
|
tEvent.addEventInfo("FAILED_MAPS", jfe.getFailedMaps());
|
||||||
|
tEvent.addEventInfo("FAILED_REDUCES", jfe.getFailedReduces());
|
||||||
|
tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps());
|
||||||
|
tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces());
|
||||||
|
tEvent.addEventInfo("MAP_COUNTERS_GROUPS",
|
||||||
|
countersToJSON(jfe.getTotalCounters()));
|
||||||
|
tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS",
|
||||||
|
countersToJSON(jfe.getReduceCounters()));
|
||||||
|
tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS",
|
||||||
|
countersToJSON(jfe.getTotalCounters()));
|
||||||
|
tEvent.addEventInfo("JOB_STATUS", JobState.SUCCEEDED.toString());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(jobId.toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
|
||||||
|
break;
|
||||||
|
case TASK_STARTED:
|
||||||
|
TaskStartedEvent tse = (TaskStartedEvent) event;
|
||||||
|
tEvent.addEventInfo("TASK_TYPE", tse.getTaskType().toString());
|
||||||
|
tEvent.addEventInfo("START_TIME", tse.getStartTime());
|
||||||
|
tEvent.addEventInfo("SPLIT_LOCATIONS", tse.getSplitLocations());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(tse.getTaskId().toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
|
||||||
|
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
|
||||||
|
break;
|
||||||
|
case TASK_FAILED:
|
||||||
|
TaskFailedEvent tfe = (TaskFailedEvent) event;
|
||||||
|
tEvent.addEventInfo("TASK_TYPE", tfe.getTaskType().toString());
|
||||||
|
tEvent.addEventInfo("STATUS", TaskStatus.State.FAILED.toString());
|
||||||
|
tEvent.addEventInfo("FINISH_TIME", tfe.getFinishTime());
|
||||||
|
tEvent.addEventInfo("ERROR", tfe.getError());
|
||||||
|
tEvent.addEventInfo("FAILED_ATTEMPT_ID",
|
||||||
|
tfe.getFailedAttemptID() == null ?
|
||||||
|
"" : tfe.getFailedAttemptID().toString());
|
||||||
|
tEvent.addEventInfo("COUNTERS_GROUPS",
|
||||||
|
countersToJSON(tfe.getCounters()));
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(tfe.getTaskId().toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
|
||||||
|
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
|
||||||
|
break;
|
||||||
|
case TASK_UPDATED:
|
||||||
|
TaskUpdatedEvent tue = (TaskUpdatedEvent) event;
|
||||||
|
tEvent.addEventInfo("FINISH_TIME", tue.getFinishTime());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(tue.getTaskId().toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
|
||||||
|
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
|
||||||
|
break;
|
||||||
|
case TASK_FINISHED:
|
||||||
|
TaskFinishedEvent tfe2 = (TaskFinishedEvent) event;
|
||||||
|
tEvent.addEventInfo("TASK_TYPE", tfe2.getTaskType().toString());
|
||||||
|
tEvent.addEventInfo("COUNTERS_GROUPS",
|
||||||
|
countersToJSON(tfe2.getCounters()));
|
||||||
|
tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
|
||||||
|
tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
|
||||||
|
tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
|
||||||
|
tfe2.getSuccessfulTaskAttemptId() == null ?
|
||||||
|
"" : tfe2.getSuccessfulTaskAttemptId().toString());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(tfe2.getTaskId().toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
|
||||||
|
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
|
||||||
|
break;
|
||||||
|
case MAP_ATTEMPT_STARTED:
|
||||||
|
case CLEANUP_ATTEMPT_STARTED:
|
||||||
|
case REDUCE_ATTEMPT_STARTED:
|
||||||
|
case SETUP_ATTEMPT_STARTED:
|
||||||
|
TaskAttemptStartedEvent tase = (TaskAttemptStartedEvent) event;
|
||||||
|
tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString());
|
||||||
|
tEvent.addEventInfo("TASK_ATTEMPT_ID",
|
||||||
|
tase.getTaskAttemptId().toString() == null ?
|
||||||
|
"" : tase.getTaskAttemptId().toString());
|
||||||
|
tEvent.addEventInfo("START_TIME", tase.getStartTime());
|
||||||
|
tEvent.addEventInfo("HTTP_PORT", tase.getHttpPort());
|
||||||
|
tEvent.addEventInfo("TRACKER_NAME", tase.getTrackerName());
|
||||||
|
tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString());
|
||||||
|
tEvent.addEventInfo("SHUFFLE_PORT", tase.getShufflePort());
|
||||||
|
tEvent.addEventInfo("CONTAINER_ID", tase.getContainerId() == null ?
|
||||||
|
"" : tase.getContainerId().toString());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(tase.getTaskId().toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
|
||||||
|
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
|
||||||
|
break;
|
||||||
|
case MAP_ATTEMPT_FAILED:
|
||||||
|
case CLEANUP_ATTEMPT_FAILED:
|
||||||
|
case REDUCE_ATTEMPT_FAILED:
|
||||||
|
case SETUP_ATTEMPT_FAILED:
|
||||||
|
case MAP_ATTEMPT_KILLED:
|
||||||
|
case CLEANUP_ATTEMPT_KILLED:
|
||||||
|
case REDUCE_ATTEMPT_KILLED:
|
||||||
|
case SETUP_ATTEMPT_KILLED:
|
||||||
|
TaskAttemptUnsuccessfulCompletionEvent tauce =
|
||||||
|
(TaskAttemptUnsuccessfulCompletionEvent) event;
|
||||||
|
tEvent.addEventInfo("TASK_TYPE", tauce.getTaskType().toString());
|
||||||
|
tEvent.addEventInfo("TASK_ATTEMPT_ID",
|
||||||
|
tauce.getTaskAttemptId() == null ?
|
||||||
|
"" : tauce.getTaskAttemptId().toString());
|
||||||
|
tEvent.addEventInfo("FINISH_TIME", tauce.getFinishTime());
|
||||||
|
tEvent.addEventInfo("ERROR", tauce.getError());
|
||||||
|
tEvent.addEventInfo("STATUS", tauce.getTaskStatus());
|
||||||
|
tEvent.addEventInfo("HOSTNAME", tauce.getHostname());
|
||||||
|
tEvent.addEventInfo("PORT", tauce.getPort());
|
||||||
|
tEvent.addEventInfo("RACK_NAME", tauce.getRackName());
|
||||||
|
tEvent.addEventInfo("SHUFFLE_FINISH_TIME", tauce.getFinishTime());
|
||||||
|
tEvent.addEventInfo("SORT_FINISH_TIME", tauce.getFinishTime());
|
||||||
|
tEvent.addEventInfo("MAP_FINISH_TIME", tauce.getFinishTime());
|
||||||
|
tEvent.addEventInfo("COUNTERS_GROUPS",
|
||||||
|
countersToJSON(tauce.getCounters()));
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(tauce.getTaskId().toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
|
||||||
|
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
|
||||||
|
break;
|
||||||
|
case MAP_ATTEMPT_FINISHED:
|
||||||
|
MapAttemptFinishedEvent mafe = (MapAttemptFinishedEvent) event;
|
||||||
|
tEvent.addEventInfo("TASK_TYPE", mafe.getTaskType().toString());
|
||||||
|
tEvent.addEventInfo("FINISH_TIME", mafe.getFinishTime());
|
||||||
|
tEvent.addEventInfo("STATUS", mafe.getTaskStatus());
|
||||||
|
tEvent.addEventInfo("STATE", mafe.getState());
|
||||||
|
tEvent.addEventInfo("MAP_FINISH_TIME", mafe.getMapFinishTime());
|
||||||
|
tEvent.addEventInfo("COUNTERS_GROUPS",
|
||||||
|
countersToJSON(mafe.getCounters()));
|
||||||
|
tEvent.addEventInfo("HOSTNAME", mafe.getHostname());
|
||||||
|
tEvent.addEventInfo("PORT", mafe.getPort());
|
||||||
|
tEvent.addEventInfo("RACK_NAME", mafe.getRackName());
|
||||||
|
tEvent.addEventInfo("ATTEMPT_ID", mafe.getAttemptId() == null ?
|
||||||
|
"" : mafe.getAttemptId().toString());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(mafe.getTaskId().toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
|
||||||
|
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
|
||||||
|
break;
|
||||||
|
case REDUCE_ATTEMPT_FINISHED:
|
||||||
|
ReduceAttemptFinishedEvent rafe = (ReduceAttemptFinishedEvent) event;
|
||||||
|
tEvent.addEventInfo("TASK_TYPE", rafe.getTaskType().toString());
|
||||||
|
tEvent.addEventInfo("ATTEMPT_ID", rafe.getAttemptId() == null ?
|
||||||
|
"" : rafe.getAttemptId().toString());
|
||||||
|
tEvent.addEventInfo("FINISH_TIME", rafe.getFinishTime());
|
||||||
|
tEvent.addEventInfo("STATUS", rafe.getTaskStatus());
|
||||||
|
tEvent.addEventInfo("STATE", rafe.getState());
|
||||||
|
tEvent.addEventInfo("SHUFFLE_FINISH_TIME", rafe.getShuffleFinishTime());
|
||||||
|
tEvent.addEventInfo("SORT_FINISH_TIME", rafe.getSortFinishTime());
|
||||||
|
tEvent.addEventInfo("COUNTERS_GROUPS",
|
||||||
|
countersToJSON(rafe.getCounters()));
|
||||||
|
tEvent.addEventInfo("HOSTNAME", rafe.getHostname());
|
||||||
|
tEvent.addEventInfo("PORT", rafe.getPort());
|
||||||
|
tEvent.addEventInfo("RACK_NAME", rafe.getRackName());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(rafe.getTaskId().toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
|
||||||
|
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
|
||||||
|
break;
|
||||||
|
case SETUP_ATTEMPT_FINISHED:
|
||||||
|
case CLEANUP_ATTEMPT_FINISHED:
|
||||||
|
TaskAttemptFinishedEvent tafe = (TaskAttemptFinishedEvent) event;
|
||||||
|
tEvent.addEventInfo("TASK_TYPE", tafe.getTaskType().toString());
|
||||||
|
tEvent.addEventInfo("ATTEMPT_ID", tafe.getAttemptId() == null ?
|
||||||
|
"" : tafe.getAttemptId().toString());
|
||||||
|
tEvent.addEventInfo("FINISH_TIME", tafe.getFinishTime());
|
||||||
|
tEvent.addEventInfo("STATUS", tafe.getTaskStatus());
|
||||||
|
tEvent.addEventInfo("STATE", tafe.getState());
|
||||||
|
tEvent.addEventInfo("COUNTERS_GROUPS",
|
||||||
|
countersToJSON(tafe.getCounters()));
|
||||||
|
tEvent.addEventInfo("HOSTNAME", tafe.getHostname());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(tafe.getTaskId().toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
|
||||||
|
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
|
||||||
|
break;
|
||||||
|
case AM_STARTED:
|
||||||
|
AMStartedEvent ase = (AMStartedEvent) event;
|
||||||
|
tEvent.addEventInfo("APPLICATION_ATTEMPT_ID",
|
||||||
|
ase.getAppAttemptId() == null ?
|
||||||
|
"" : ase.getAppAttemptId().toString());
|
||||||
|
tEvent.addEventInfo("CONTAINER_ID", ase.getContainerId() == null ?
|
||||||
|
"" : ase.getContainerId().toString());
|
||||||
|
tEvent.addEventInfo("NODE_MANAGER_HOST", ase.getNodeManagerHost());
|
||||||
|
tEvent.addEventInfo("NODE_MANAGER_PORT", ase.getNodeManagerPort());
|
||||||
|
tEvent.addEventInfo("NODE_MANAGER_HTTP_PORT",
|
||||||
|
ase.getNodeManagerHttpPort());
|
||||||
|
tEvent.addEventInfo("START_TIME", ase.getStartTime());
|
||||||
|
tEntity.addEvent(tEvent);
|
||||||
|
tEntity.setEntityId(jobId.toString());
|
||||||
|
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
timelineClient.putEntities(tEntity);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
|
||||||
|
+ "Server", ex);
|
||||||
|
} catch (YarnException ex) {
|
||||||
|
LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
|
||||||
|
+ "Server", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public JsonNode countersToJSON(Counters counters) {
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
ArrayNode nodes = mapper.createArrayNode();
|
||||||
|
if (counters != null) {
|
||||||
|
for (CounterGroup counterGroup : counters) {
|
||||||
|
ObjectNode groupNode = nodes.addObject();
|
||||||
|
groupNode.put("NAME", counterGroup.getName());
|
||||||
|
groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
|
||||||
|
ArrayNode countersNode = groupNode.putArray("COUNTERS");
|
||||||
|
for (Counter counter : counterGroup) {
|
||||||
|
ObjectNode counterNode = countersNode.addObject();
|
||||||
|
counterNode.put("NAME", counter.getName());
|
||||||
|
counterNode.put("DISPLAY_NAME", counter.getDisplayName());
|
||||||
|
counterNode.put("VALUE", counter.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nodes;
|
||||||
|
}
|
||||||
|
|
||||||
private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
|
private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
|
||||||
|
|
||||||
Counter slotMillisMapCounter = allCounters
|
Counter slotMillisMapCounter = allCounters
|
||||||
|
|
|
@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
@ -42,9 +43,12 @@ import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.mapreduce.CounterGroup;
|
||||||
import org.apache.hadoop.mapreduce.Counters;
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
|
import org.apache.hadoop.mapreduce.JobACL;
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
import org.apache.hadoop.mapreduce.TaskID;
|
import org.apache.hadoop.mapreduce.TaskID;
|
||||||
import org.apache.hadoop.mapreduce.TaskType;
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
|
@ -55,14 +59,22 @@ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||||
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||||
|
import org.codehaus.jackson.JsonNode;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
@ -126,7 +138,7 @@ public class TestJobHistoryEventHandler {
|
||||||
|
|
||||||
// First completion event, but min-queue-size for batching flushes is 10
|
// First completion event, but min-queue-size for batching flushes is 10
|
||||||
handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
|
handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
|
||||||
t.taskID, null, 0, TaskType.MAP, "", null)));
|
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
|
||||||
verify(mockWriter).flush();
|
verify(mockWriter).flush();
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -162,7 +174,7 @@ public class TestJobHistoryEventHandler {
|
||||||
|
|
||||||
for (int i = 0 ; i < 100 ; i++) {
|
for (int i = 0 ; i < 100 ; i++) {
|
||||||
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
|
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
|
||||||
t.taskID, null, 0, TaskType.MAP, "", null)));
|
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
|
||||||
}
|
}
|
||||||
|
|
||||||
handleNextNEvents(jheh, 9);
|
handleNextNEvents(jheh, 9);
|
||||||
|
@ -207,7 +219,7 @@ public class TestJobHistoryEventHandler {
|
||||||
|
|
||||||
for (int i = 0 ; i < 100 ; i++) {
|
for (int i = 0 ; i < 100 ; i++) {
|
||||||
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
|
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
|
||||||
t.taskID, null, 0, TaskType.MAP, "", null)));
|
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
|
||||||
}
|
}
|
||||||
|
|
||||||
handleNextNEvents(jheh, 9);
|
handleNextNEvents(jheh, 9);
|
||||||
|
@ -248,7 +260,7 @@ public class TestJobHistoryEventHandler {
|
||||||
|
|
||||||
for (int i = 0 ; i < 100 ; i++) {
|
for (int i = 0 ; i < 100 ; i++) {
|
||||||
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
|
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
|
||||||
t.taskID, null, 0, TaskType.MAP, "", null)));
|
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
|
||||||
}
|
}
|
||||||
queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
|
queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
|
||||||
TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));
|
TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));
|
||||||
|
@ -427,6 +439,231 @@ public class TestJobHistoryEventHandler {
|
||||||
pathStr);
|
pathStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Have JobHistoryEventHandler handle some events and make sure they get
|
||||||
|
// stored to the Timeline store
|
||||||
|
@Test (timeout=50000)
|
||||||
|
public void testTimelineEventHandling() throws Exception {
|
||||||
|
TestParams t = new TestParams(false);
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
|
JHEvenHandlerForTest jheh = new JHEvenHandlerForTest(t.mockAppContext, 0);
|
||||||
|
jheh.init(conf);
|
||||||
|
MiniYARNCluster yarnCluster = null;
|
||||||
|
long currentTime = System.currentTimeMillis();
|
||||||
|
try {
|
||||||
|
yarnCluster = new MiniYARNCluster(
|
||||||
|
TestJobHistoryEventHandler.class.getSimpleName(), 1, 1, 1, 1, true);
|
||||||
|
yarnCluster.init(conf);
|
||||||
|
yarnCluster.start();
|
||||||
|
jheh.start();
|
||||||
|
TimelineStore ts = yarnCluster.getApplicationHistoryServer()
|
||||||
|
.getTimelineStore();
|
||||||
|
|
||||||
|
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
|
||||||
|
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000),
|
||||||
|
currentTime - 10));
|
||||||
|
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
|
||||||
|
null, null, null, null, null, null);
|
||||||
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
|
TimelineEntity tEntity = entities.getEntities().get(0);
|
||||||
|
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
||||||
|
Assert.assertEquals(1, tEntity.getEvents().size());
|
||||||
|
Assert.assertEquals(EventType.AM_STARTED.toString(),
|
||||||
|
tEntity.getEvents().get(0).getEventType());
|
||||||
|
Assert.assertEquals(currentTime - 10,
|
||||||
|
tEntity.getEvents().get(0).getTimestamp());
|
||||||
|
|
||||||
|
handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
||||||
|
new JobSubmittedEvent(TypeConverter.fromYarn(t.jobId), "name",
|
||||||
|
"user", 200, "/foo/job.xml",
|
||||||
|
new HashMap<JobACL, AccessControlList>(), "default"),
|
||||||
|
currentTime + 10));
|
||||||
|
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
|
||||||
|
null, null, null, null, null);
|
||||||
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
|
tEntity = entities.getEntities().get(0);
|
||||||
|
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
||||||
|
Assert.assertEquals(2, tEntity.getEvents().size());
|
||||||
|
Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
|
||||||
|
tEntity.getEvents().get(0).getEventType());
|
||||||
|
Assert.assertEquals(EventType.AM_STARTED.toString(),
|
||||||
|
tEntity.getEvents().get(1).getEventType());
|
||||||
|
Assert.assertEquals(currentTime + 10,
|
||||||
|
tEntity.getEvents().get(0).getTimestamp());
|
||||||
|
Assert.assertEquals(currentTime - 10,
|
||||||
|
tEntity.getEvents().get(1).getTimestamp());
|
||||||
|
|
||||||
|
handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
||||||
|
new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"),
|
||||||
|
currentTime - 20));
|
||||||
|
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
|
||||||
|
null, null, null, null, null);
|
||||||
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
|
tEntity = entities.getEntities().get(0);
|
||||||
|
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
||||||
|
Assert.assertEquals(3, tEntity.getEvents().size());
|
||||||
|
Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
|
||||||
|
tEntity.getEvents().get(0).getEventType());
|
||||||
|
Assert.assertEquals(EventType.AM_STARTED.toString(),
|
||||||
|
tEntity.getEvents().get(1).getEventType());
|
||||||
|
Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
|
||||||
|
tEntity.getEvents().get(2).getEventType());
|
||||||
|
Assert.assertEquals(currentTime + 10,
|
||||||
|
tEntity.getEvents().get(0).getTimestamp());
|
||||||
|
Assert.assertEquals(currentTime - 10,
|
||||||
|
tEntity.getEvents().get(1).getTimestamp());
|
||||||
|
Assert.assertEquals(currentTime - 20,
|
||||||
|
tEntity.getEvents().get(2).getTimestamp());
|
||||||
|
|
||||||
|
handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
||||||
|
new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0,
|
||||||
|
0, new Counters(), new Counters(), new Counters()), currentTime));
|
||||||
|
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
|
||||||
|
null, null, null, null, null);
|
||||||
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
|
tEntity = entities.getEntities().get(0);
|
||||||
|
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
||||||
|
Assert.assertEquals(4, tEntity.getEvents().size());
|
||||||
|
Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
|
||||||
|
tEntity.getEvents().get(0).getEventType());
|
||||||
|
Assert.assertEquals(EventType.JOB_FINISHED.toString(),
|
||||||
|
tEntity.getEvents().get(1).getEventType());
|
||||||
|
Assert.assertEquals(EventType.AM_STARTED.toString(),
|
||||||
|
tEntity.getEvents().get(2).getEventType());
|
||||||
|
Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
|
||||||
|
tEntity.getEvents().get(3).getEventType());
|
||||||
|
Assert.assertEquals(currentTime + 10,
|
||||||
|
tEntity.getEvents().get(0).getTimestamp());
|
||||||
|
Assert.assertEquals(currentTime,
|
||||||
|
tEntity.getEvents().get(1).getTimestamp());
|
||||||
|
Assert.assertEquals(currentTime - 10,
|
||||||
|
tEntity.getEvents().get(2).getTimestamp());
|
||||||
|
Assert.assertEquals(currentTime - 20,
|
||||||
|
tEntity.getEvents().get(3).getTimestamp());
|
||||||
|
|
||||||
|
handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
||||||
|
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId),
|
||||||
|
0, 0, 0, JobStateInternal.KILLED.toString()), currentTime + 20));
|
||||||
|
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
|
||||||
|
null, null, null, null, null);
|
||||||
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
|
tEntity = entities.getEntities().get(0);
|
||||||
|
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
||||||
|
Assert.assertEquals(5, tEntity.getEvents().size());
|
||||||
|
Assert.assertEquals(EventType.JOB_KILLED.toString(),
|
||||||
|
tEntity.getEvents().get(0).getEventType());
|
||||||
|
Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
|
||||||
|
tEntity.getEvents().get(1).getEventType());
|
||||||
|
Assert.assertEquals(EventType.JOB_FINISHED.toString(),
|
||||||
|
tEntity.getEvents().get(2).getEventType());
|
||||||
|
Assert.assertEquals(EventType.AM_STARTED.toString(),
|
||||||
|
tEntity.getEvents().get(3).getEventType());
|
||||||
|
Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
|
||||||
|
tEntity.getEvents().get(4).getEventType());
|
||||||
|
Assert.assertEquals(currentTime + 20,
|
||||||
|
tEntity.getEvents().get(0).getTimestamp());
|
||||||
|
Assert.assertEquals(currentTime + 10,
|
||||||
|
tEntity.getEvents().get(1).getTimestamp());
|
||||||
|
Assert.assertEquals(currentTime,
|
||||||
|
tEntity.getEvents().get(2).getTimestamp());
|
||||||
|
Assert.assertEquals(currentTime - 10,
|
||||||
|
tEntity.getEvents().get(3).getTimestamp());
|
||||||
|
Assert.assertEquals(currentTime - 20,
|
||||||
|
tEntity.getEvents().get(4).getTimestamp());
|
||||||
|
|
||||||
|
handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
||||||
|
new TaskStartedEvent(t.taskID, 0, TaskType.MAP, "")));
|
||||||
|
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
|
||||||
|
null, null, null, null, null);
|
||||||
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
|
tEntity = entities.getEntities().get(0);
|
||||||
|
Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId());
|
||||||
|
Assert.assertEquals(1, tEntity.getEvents().size());
|
||||||
|
Assert.assertEquals(EventType.TASK_STARTED.toString(),
|
||||||
|
tEntity.getEvents().get(0).getEventType());
|
||||||
|
Assert.assertEquals(TaskType.MAP.toString(),
|
||||||
|
tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE"));
|
||||||
|
|
||||||
|
handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
||||||
|
new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, "")));
|
||||||
|
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
|
||||||
|
null, null, null, null, null);
|
||||||
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
|
tEntity = entities.getEntities().get(0);
|
||||||
|
Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId());
|
||||||
|
Assert.assertEquals(2, tEntity.getEvents().size());
|
||||||
|
Assert.assertEquals(EventType.TASK_STARTED.toString(),
|
||||||
|
tEntity.getEvents().get(1).getEventType());
|
||||||
|
Assert.assertEquals(TaskType.REDUCE.toString(),
|
||||||
|
tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE"));
|
||||||
|
Assert.assertEquals(TaskType.MAP.toString(),
|
||||||
|
tEntity.getEvents().get(1).getEventInfo().get("TASK_TYPE"));
|
||||||
|
} finally {
|
||||||
|
if (yarnCluster != null) {
|
||||||
|
yarnCluster.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout=50000)
|
||||||
|
public void testCountersToJSON() throws Exception {
|
||||||
|
JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0);
|
||||||
|
Counters counters = new Counters();
|
||||||
|
CounterGroup group1 = counters.addGroup("DOCTORS",
|
||||||
|
"Incarnations of the Doctor");
|
||||||
|
group1.addCounter("PETER_CAPALDI", "Peter Capaldi", 12);
|
||||||
|
group1.addCounter("MATT_SMITH", "Matt Smith", 11);
|
||||||
|
group1.addCounter("DAVID_TENNANT", "David Tennant", 10);
|
||||||
|
CounterGroup group2 = counters.addGroup("COMPANIONS",
|
||||||
|
"Companions of the Doctor");
|
||||||
|
group2.addCounter("CLARA_OSWALD", "Clara Oswald", 6);
|
||||||
|
group2.addCounter("RORY_WILLIAMS", "Rory Williams", 5);
|
||||||
|
group2.addCounter("AMY_POND", "Amy Pond", 4);
|
||||||
|
group2.addCounter("MARTHA_JONES", "Martha Jones", 3);
|
||||||
|
group2.addCounter("DONNA_NOBLE", "Donna Noble", 2);
|
||||||
|
group2.addCounter("ROSE_TYLER", "Rose Tyler", 1);
|
||||||
|
JsonNode jsonNode = jheh.countersToJSON(counters);
|
||||||
|
String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
|
||||||
|
String expected = "[{\"NAME\":\"COMPANIONS\",\"DISPLAY_NAME\":\"Companions "
|
||||||
|
+ "of the Doctor\",\"COUNTERS\":[{\"NAME\":\"AMY_POND\",\"DISPLAY_NAME\""
|
||||||
|
+ ":\"Amy Pond\",\"VALUE\":4},{\"NAME\":\"CLARA_OSWALD\","
|
||||||
|
+ "\"DISPLAY_NAME\":\"Clara Oswald\",\"VALUE\":6},{\"NAME\":"
|
||||||
|
+ "\"DONNA_NOBLE\",\"DISPLAY_NAME\":\"Donna Noble\",\"VALUE\":2},"
|
||||||
|
+ "{\"NAME\":\"MARTHA_JONES\",\"DISPLAY_NAME\":\"Martha Jones\","
|
||||||
|
+ "\"VALUE\":3},{\"NAME\":\"RORY_WILLIAMS\",\"DISPLAY_NAME\":\"Rory "
|
||||||
|
+ "Williams\",\"VALUE\":5},{\"NAME\":\"ROSE_TYLER\",\"DISPLAY_NAME\":"
|
||||||
|
+ "\"Rose Tyler\",\"VALUE\":1}]},{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\""
|
||||||
|
+ ":\"Incarnations of the Doctor\",\"COUNTERS\":[{\"NAME\":"
|
||||||
|
+ "\"DAVID_TENNANT\",\"DISPLAY_NAME\":\"David Tennant\",\"VALUE\":10},"
|
||||||
|
+ "{\"NAME\":\"MATT_SMITH\",\"DISPLAY_NAME\":\"Matt Smith\",\"VALUE\":"
|
||||||
|
+ "11},{\"NAME\":\"PETER_CAPALDI\",\"DISPLAY_NAME\":\"Peter Capaldi\","
|
||||||
|
+ "\"VALUE\":12}]}]";
|
||||||
|
Assert.assertEquals(expected, jsonStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout=50000)
|
||||||
|
public void testCountersToJSONEmpty() throws Exception {
|
||||||
|
JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0);
|
||||||
|
Counters counters = null;
|
||||||
|
JsonNode jsonNode = jheh.countersToJSON(counters);
|
||||||
|
String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
|
||||||
|
String expected = "[]";
|
||||||
|
Assert.assertEquals(expected, jsonStr);
|
||||||
|
|
||||||
|
counters = new Counters();
|
||||||
|
jsonNode = jheh.countersToJSON(counters);
|
||||||
|
jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
|
||||||
|
expected = "[]";
|
||||||
|
Assert.assertEquals(expected, jsonStr);
|
||||||
|
|
||||||
|
counters.addGroup("DOCTORS", "Incarnations of the Doctor");
|
||||||
|
jsonNode = jheh.countersToJSON(counters);
|
||||||
|
jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
|
||||||
|
expected = "[{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\":\"Incarnations of the "
|
||||||
|
+ "Doctor\",\"COUNTERS\":[]}]";
|
||||||
|
Assert.assertEquals(expected, jsonStr);
|
||||||
|
}
|
||||||
|
|
||||||
private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
|
private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
|
||||||
jheh.handle(event);
|
jheh.handle(event);
|
||||||
}
|
}
|
||||||
|
@ -480,6 +717,7 @@ public class TestJobHistoryEventHandler {
|
||||||
ApplicationAttemptId.newInstance(appId, 1);
|
ApplicationAttemptId.newInstance(appId, 1);
|
||||||
ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
|
ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
|
||||||
TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
|
TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
|
||||||
|
TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, 0);
|
||||||
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
||||||
AppContext mockAppContext;
|
AppContext mockAppContext;
|
||||||
|
|
||||||
|
@ -557,11 +795,13 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
|
||||||
private boolean mockHistoryProcessing = true;
|
private boolean mockHistoryProcessing = true;
|
||||||
public JHEvenHandlerForTest(AppContext context, int startCount) {
|
public JHEvenHandlerForTest(AppContext context, int startCount) {
|
||||||
super(context, startCount);
|
super(context, startCount);
|
||||||
|
JobHistoryEventHandler.fileMap.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
public JHEvenHandlerForTest(AppContext context, int startCount, boolean mockHistoryProcessing) {
|
public JHEvenHandlerForTest(AppContext context, int startCount, boolean mockHistoryProcessing) {
|
||||||
super(context, startCount);
|
super(context, startCount);
|
||||||
this.mockHistoryProcessing = mockHistoryProcessing;
|
this.mockHistoryProcessing = mockHistoryProcessing;
|
||||||
|
JobHistoryEventHandler.fileMap.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,88 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.EventType;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestMRTimelineEventHandling {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMRTimelineEventHandling() throws Exception {
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
|
MiniMRYarnCluster cluster = null;
|
||||||
|
try {
|
||||||
|
cluster = new MiniMRYarnCluster(
|
||||||
|
TestJobHistoryEventHandler.class.getSimpleName(), 1, true);
|
||||||
|
cluster.init(conf);
|
||||||
|
cluster.start();
|
||||||
|
TimelineStore ts = cluster.getApplicationHistoryServer()
|
||||||
|
.getTimelineStore();
|
||||||
|
|
||||||
|
Path inDir = new Path("input");
|
||||||
|
Path outDir = new Path("output");
|
||||||
|
RunningJob job =
|
||||||
|
UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
|
||||||
|
Assert.assertEquals(JobStatus.SUCCEEDED,
|
||||||
|
job.getJobStatus().getState().getValue());
|
||||||
|
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
|
||||||
|
null, null, null, null, null, null);
|
||||||
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
|
TimelineEntity tEntity = entities.getEntities().get(0);
|
||||||
|
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
|
||||||
|
Assert.assertEquals("MAPREDUCE_JOB", tEntity.getEntityType());
|
||||||
|
Assert.assertEquals(EventType.AM_STARTED.toString(),
|
||||||
|
tEntity.getEvents().get(tEntity.getEvents().size() - 1)
|
||||||
|
.getEventType());
|
||||||
|
Assert.assertEquals(EventType.JOB_FINISHED.toString(),
|
||||||
|
tEntity.getEvents().get(0).getEventType());
|
||||||
|
|
||||||
|
job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir);
|
||||||
|
Assert.assertEquals(JobStatus.FAILED,
|
||||||
|
job.getJobStatus().getState().getValue());
|
||||||
|
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
|
||||||
|
null, null, null);
|
||||||
|
Assert.assertEquals(2, entities.getEntities().size());
|
||||||
|
tEntity = entities.getEntities().get(0);
|
||||||
|
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
|
||||||
|
Assert.assertEquals("MAPREDUCE_JOB", tEntity.getEntityType());
|
||||||
|
Assert.assertEquals(EventType.AM_STARTED.toString(),
|
||||||
|
tEntity.getEvents().get(tEntity.getEvents().size() - 1)
|
||||||
|
.getEventType());
|
||||||
|
Assert.assertEquals(EventType.JOB_FAILED.toString(),
|
||||||
|
tEntity.getEvents().get(0).getEventType());
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -72,8 +72,11 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
|
||||||
}
|
}
|
||||||
|
|
||||||
public MiniMRYarnCluster(String testName, int noOfNMs) {
|
public MiniMRYarnCluster(String testName, int noOfNMs) {
|
||||||
super(testName, noOfNMs, 4, 4);
|
this(testName, noOfNMs, false);
|
||||||
//TODO: add the history server
|
}
|
||||||
|
|
||||||
|
public MiniMRYarnCluster(String testName, int noOfNMs, boolean enableAHS) {
|
||||||
|
super(testName, 1, noOfNMs, 4, 4, enableAHS);
|
||||||
historyServerWrapper = new JobHistoryServerWrapper();
|
historyServerWrapper = new JobHistoryServerWrapper();
|
||||||
addService(historyServerWrapper);
|
addService(historyServerWrapper);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue