diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 84016b907e5..fd82d495c42 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -242,6 +242,9 @@ Release 2.6.0 - UNRELEASED NEW FEATURES + MAPREDUCE-5933. Enabled MR AM to post history events to the timeline server. + (Robert Kanter via zjshen) + IMPROVEMENTS MAPREDUCE-5971. Move the default options for distcp -p to diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml index 2e597d11d66..4f6f837b4df 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml @@ -79,6 +79,12 @@ test-jar test + + org.apache.hadoop + hadoop-yarn-server-tests + test-jar + test + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java index e853b1c60eb..4fe35b260c3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java @@ -27,7 +27,12 @@ public class JobHistoryEvent extends AbstractEvent{ private final HistoryEvent historyEvent; public JobHistoryEvent(JobId jobID, HistoryEvent historyEvent) { - super(historyEvent.getEventType()); + this(jobID, historyEvent, System.currentTimeMillis()); + } + + public JobHistoryEvent(JobId jobID, HistoryEvent historyEvent, + long timestamp) { + super(historyEvent.getEventType(), timestamp); this.jobID = jobID; this.historyEvent = historyEvent; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index c566740aa7a..ebedc1b40f0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -41,7 +41,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -57,8 +59,16 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ArrayNode; +import org.codehaus.jackson.node.ObjectNode; /** * The job history events get routed to this class. This class writes the Job @@ -108,6 +118,11 @@ public class JobHistoryEventHandler extends AbstractService // should job completion be force when the AM shuts down? protected volatile boolean forceJobCompletion = false; + protected TimelineClient timelineClient; + + private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB"; + private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK"; + public JobHistoryEventHandler(AppContext context, int startCount) { super("JobHistoryEventHandler"); this.context = context; @@ -225,7 +240,10 @@ protected void serviceInit(Configuration conf) throws Exception { conf.getInt( MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD); - + + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + super.serviceInit(conf); } @@ -250,6 +268,7 @@ private void mkdir(FileSystem fs, Path path, FsPermission fsp) @Override protected void serviceStart() throws Exception { + timelineClient.start(); eventHandlingThread = new Thread(new Runnable() { @Override public void run() { @@ -372,6 +391,9 @@ protected void serviceStop() throws Exception { LOG.info("Exception while closing file " + e.getMessage()); } } + if (timelineClient != null) { + timelineClient.stop(); + } LOG.info("Stopped JobHistoryEventHandler. super.stop()"); super.serviceStop(); } @@ -515,6 +537,7 @@ public void handleEvent(JobHistoryEvent event) { // For all events // (1) Write it out // (2) Process it for JobSummary + // (3) Process it for ATS MetaInfo mi = fileMap.get(event.getJobID()); try { HistoryEvent historyEvent = event.getHistoryEvent(); @@ -523,6 +546,8 @@ public void handleEvent(JobHistoryEvent event) { } processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID()); + processEventForTimelineServer(historyEvent, event.getJobID(), + event.getTimestamp()); if (LOG.isDebugEnabled()) { LOG.debug("In HistoryEventHandler " + event.getHistoryEvent().getEventType()); @@ -667,6 +692,319 @@ public void processEventForJobSummary(HistoryEvent event, JobSummary summary, } } + private void processEventForTimelineServer(HistoryEvent event, JobId jobId, + long timestamp) { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(event.getEventType().name().toUpperCase()); + tEvent.setTimestamp(timestamp); + TimelineEntity tEntity = new TimelineEntity(); + + switch (event.getEventType()) { + case JOB_SUBMITTED: + JobSubmittedEvent jse = + (JobSubmittedEvent) event; + tEvent.addEventInfo("SUBMIT_TIME", jse.getSubmitTime()); + tEvent.addEventInfo("QUEUE_NAME", jse.getJobQueueName()); + tEvent.addEventInfo("JOB_NAME", jse.getJobName()); + tEvent.addEventInfo("USER_NAME", jse.getUserName()); + tEvent.addEventInfo("JOB_CONF_PATH", jse.getJobConfPath()); + tEvent.addEventInfo("ACLS", jse.getJobAcls()); + tEvent.addEventInfo("JOB_QUEUE_NAME", jse.getJobQueueName()); + tEvent.addEventInfo("WORKLFOW_ID", jse.getWorkflowId()); + tEvent.addEventInfo("WORKFLOW_NAME", jse.getWorkflowName()); + tEvent.addEventInfo("WORKFLOW_NAME_NAME", jse.getWorkflowNodeName()); + tEvent.addEventInfo("WORKFLOW_ADJACENCIES", + jse.getWorkflowAdjacencies()); + tEvent.addEventInfo("WORKFLOW_TAGS", jse.getWorkflowTags()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(jobId.toString()); + tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE); + break; + case JOB_STATUS_CHANGED: + JobStatusChangedEvent jsce = (JobStatusChangedEvent) event; + tEvent.addEventInfo("STATUS", jsce.getStatus()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(jobId.toString()); + tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE); + break; + case JOB_INFO_CHANGED: + JobInfoChangeEvent jice = (JobInfoChangeEvent) event; + tEvent.addEventInfo("SUBMIT_TIME", jice.getSubmitTime()); + tEvent.addEventInfo("LAUNCH_TIME", jice.getLaunchTime()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(jobId.toString()); + tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE); + break; + case JOB_INITED: + JobInitedEvent jie = (JobInitedEvent) event; + tEvent.addEventInfo("START_TIME", jie.getLaunchTime()); + tEvent.addEventInfo("STATUS", jie.getStatus()); + tEvent.addEventInfo("TOTAL_MAPS", jie.getTotalMaps()); + tEvent.addEventInfo("TOTAL_REDUCES", jie.getTotalReduces()); + tEvent.addEventInfo("UBERIZED", jie.getUberized()); + tEntity.setStartTime(jie.getLaunchTime()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(jobId.toString()); + tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE); + break; + case JOB_PRIORITY_CHANGED: + JobPriorityChangeEvent jpce = (JobPriorityChangeEvent) event; + tEvent.addEventInfo("PRIORITY", jpce.getPriority().toString()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(jobId.toString()); + tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE); + break; + case JOB_QUEUE_CHANGED: + JobQueueChangeEvent jqe = (JobQueueChangeEvent) event; + tEvent.addEventInfo("QUEUE_NAMES", jqe.getJobQueueName()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(jobId.toString()); + tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE); + break; + case JOB_FAILED: + case JOB_KILLED: + case JOB_ERROR: + JobUnsuccessfulCompletionEvent juce = + (JobUnsuccessfulCompletionEvent) event; + tEvent.addEventInfo("FINISH_TIME", juce.getFinishTime()); + tEvent.addEventInfo("NUM_MAPS", juce.getFinishedMaps()); + tEvent.addEventInfo("NUM_REDUCES", juce.getFinishedReduces()); + tEvent.addEventInfo("JOB_STATUS", juce.getStatus()); + tEvent.addEventInfo("DIAGNOSTICS", juce.getDiagnostics()); + tEvent.addEventInfo("FINISHED_MAPS", juce.getFinishedMaps()); + tEvent.addEventInfo("FINISHED_REDUCES", juce.getFinishedReduces()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(jobId.toString()); + tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE); + break; + case JOB_FINISHED: + JobFinishedEvent jfe = (JobFinishedEvent) event; + tEvent.addEventInfo("FINISH_TIME", jfe.getFinishTime()); + tEvent.addEventInfo("NUM_MAPS", jfe.getFinishedMaps()); + tEvent.addEventInfo("NUM_REDUCES", jfe.getFinishedReduces()); + tEvent.addEventInfo("FAILED_MAPS", jfe.getFailedMaps()); + tEvent.addEventInfo("FAILED_REDUCES", jfe.getFailedReduces()); + tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps()); + tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces()); + tEvent.addEventInfo("MAP_COUNTERS_GROUPS", + countersToJSON(jfe.getTotalCounters())); + tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS", + countersToJSON(jfe.getReduceCounters())); + tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS", + countersToJSON(jfe.getTotalCounters())); + tEvent.addEventInfo("JOB_STATUS", JobState.SUCCEEDED.toString()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(jobId.toString()); + tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE); + break; + case TASK_STARTED: + TaskStartedEvent tse = (TaskStartedEvent) event; + tEvent.addEventInfo("TASK_TYPE", tse.getTaskType().toString()); + tEvent.addEventInfo("START_TIME", tse.getStartTime()); + tEvent.addEventInfo("SPLIT_LOCATIONS", tse.getSplitLocations()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(tse.getTaskId().toString()); + tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); + tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString()); + break; + case TASK_FAILED: + TaskFailedEvent tfe = (TaskFailedEvent) event; + tEvent.addEventInfo("TASK_TYPE", tfe.getTaskType().toString()); + tEvent.addEventInfo("STATUS", TaskStatus.State.FAILED.toString()); + tEvent.addEventInfo("FINISH_TIME", tfe.getFinishTime()); + tEvent.addEventInfo("ERROR", tfe.getError()); + tEvent.addEventInfo("FAILED_ATTEMPT_ID", + tfe.getFailedAttemptID() == null ? + "" : tfe.getFailedAttemptID().toString()); + tEvent.addEventInfo("COUNTERS_GROUPS", + countersToJSON(tfe.getCounters())); + tEntity.addEvent(tEvent); + tEntity.setEntityId(tfe.getTaskId().toString()); + tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); + tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString()); + break; + case TASK_UPDATED: + TaskUpdatedEvent tue = (TaskUpdatedEvent) event; + tEvent.addEventInfo("FINISH_TIME", tue.getFinishTime()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(tue.getTaskId().toString()); + tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); + tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString()); + break; + case TASK_FINISHED: + TaskFinishedEvent tfe2 = (TaskFinishedEvent) event; + tEvent.addEventInfo("TASK_TYPE", tfe2.getTaskType().toString()); + tEvent.addEventInfo("COUNTERS_GROUPS", + countersToJSON(tfe2.getCounters())); + tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime()); + tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString()); + tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID", + tfe2.getSuccessfulTaskAttemptId() == null ? + "" : tfe2.getSuccessfulTaskAttemptId().toString()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(tfe2.getTaskId().toString()); + tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); + tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString()); + break; + case MAP_ATTEMPT_STARTED: + case CLEANUP_ATTEMPT_STARTED: + case REDUCE_ATTEMPT_STARTED: + case SETUP_ATTEMPT_STARTED: + TaskAttemptStartedEvent tase = (TaskAttemptStartedEvent) event; + tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString()); + tEvent.addEventInfo("TASK_ATTEMPT_ID", + tase.getTaskAttemptId().toString() == null ? + "" : tase.getTaskAttemptId().toString()); + tEvent.addEventInfo("START_TIME", tase.getStartTime()); + tEvent.addEventInfo("HTTP_PORT", tase.getHttpPort()); + tEvent.addEventInfo("TRACKER_NAME", tase.getTrackerName()); + tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString()); + tEvent.addEventInfo("SHUFFLE_PORT", tase.getShufflePort()); + tEvent.addEventInfo("CONTAINER_ID", tase.getContainerId() == null ? + "" : tase.getContainerId().toString()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(tase.getTaskId().toString()); + tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); + tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString()); + break; + case MAP_ATTEMPT_FAILED: + case CLEANUP_ATTEMPT_FAILED: + case REDUCE_ATTEMPT_FAILED: + case SETUP_ATTEMPT_FAILED: + case MAP_ATTEMPT_KILLED: + case CLEANUP_ATTEMPT_KILLED: + case REDUCE_ATTEMPT_KILLED: + case SETUP_ATTEMPT_KILLED: + TaskAttemptUnsuccessfulCompletionEvent tauce = + (TaskAttemptUnsuccessfulCompletionEvent) event; + tEvent.addEventInfo("TASK_TYPE", tauce.getTaskType().toString()); + tEvent.addEventInfo("TASK_ATTEMPT_ID", + tauce.getTaskAttemptId() == null ? + "" : tauce.getTaskAttemptId().toString()); + tEvent.addEventInfo("FINISH_TIME", tauce.getFinishTime()); + tEvent.addEventInfo("ERROR", tauce.getError()); + tEvent.addEventInfo("STATUS", tauce.getTaskStatus()); + tEvent.addEventInfo("HOSTNAME", tauce.getHostname()); + tEvent.addEventInfo("PORT", tauce.getPort()); + tEvent.addEventInfo("RACK_NAME", tauce.getRackName()); + tEvent.addEventInfo("SHUFFLE_FINISH_TIME", tauce.getFinishTime()); + tEvent.addEventInfo("SORT_FINISH_TIME", tauce.getFinishTime()); + tEvent.addEventInfo("MAP_FINISH_TIME", tauce.getFinishTime()); + tEvent.addEventInfo("COUNTERS_GROUPS", + countersToJSON(tauce.getCounters())); + tEntity.addEvent(tEvent); + tEntity.setEntityId(tauce.getTaskId().toString()); + tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); + tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString()); + break; + case MAP_ATTEMPT_FINISHED: + MapAttemptFinishedEvent mafe = (MapAttemptFinishedEvent) event; + tEvent.addEventInfo("TASK_TYPE", mafe.getTaskType().toString()); + tEvent.addEventInfo("FINISH_TIME", mafe.getFinishTime()); + tEvent.addEventInfo("STATUS", mafe.getTaskStatus()); + tEvent.addEventInfo("STATE", mafe.getState()); + tEvent.addEventInfo("MAP_FINISH_TIME", mafe.getMapFinishTime()); + tEvent.addEventInfo("COUNTERS_GROUPS", + countersToJSON(mafe.getCounters())); + tEvent.addEventInfo("HOSTNAME", mafe.getHostname()); + tEvent.addEventInfo("PORT", mafe.getPort()); + tEvent.addEventInfo("RACK_NAME", mafe.getRackName()); + tEvent.addEventInfo("ATTEMPT_ID", mafe.getAttemptId() == null ? + "" : mafe.getAttemptId().toString()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(mafe.getTaskId().toString()); + tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); + tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString()); + break; + case REDUCE_ATTEMPT_FINISHED: + ReduceAttemptFinishedEvent rafe = (ReduceAttemptFinishedEvent) event; + tEvent.addEventInfo("TASK_TYPE", rafe.getTaskType().toString()); + tEvent.addEventInfo("ATTEMPT_ID", rafe.getAttemptId() == null ? + "" : rafe.getAttemptId().toString()); + tEvent.addEventInfo("FINISH_TIME", rafe.getFinishTime()); + tEvent.addEventInfo("STATUS", rafe.getTaskStatus()); + tEvent.addEventInfo("STATE", rafe.getState()); + tEvent.addEventInfo("SHUFFLE_FINISH_TIME", rafe.getShuffleFinishTime()); + tEvent.addEventInfo("SORT_FINISH_TIME", rafe.getSortFinishTime()); + tEvent.addEventInfo("COUNTERS_GROUPS", + countersToJSON(rafe.getCounters())); + tEvent.addEventInfo("HOSTNAME", rafe.getHostname()); + tEvent.addEventInfo("PORT", rafe.getPort()); + tEvent.addEventInfo("RACK_NAME", rafe.getRackName()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(rafe.getTaskId().toString()); + tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); + tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString()); + break; + case SETUP_ATTEMPT_FINISHED: + case CLEANUP_ATTEMPT_FINISHED: + TaskAttemptFinishedEvent tafe = (TaskAttemptFinishedEvent) event; + tEvent.addEventInfo("TASK_TYPE", tafe.getTaskType().toString()); + tEvent.addEventInfo("ATTEMPT_ID", tafe.getAttemptId() == null ? + "" : tafe.getAttemptId().toString()); + tEvent.addEventInfo("FINISH_TIME", tafe.getFinishTime()); + tEvent.addEventInfo("STATUS", tafe.getTaskStatus()); + tEvent.addEventInfo("STATE", tafe.getState()); + tEvent.addEventInfo("COUNTERS_GROUPS", + countersToJSON(tafe.getCounters())); + tEvent.addEventInfo("HOSTNAME", tafe.getHostname()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(tafe.getTaskId().toString()); + tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); + tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString()); + break; + case AM_STARTED: + AMStartedEvent ase = (AMStartedEvent) event; + tEvent.addEventInfo("APPLICATION_ATTEMPT_ID", + ase.getAppAttemptId() == null ? + "" : ase.getAppAttemptId().toString()); + tEvent.addEventInfo("CONTAINER_ID", ase.getContainerId() == null ? + "" : ase.getContainerId().toString()); + tEvent.addEventInfo("NODE_MANAGER_HOST", ase.getNodeManagerHost()); + tEvent.addEventInfo("NODE_MANAGER_PORT", ase.getNodeManagerPort()); + tEvent.addEventInfo("NODE_MANAGER_HTTP_PORT", + ase.getNodeManagerHttpPort()); + tEvent.addEventInfo("START_TIME", ase.getStartTime()); + tEntity.addEvent(tEvent); + tEntity.setEntityId(jobId.toString()); + tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE); + break; + default: + break; + } + + try { + timelineClient.putEntities(tEntity); + } catch (IOException ex) { + LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline" + + "Server", ex); + } catch (YarnException ex) { + LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline" + + "Server", ex); + } + } + + @Private + public JsonNode countersToJSON(Counters counters) { + ObjectMapper mapper = new ObjectMapper(); + ArrayNode nodes = mapper.createArrayNode(); + if (counters != null) { + for (CounterGroup counterGroup : counters) { + ObjectNode groupNode = nodes.addObject(); + groupNode.put("NAME", counterGroup.getName()); + groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName()); + ArrayNode countersNode = groupNode.putArray("COUNTERS"); + for (Counter counter : counterGroup) { + ObjectNode counterNode = countersNode.addObject(); + counterNode.put("NAME", counter.getName()); + counterNode.put("DISPLAY_NAME", counter.getDisplayName()); + counterNode.put("VALUE", counter.getValue()); + } + } + } + return nodes; + } + private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) { Counter slotMillisMapCounter = allCounters diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java index 7539e73ee63..48042d36212 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.HashMap; import org.junit.Assert; @@ -42,9 +43,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapreduce.CounterGroup; import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; @@ -55,14 +59,22 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.junit.After; import org.junit.AfterClass; import static org.junit.Assert.assertFalse; import org.junit.BeforeClass; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.timeline.TimelineStore; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; import org.junit.Test; import org.mockito.Mockito; @@ -126,7 +138,7 @@ public void testFirstFlushOnCompletionEvent() throws Exception { // First completion event, but min-queue-size for batching flushes is 10 handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent( - t.taskID, null, 0, TaskType.MAP, "", null))); + t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null))); verify(mockWriter).flush(); } finally { @@ -162,7 +174,7 @@ public void testMaxUnflushedCompletionEvents() throws Exception { for (int i = 0 ; i < 100 ; i++) { queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent( - t.taskID, null, 0, TaskType.MAP, "", null))); + t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null))); } handleNextNEvents(jheh, 9); @@ -207,7 +219,7 @@ public void testUnflushedTimer() throws Exception { for (int i = 0 ; i < 100 ; i++) { queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent( - t.taskID, null, 0, TaskType.MAP, "", null))); + t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null))); } handleNextNEvents(jheh, 9); @@ -248,7 +260,7 @@ public void testBatchedFlushJobEndMultiplier() throws Exception { for (int i = 0 ; i < 100 ; i++) { queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent( - t.taskID, null, 0, TaskType.MAP, "", null))); + t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null))); } queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent( TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters()))); @@ -427,6 +439,231 @@ public void testGetHistoryIntermediateDoneDirForUser() throws IOException { pathStr); } + // Have JobHistoryEventHandler handle some events and make sure they get + // stored to the Timeline store + @Test (timeout=50000) + public void testTimelineEventHandling() throws Exception { + TestParams t = new TestParams(false); + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + JHEvenHandlerForTest jheh = new JHEvenHandlerForTest(t.mockAppContext, 0); + jheh.init(conf); + MiniYARNCluster yarnCluster = null; + long currentTime = System.currentTimeMillis(); + try { + yarnCluster = new MiniYARNCluster( + TestJobHistoryEventHandler.class.getSimpleName(), 1, 1, 1, 1, true); + yarnCluster.init(conf); + yarnCluster.start(); + jheh.start(); + TimelineStore ts = yarnCluster.getApplicationHistoryServer() + .getTimelineStore(); + + handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent( + t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000), + currentTime - 10)); + TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null, + null, null, null, null, null, null); + Assert.assertEquals(1, entities.getEntities().size()); + TimelineEntity tEntity = entities.getEntities().get(0); + Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId()); + Assert.assertEquals(1, tEntity.getEvents().size()); + Assert.assertEquals(EventType.AM_STARTED.toString(), + tEntity.getEvents().get(0).getEventType()); + Assert.assertEquals(currentTime - 10, + tEntity.getEvents().get(0).getTimestamp()); + + handleEvent(jheh, new JobHistoryEvent(t.jobId, + new JobSubmittedEvent(TypeConverter.fromYarn(t.jobId), "name", + "user", 200, "/foo/job.xml", + new HashMap(), "default"), + currentTime + 10)); + entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, + null, null, null, null, null); + Assert.assertEquals(1, entities.getEntities().size()); + tEntity = entities.getEntities().get(0); + Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId()); + Assert.assertEquals(2, tEntity.getEvents().size()); + Assert.assertEquals(EventType.JOB_SUBMITTED.toString(), + tEntity.getEvents().get(0).getEventType()); + Assert.assertEquals(EventType.AM_STARTED.toString(), + tEntity.getEvents().get(1).getEventType()); + Assert.assertEquals(currentTime + 10, + tEntity.getEvents().get(0).getTimestamp()); + Assert.assertEquals(currentTime - 10, + tEntity.getEvents().get(1).getTimestamp()); + + handleEvent(jheh, new JobHistoryEvent(t.jobId, + new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"), + currentTime - 20)); + entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, + null, null, null, null, null); + Assert.assertEquals(1, entities.getEntities().size()); + tEntity = entities.getEntities().get(0); + Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId()); + Assert.assertEquals(3, tEntity.getEvents().size()); + Assert.assertEquals(EventType.JOB_SUBMITTED.toString(), + tEntity.getEvents().get(0).getEventType()); + Assert.assertEquals(EventType.AM_STARTED.toString(), + tEntity.getEvents().get(1).getEventType()); + Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(), + tEntity.getEvents().get(2).getEventType()); + Assert.assertEquals(currentTime + 10, + tEntity.getEvents().get(0).getTimestamp()); + Assert.assertEquals(currentTime - 10, + tEntity.getEvents().get(1).getTimestamp()); + Assert.assertEquals(currentTime - 20, + tEntity.getEvents().get(2).getTimestamp()); + + handleEvent(jheh, new JobHistoryEvent(t.jobId, + new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, + 0, new Counters(), new Counters(), new Counters()), currentTime)); + entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, + null, null, null, null, null); + Assert.assertEquals(1, entities.getEntities().size()); + tEntity = entities.getEntities().get(0); + Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId()); + Assert.assertEquals(4, tEntity.getEvents().size()); + Assert.assertEquals(EventType.JOB_SUBMITTED.toString(), + tEntity.getEvents().get(0).getEventType()); + Assert.assertEquals(EventType.JOB_FINISHED.toString(), + tEntity.getEvents().get(1).getEventType()); + Assert.assertEquals(EventType.AM_STARTED.toString(), + tEntity.getEvents().get(2).getEventType()); + Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(), + tEntity.getEvents().get(3).getEventType()); + Assert.assertEquals(currentTime + 10, + tEntity.getEvents().get(0).getTimestamp()); + Assert.assertEquals(currentTime, + tEntity.getEvents().get(1).getTimestamp()); + Assert.assertEquals(currentTime - 10, + tEntity.getEvents().get(2).getTimestamp()); + Assert.assertEquals(currentTime - 20, + tEntity.getEvents().get(3).getTimestamp()); + + handleEvent(jheh, new JobHistoryEvent(t.jobId, + new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), + 0, 0, 0, JobStateInternal.KILLED.toString()), currentTime + 20)); + entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, + null, null, null, null, null); + Assert.assertEquals(1, entities.getEntities().size()); + tEntity = entities.getEntities().get(0); + Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId()); + Assert.assertEquals(5, tEntity.getEvents().size()); + Assert.assertEquals(EventType.JOB_KILLED.toString(), + tEntity.getEvents().get(0).getEventType()); + Assert.assertEquals(EventType.JOB_SUBMITTED.toString(), + tEntity.getEvents().get(1).getEventType()); + Assert.assertEquals(EventType.JOB_FINISHED.toString(), + tEntity.getEvents().get(2).getEventType()); + Assert.assertEquals(EventType.AM_STARTED.toString(), + tEntity.getEvents().get(3).getEventType()); + Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(), + tEntity.getEvents().get(4).getEventType()); + Assert.assertEquals(currentTime + 20, + tEntity.getEvents().get(0).getTimestamp()); + Assert.assertEquals(currentTime + 10, + tEntity.getEvents().get(1).getTimestamp()); + Assert.assertEquals(currentTime, + tEntity.getEvents().get(2).getTimestamp()); + Assert.assertEquals(currentTime - 10, + tEntity.getEvents().get(3).getTimestamp()); + Assert.assertEquals(currentTime - 20, + tEntity.getEvents().get(4).getTimestamp()); + + handleEvent(jheh, new JobHistoryEvent(t.jobId, + new TaskStartedEvent(t.taskID, 0, TaskType.MAP, ""))); + entities = ts.getEntities("MAPREDUCE_TASK", null, null, null, + null, null, null, null, null); + Assert.assertEquals(1, entities.getEntities().size()); + tEntity = entities.getEntities().get(0); + Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId()); + Assert.assertEquals(1, tEntity.getEvents().size()); + Assert.assertEquals(EventType.TASK_STARTED.toString(), + tEntity.getEvents().get(0).getEventType()); + Assert.assertEquals(TaskType.MAP.toString(), + tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE")); + + handleEvent(jheh, new JobHistoryEvent(t.jobId, + new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, ""))); + entities = ts.getEntities("MAPREDUCE_TASK", null, null, null, + null, null, null, null, null); + Assert.assertEquals(1, entities.getEntities().size()); + tEntity = entities.getEntities().get(0); + Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId()); + Assert.assertEquals(2, tEntity.getEvents().size()); + Assert.assertEquals(EventType.TASK_STARTED.toString(), + tEntity.getEvents().get(1).getEventType()); + Assert.assertEquals(TaskType.REDUCE.toString(), + tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE")); + Assert.assertEquals(TaskType.MAP.toString(), + tEntity.getEvents().get(1).getEventInfo().get("TASK_TYPE")); + } finally { + if (yarnCluster != null) { + yarnCluster.stop(); + } + } + } + + @Test (timeout=50000) + public void testCountersToJSON() throws Exception { + JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0); + Counters counters = new Counters(); + CounterGroup group1 = counters.addGroup("DOCTORS", + "Incarnations of the Doctor"); + group1.addCounter("PETER_CAPALDI", "Peter Capaldi", 12); + group1.addCounter("MATT_SMITH", "Matt Smith", 11); + group1.addCounter("DAVID_TENNANT", "David Tennant", 10); + CounterGroup group2 = counters.addGroup("COMPANIONS", + "Companions of the Doctor"); + group2.addCounter("CLARA_OSWALD", "Clara Oswald", 6); + group2.addCounter("RORY_WILLIAMS", "Rory Williams", 5); + group2.addCounter("AMY_POND", "Amy Pond", 4); + group2.addCounter("MARTHA_JONES", "Martha Jones", 3); + group2.addCounter("DONNA_NOBLE", "Donna Noble", 2); + group2.addCounter("ROSE_TYLER", "Rose Tyler", 1); + JsonNode jsonNode = jheh.countersToJSON(counters); + String jsonStr = new ObjectMapper().writeValueAsString(jsonNode); + String expected = "[{\"NAME\":\"COMPANIONS\",\"DISPLAY_NAME\":\"Companions " + + "of the Doctor\",\"COUNTERS\":[{\"NAME\":\"AMY_POND\",\"DISPLAY_NAME\"" + + ":\"Amy Pond\",\"VALUE\":4},{\"NAME\":\"CLARA_OSWALD\"," + + "\"DISPLAY_NAME\":\"Clara Oswald\",\"VALUE\":6},{\"NAME\":" + + "\"DONNA_NOBLE\",\"DISPLAY_NAME\":\"Donna Noble\",\"VALUE\":2}," + + "{\"NAME\":\"MARTHA_JONES\",\"DISPLAY_NAME\":\"Martha Jones\"," + + "\"VALUE\":3},{\"NAME\":\"RORY_WILLIAMS\",\"DISPLAY_NAME\":\"Rory " + + "Williams\",\"VALUE\":5},{\"NAME\":\"ROSE_TYLER\",\"DISPLAY_NAME\":" + + "\"Rose Tyler\",\"VALUE\":1}]},{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\"" + + ":\"Incarnations of the Doctor\",\"COUNTERS\":[{\"NAME\":" + + "\"DAVID_TENNANT\",\"DISPLAY_NAME\":\"David Tennant\",\"VALUE\":10}," + + "{\"NAME\":\"MATT_SMITH\",\"DISPLAY_NAME\":\"Matt Smith\",\"VALUE\":" + + "11},{\"NAME\":\"PETER_CAPALDI\",\"DISPLAY_NAME\":\"Peter Capaldi\"," + + "\"VALUE\":12}]}]"; + Assert.assertEquals(expected, jsonStr); + } + + @Test (timeout=50000) + public void testCountersToJSONEmpty() throws Exception { + JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0); + Counters counters = null; + JsonNode jsonNode = jheh.countersToJSON(counters); + String jsonStr = new ObjectMapper().writeValueAsString(jsonNode); + String expected = "[]"; + Assert.assertEquals(expected, jsonStr); + + counters = new Counters(); + jsonNode = jheh.countersToJSON(counters); + jsonStr = new ObjectMapper().writeValueAsString(jsonNode); + expected = "[]"; + Assert.assertEquals(expected, jsonStr); + + counters.addGroup("DOCTORS", "Incarnations of the Doctor"); + jsonNode = jheh.countersToJSON(counters); + jsonStr = new ObjectMapper().writeValueAsString(jsonNode); + expected = "[{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\":\"Incarnations of the " + + "Doctor\",\"COUNTERS\":[]}]"; + Assert.assertEquals(expected, jsonStr); + } + private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) { jheh.handle(event); } @@ -480,6 +717,7 @@ private class TestParams { ApplicationAttemptId.newInstance(appId, 1); ContainerId containerId = ContainerId.newInstance(appAttemptId, 1); TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005"); + TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, 0); JobId jobId = MRBuilderUtils.newJobId(appId, 1); AppContext mockAppContext; @@ -557,11 +795,13 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler { private boolean mockHistoryProcessing = true; public JHEvenHandlerForTest(AppContext context, int startCount) { super(context, startCount); + JobHistoryEventHandler.fileMap.clear(); } public JHEvenHandlerForTest(AppContext context, int startCount, boolean mockHistoryProcessing) { super(context, startCount); this.mockHistoryProcessing = mockHistoryProcessing; + JobHistoryEventHandler.fileMap.clear(); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java new file mode 100644 index 00000000000..2352818ccc7 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.jobhistory.EventType; +import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timeline.TimelineStore; + +import org.junit.Assert; +import org.junit.Test; + +public class TestMRTimelineEventHandling { + + @Test + public void testMRTimelineEventHandling() throws Exception { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + MiniMRYarnCluster cluster = null; + try { + cluster = new MiniMRYarnCluster( + TestJobHistoryEventHandler.class.getSimpleName(), 1, true); + cluster.init(conf); + cluster.start(); + TimelineStore ts = cluster.getApplicationHistoryServer() + .getTimelineStore(); + + Path inDir = new Path("input"); + Path outDir = new Path("output"); + RunningJob job = + UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir); + Assert.assertEquals(JobStatus.SUCCEEDED, + job.getJobStatus().getState().getValue()); + TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null, + null, null, null, null, null, null); + Assert.assertEquals(1, entities.getEntities().size()); + TimelineEntity tEntity = entities.getEntities().get(0); + Assert.assertEquals(job.getID().toString(), tEntity.getEntityId()); + Assert.assertEquals("MAPREDUCE_JOB", tEntity.getEntityType()); + Assert.assertEquals(EventType.AM_STARTED.toString(), + tEntity.getEvents().get(tEntity.getEvents().size() - 1) + .getEventType()); + Assert.assertEquals(EventType.JOB_FINISHED.toString(), + tEntity.getEvents().get(0).getEventType()); + + job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir); + Assert.assertEquals(JobStatus.FAILED, + job.getJobStatus().getState().getValue()); + entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null, + null, null, null); + Assert.assertEquals(2, entities.getEntities().size()); + tEntity = entities.getEntities().get(0); + Assert.assertEquals(job.getID().toString(), tEntity.getEntityId()); + Assert.assertEquals("MAPREDUCE_JOB", tEntity.getEntityType()); + Assert.assertEquals(EventType.AM_STARTED.toString(), + tEntity.getEvents().get(tEntity.getEvents().size() - 1) + .getEventType()); + Assert.assertEquals(EventType.JOB_FAILED.toString(), + tEntity.getEvents().get(0).getEventType()); + } finally { + if (cluster != null) { + cluster.stop(); + } + } + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java index 81b8b16d631..47b38a1365c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java @@ -72,8 +72,11 @@ public MiniMRYarnCluster(String testName) { } public MiniMRYarnCluster(String testName, int noOfNMs) { - super(testName, noOfNMs, 4, 4); - //TODO: add the history server + this(testName, noOfNMs, false); + } + + public MiniMRYarnCluster(String testName, int noOfNMs, boolean enableAHS) { + super(testName, 1, noOfNMs, 4, 4, enableAHS); historyServerWrapper = new JobHistoryServerWrapper(); addService(historyServerWrapper); }