diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 9346bbda03f..8d4e41ceb16 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -212,6 +212,9 @@ Release 2.0.3-alpha - Unreleased
     MAPREDUCE-4809. Change visibility of classes for pluggable sort changes.
     (masokan via tucu)
 
+    MAPREDUCE-4838. Add additional fields like Locality, Avataar to the
+    JobHistory logs. (Zhijie Shen via sseth)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index fa8764a412f..ef09a396056 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -1192,6 +1193,39 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     }
   }
   */
+  /**
+   * Get the workflow adjacencies from the job conf.
+   * The string returned is of the form "key"="value" "key"="value" ...
+   */
+  private static String getWorkflowAdjacencies(Configuration conf) {
+    int prefixLen = MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING.length();
+    Map<String, String> adjacencies =
+        conf.getValByRegex(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN);
+    if (adjacencies.isEmpty()) {
+      return "";
+    }
+    int size = 0;
+    for (Entry<String, String> entry : adjacencies.entrySet()) {
+      int keyLen = entry.getKey().length();
+      size += keyLen - prefixLen;
+      size += entry.getValue().length() + 6;
+    }
+    StringBuilder sb = new StringBuilder(size);
+    for (Entry<String, String> entry : adjacencies.entrySet()) {
+      int keyLen = entry.getKey().length();
+      sb.append("\"");
+      sb.append(escapeString(entry.getKey().substring(prefixLen, keyLen)));
+      sb.append("\"=\"");
+      sb.append(escapeString(entry.getValue()));
+      sb.append("\" ");
+    }
+    return sb.toString();
+  }
+
+  public static String escapeString(String data) {
+    return StringUtils.escapeString(data, StringUtils.ESCAPE_CHAR,
+        new char[] {'"', '=', '.'});
+  }
 
   public static class InitTransition
       implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
@@ -1217,7 +1251,11 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
             job.conf.get(MRJobConfig.USER_NAME, "mapred"),
             job.appSubmitTime,
             job.remoteJobConfFile.toString(),
-            job.jobACLs, job.queueName);
+            job.jobACLs, job.queueName,
+            job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
+            job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
+            job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
+            getWorkflowAdjacencies(job.conf));
         job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
         //TODO JH Verify jobACLs, UserName via UGI?
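Editorial note (illustrative sketch, not part of the patch): the adjacency string assembled by getWorkflowAdjacencies() above contains one "key"="value" pair per mapreduce.workflow.adjacency.* property, each followed by a space, with '"', '=' and '.' escaped through StringUtils.escapeString. A minimal sketch of how a submitting client would populate the new fields; the class name WorkflowConfExample and all sample values are made up:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class WorkflowConfExample {  // hypothetical illustration only
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(MRJobConfig.WORKFLOW_ID, "wf-20121201-0001");
    conf.set(MRJobConfig.WORKFLOW_NAME, "etl-pipeline");
    conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "stage-1");
    // One adjacency entry per workflow edge; the key suffix is the source node.
    conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "stage-1", "stage-2");
    // During JobImpl.InitTransition these values are copied into the
    // JobSubmittedEvent, and the adjacency entries are flattened into a
    // string of the form "stage-1"="stage-2" (escaped, trailing space kept).
  }
}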
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index c9535a79a29..6250f862950 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -66,6 +66,8 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
+import org.apache.hadoop.mapreduce.v2.api.records.Locality;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
@@ -156,7 +158,8 @@ public abstract class TaskAttemptImpl implements
   private final org.apache.hadoop.mapred.JobID oldJobId;
   private final TaskAttemptListener taskAttemptListener;
   private final Resource resourceCapability;
-  private final String[] dataLocalHosts;
+  protected Set<String> dataLocalHosts;
+  protected Set<String> dataLocalRacks;
   private final List<String> diagnostics = new ArrayList<String>();
   private final Lock readLock;
   private final Lock writeLock;
@@ -175,6 +178,8 @@ public abstract class TaskAttemptImpl implements
   private int shufflePort = -1;
   private String trackerName;
   private int httpPort;
+  private Locality locality;
+  private Avataar avataar;
 
   private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
     new CleanupContainerTransition();
@@ -532,8 +537,16 @@ public abstract class TaskAttemptImpl implements
         getMemoryRequired(conf, taskId.getTaskType()));
     this.resourceCapability.setVirtualCores(
         getCpuRequired(conf, taskId.getTaskType()));
-    this.dataLocalHosts = dataLocalHosts;
+
+    this.dataLocalHosts = resolveHosts(dataLocalHosts);
     RackResolver.init(conf);
+    this.dataLocalRacks = new HashSet<String>();
+    for (String host : this.dataLocalHosts) {
+      this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
+    }
+
+    locality = Locality.OFF_SWITCH;
+    avataar = Avataar.VIRGIN;
 
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
@@ -1032,6 +1045,23 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+  public Locality getLocality() {
+    return locality;
+  }
+
+  public void setLocality(Locality locality) {
+    this.locality = locality;
+  }
+
+  public Avataar getAvataar()
+  {
+    return avataar;
+  }
+
+  public void setAvataar(Avataar avataar) {
+    this.avataar = avataar;
+  }
+
   private static TaskAttemptState getExternalState(
       TaskAttemptStateInternal smState) {
     switch (smState) {
@@ -1232,25 +1262,27 @@ public abstract class TaskAttemptImpl implements
             taskAttempt.attemptId, taskAttempt.resourceCapability));
       } else {
-        Set<String> racks = new HashSet<String>();
-        for (String host : taskAttempt.dataLocalHosts) {
-          racks.add(RackResolver.resolve(host).getNetworkLocation());
-        }
         taskAttempt.eventHandler.handle(new ContainerRequestEvent(
-            taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt
-                .resolveHosts(taskAttempt.dataLocalHosts), racks
-                .toArray(new String[racks.size()])));
+            taskAttempt.attemptId, taskAttempt.resourceCapability,
+            taskAttempt.dataLocalHosts.toArray(
+                new String[taskAttempt.dataLocalHosts.size()]),
+            taskAttempt.dataLocalRacks.toArray(
+                new String[taskAttempt.dataLocalRacks.size()])));
       }
     }
   }
 
-  protected String[] resolveHosts(String[] src) {
-    String[] result = new String[src.length];
-    for (int i = 0; i < src.length; i++) {
-      if (isIP(src[i])) {
-        result[i] = resolveHost(src[i]);
-      } else {
-        result[i] = src[i];
+  protected Set<String> resolveHosts(String[] src) {
+    Set<String> result = new HashSet<String>();
+    if (src != null) {
+      for (int i = 0; i < src.length; i++) {
+        if (src[i] == null) {
+          continue;
+        } else if (isIP(src[i])) {
+          result.add(resolveHost(src[i]));
+        } else {
+          result.add(src[i]);
+        }
       }
     }
     return result;
@@ -1300,6 +1332,20 @@ public abstract class TaskAttemptImpl implements
           taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
       taskAttempt.taskAttemptListener.registerPendingTask(
           taskAttempt.remoteTask, taskAttempt.jvmID);
+
+      taskAttempt.locality = Locality.OFF_SWITCH;
+      if (taskAttempt.dataLocalHosts.size() > 0) {
+        String cHost = taskAttempt.resolveHost(
+            taskAttempt.containerNodeId.getHost());
+        if (taskAttempt.dataLocalHosts.contains(cHost)) {
+          taskAttempt.locality = Locality.NODE_LOCAL;
+        }
+      }
+      if (taskAttempt.locality == Locality.OFF_SWITCH) {
+        if (taskAttempt.dataLocalRacks.contains(taskAttempt.nodeRackName)) {
+          taskAttempt.locality = Locality.RACK_LOCAL;
+        }
+      }
 
       //launch the container
       //create the container object to be launched for a given Task attempt
@@ -1376,7 +1422,7 @@ public abstract class TaskAttemptImpl implements
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       } else {
         LOG.debug("Not generating HistoryFinish event since start event not " +
-          "generated for taskAttempt: " + taskAttempt.getID());
+            "generated for taskAttempt: " + taskAttempt.getID());
       }
     }
   }
@@ -1421,7 +1467,8 @@ public abstract class TaskAttemptImpl implements
           TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
           taskAttempt.launchTime,
           nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
-          taskAttempt.shufflePort, taskAttempt.containerID);
+          taskAttempt.shufflePort, taskAttempt.containerID,
+          taskAttempt.locality.toString(), taskAttempt.avataar.toString());
       taskAttempt.eventHandler.handle
           (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
       taskAttempt.eventHandler.handle
@@ -1510,7 +1557,7 @@ public abstract class TaskAttemptImpl implements
       // handling failed map/reduce events.
      }else {
        LOG.debug("Not generating HistoryFinish event since start event not " +
-          "generated for taskAttempt: " + taskAttempt.getID());
+            "generated for taskAttempt: " + taskAttempt.getID());
      }
      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
          taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
@@ -1580,7 +1627,7 @@ public abstract class TaskAttemptImpl implements
           taskAttempt.attemptId.getTaskId().getJobId(), tauce));
      }else {
        LOG.debug("Not generating HistoryFinish event since start event not " +
-          "generated for taskAttempt: " + taskAttempt.getID());
+            "generated for taskAttempt: " + taskAttempt.getID());
      }
      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
          taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
@@ -1648,7 +1695,7 @@ public abstract class TaskAttemptImpl implements
           taskAttempt.attemptId.getTaskId().getJobId(), tauce));
      }else {
        LOG.debug("Not generating HistoryFinish event since start event not " +
-          "generated for taskAttempt: " + taskAttempt.getID());
+            "generated for taskAttempt: " + taskAttempt.getID());
      }
      // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index 1771395b90a..28950e96cc5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
@@ -594,8 +595,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   // This is always called in the Write Lock
-  private void addAndScheduleAttempt() {
+  private void addAndScheduleAttempt(Avataar avataar) {
     TaskAttempt attempt = createAttempt();
+    ((TaskAttemptImpl) attempt).setAvataar(avataar);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created attempt " + attempt.getID());
     }
@@ -749,7 +751,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
-      task.addAndScheduleAttempt();
+      task.addAndScheduleAttempt(Avataar.VIRGIN);
       task.scheduledTime = task.clock.getTime();
       TaskStartedEvent tse = new TaskStartedEvent(
           TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
@@ -772,7 +774,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
       LOG.info("Scheduling a redundant attempt for task " + task.taskId);
-      task.addAndScheduleAttempt();
+      task.addAndScheduleAttempt(Avataar.SPECULATIVE);
     }
   }
 
@@ -849,7 +851,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.finishedAttempts.add(taskAttemptId);
       task.inProgressAttempts.remove(taskAttemptId);
       if (task.successfulAttempt == null) {
-        task.addAndScheduleAttempt();
+        task.addAndScheduleAttempt(Avataar.VIRGIN);
       }
     }
   }
@@ -937,7 +939,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         task.inProgressAttempts.remove(taskAttemptId);
         if (task.inProgressAttempts.size() == 0
             && task.successfulAttempt == null) {
-          task.addAndScheduleAttempt();
+          task.addAndScheduleAttempt(Avataar.VIRGIN);
         }
       } else {
         task.handleTaskAttemptCompletion(
@@ -1053,7 +1055,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       // from the map splitInfo. So the bad node might be sent as a location
       // to the RM. But the RM would ignore that just like it would ignore
       // currently pending container requests affinitized to bad nodes.
-      task.addAndScheduleAttempt();
+      task.addAndScheduleAttempt(Avataar.VIRGIN);
       return TaskStateInternal.SCHEDULED;
     }
   }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 96db3f13850..010ebc983f6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.RackResolver;
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 1ec71f0253b..3594d570864 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -33,6 +33,9 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRConfig;
@@ -66,6 +69,7 @@ import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -105,6 +109,13 @@ public class TestJobImpl {
     Configuration conf = new Configuration();
     conf.setInt(MRJobConfig.NUM_REDUCES, 0);
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    conf.set(MRJobConfig.WORKFLOW_ID, "testId");
+    conf.set(MRJobConfig.WORKFLOW_NAME, "testName");
+    conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "testNodeName");
+    conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key1", "value1");
+    conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key2", "value2");
+
+
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -114,6 +125,9 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
+    JobSubmittedEventHandler jseHandler = new JobSubmittedEventHandler("testId",
+        "testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ");
+    dispatcher.register(EventType.class, jseHandler);
     JobImpl job = createStubbedJob(conf, dispatcher, 0);
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
@@ -121,6 +135,11 @@ public class TestJobImpl {
     assertJobState(job, JobStateInternal.SUCCEEDED);
     dispatcher.stop();
     commitHandler.stop();
+    try {
+      Assert.assertTrue(jseHandler.getAssertValue());
+    } catch (InterruptedException e) {
+      Assert.fail("Workflow related attributes are not tested properly");
+    }
   }
 
   @Test(timeout=20000)
@@ -614,6 +633,67 @@ public class TestJobImpl {
     Assert.assertEquals(state, job.getInternalState());
   }
 
+  private static class JobSubmittedEventHandler implements
+      EventHandler<JobHistoryEvent> {
+
+    private String workflowId;
+
+    private String workflowName;
+
+    private String workflowNodeName;
+
+    private String workflowAdjacencies;
+
+    private Boolean assertBoolean;
+
+    public JobSubmittedEventHandler(String workflowId, String workflowName,
+        String workflowNodeName, String workflowAdjacencies) {
+      this.workflowId = workflowId;
+      this.workflowName = workflowName;
+      this.workflowNodeName = workflowNodeName;
+      this.workflowAdjacencies = workflowAdjacencies;
+      assertBoolean = null;
+    }
+
+    @Override
+    public void handle(JobHistoryEvent jhEvent) {
+      if (jhEvent.getType() != EventType.JOB_SUBMITTED) {
+        return;
+      }
+      JobSubmittedEvent jsEvent = (JobSubmittedEvent) jhEvent.getHistoryEvent();
+      if (!workflowId.equals(jsEvent.getWorkflowId())) {
+        setAssertValue(false);
+        return;
+      }
+      if (!workflowName.equals(jsEvent.getWorkflowName())) {
+        setAssertValue(false);
+        return;
+      }
+      if (!workflowNodeName.equals(jsEvent.getWorkflowNodeName())) {
+        setAssertValue(false);
+        return;
+      }
+      if (!workflowAdjacencies.equals(jsEvent.getWorkflowAdjacencies())) {
+        setAssertValue(false);
+        return;
+      }
+      setAssertValue(true);
+    }
+
+    private synchronized void setAssertValue(Boolean bool) {
+      assertBoolean = bool;
+      notify();
+    }
+
+    public synchronized boolean getAssertValue() throws InterruptedException {
+      while (assertBoolean == null) {
+        wait();
+      }
+      return assertBoolean;
+    }
+
+  }
+
   private static class StubbedJob extends JobImpl {
     //override the init transition
     private final InitTransition initTransition;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index 584ef2c7489..25849f459cb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Locality;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -157,6 +158,7 @@ public class TestTaskAttempt{
         createMapTaskAttemptImplForTest(eventHandler, splitInfo);
     TaskAttemptImpl spyTa = spy(mockTaskAttempt);
     when(spyTa.resolveHost(hosts[0])).thenReturn("host1");
+    spyTa.dataLocalHosts = spyTa.resolveHosts(splitInfo.getLocations());
 
     TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
     rct.transition(spyTa, mockTAEvent);
@@ -360,6 +362,8 @@ public class TestTaskAttempt{
     taImpl.handle(new TaskAttemptEvent(attemptId,
         TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
     assertFalse(eventHandler.internalError);
+    assertEquals("Task attempt is not assigned on the local node",
+        Locality.NODE_LOCAL, taImpl.getLocality());
   }
 
   @Test
@@ -398,7 +402,7 @@ public class TestTaskAttempt{
         mock(Token.class), new Credentials(),
         new SystemClock(), appCtx);
 
-    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.2", 0);
     ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
@@ -416,6 +420,8 @@ public class TestTaskAttempt{
         TaskAttemptEventType.TA_CONTAINER_CLEANED));
     assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
         eventHandler.internalError);
+    assertEquals("Task attempt is not assigned on the local rack",
+        Locality.RACK_LOCAL, taImpl.getLocality());
   }
 
   @Test
@@ -439,7 +445,7 @@ public class TestTaskAttempt{
     jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
 
     TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
-    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+    when(splits.getLocations()).thenReturn(new String[] {});
 
     AppContext appCtx = mock(AppContext.class);
     ClusterInfo clusterInfo = mock(ClusterInfo.class);
@@ -475,6 +481,8 @@ public class TestTaskAttempt{
         TaskAttemptEventType.TA_CONTAINER_CLEANED));
     assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
         eventHandler.internalError);
+    assertEquals("Task attempt is assigned locally", Locality.OFF_SWITCH,
+        taImpl.getLocality());
   }
 
   @Test
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
index cea1255b937..656e49e6e70 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -46,10 +47,12 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -254,6 +257,7 @@ public class TestTaskImpl {
     mockTask.handle(new TaskEvent(taskId, TaskEventType.T_SCHEDULE));
     assertTaskScheduledState();
+    assertTaskAttemptAvataar(Avataar.VIRGIN);
   }
 
   private void killTask(TaskId taskId) {
@@ -338,6 +342,19 @@ public class TestTaskImpl {
   private void assertTaskSucceededState() {
     assertEquals(TaskState.SUCCEEDED, mockTask.getState());
   }
+
+  /**
+   * Asserts that the task has an attempt with the given {@link Avataar}.
+   */
+  private void assertTaskAttemptAvataar(Avataar avataar) {
+    for (TaskAttempt taskAttempt : mockTask.getAttempts().values()) {
+      if (((TaskAttemptImpl) taskAttempt).getAvataar() == avataar) {
+        return;
+      }
+    }
+    fail("There is no " + (avataar == Avataar.VIRGIN ? "virgin" : "speculative")
+        + " task attempt");
+  }
 
   @Test
   public void testInit() {
@@ -516,6 +533,9 @@ public class TestTaskImpl {
 
     // The task should still be in the succeeded state
     assertTaskSucceededState();
+
+    // The task should now also contain a speculative task attempt
+    assertTaskAttemptAvataar(Avataar.SPECULATIVE);
   }
 
   @Test
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Avataar.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Avataar.java
new file mode 100644
index 00000000000..e0d043790fd
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Avataar.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.api.records;
+
+public enum Avataar {
+  VIRGIN,
+  SPECULATIVE
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Locality.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Locality.java
new file mode 100644
index 00000000000..e21693defc1
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Locality.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.api.records;
+
+public enum Locality {
+  NODE_LOCAL,
+  RACK_LOCAL,
+  OFF_SWITCH
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
index 050433a4887..716f6e2b639 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
@@ -91,7 +91,11 @@
               "values": "string"
             }
           },
-          {"name": "jobQueueName", "type": "string"}
+          {"name": "jobQueueName", "type": "string"},
+          {"name": "workflowId", "type": "string"},
+          {"name": "workflowName", "type": "string"},
+          {"name": "workflowNodeName", "type": "string"},
+          {"name": "workflowAdjacencies", "type": "string"}
       ]
      },
 
@@ -191,7 +195,9 @@
           {"name": "trackerName", "type": "string"},
           {"name": "httpPort", "type": "int"},
           {"name": "shufflePort", "type": "int"},
-          {"name": "containerId", "type": "string"}
+          {"name": "containerId", "type": "string"},
+          {"name": "locality", "type": "string"},
+          {"name": "avataar", "type": "string"}
       ]
      },
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 5fc7144a8cb..efb4fb63c61 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -647,5 +647,18 @@ public interface MRJobConfig {
       "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*",
       "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*",
   };
+
+  public static final String WORKFLOW_ID = "mapreduce.workflow.id";
+
+  public static final String WORKFLOW_NAME = "mapreduce.workflow.name";
+
+  public static final String WORKFLOW_NODE_NAME =
+      "mapreduce.workflow.node.name";
+
+  public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
+      "mapreduce.workflow.adjacency.";
+
+  public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
+      "^mapreduce\\.workflow\\.adjacency\\..+";
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
index 39af924ce16..83bdbe6f4a6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
@@ -52,6 +52,29 @@ public class JobSubmittedEvent implements HistoryEvent {
   public JobSubmittedEvent(JobID id, String jobName, String userName,
       long submitTime, String jobConfPath,
       Map<JobACL, AccessControlList> jobACLs, String jobQueueName) {
+    this(id, jobName, userName, submitTime, jobConfPath, jobACLs,
+        jobQueueName, "", "", "", "");
+  }
+
+  /**
+   * Create an event to record job submission
+   * @param id The job Id of the job
+   * @param jobName Name of the job
+   * @param userName Name of the user who submitted the job
+   * @param submitTime Time of submission
+   * @param jobConfPath Path of the Job Configuration file
+   * @param jobACLs The configured acls for the job.
+   * @param jobQueueName The job-queue to which this job was submitted
+   * @param workflowId The Id of the workflow
+   * @param workflowName The name of the workflow
+   * @param workflowNodeName The node name of the workflow
+   * @param workflowAdjacencies The adjacencies of the workflow
+   */
+  public JobSubmittedEvent(JobID id, String jobName, String userName,
+      long submitTime, String jobConfPath,
+      Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
+      String workflowId, String workflowName, String workflowNodeName,
+      String workflowAdjacencies) {
     datum.jobid = new Utf8(id.toString());
     datum.jobName = new Utf8(jobName);
     datum.userName = new Utf8(userName);
@@ -66,6 +89,18 @@ public class JobSubmittedEvent implements HistoryEvent {
     if (jobQueueName != null) {
       datum.jobQueueName = new Utf8(jobQueueName);
     }
+    if (workflowId != null) {
+      datum.workflowId = new Utf8(workflowId);
+    }
+    if (workflowName != null) {
+      datum.workflowName = new Utf8(workflowName);
+    }
+    if (workflowNodeName != null) {
+      datum.workflowNodeName = new Utf8(workflowNodeName);
+    }
+    if (workflowAdjacencies != null) {
+      datum.workflowAdjacencies = new Utf8(workflowAdjacencies);
+    }
   }
 
   JobSubmittedEvent() {}
@@ -105,6 +140,34 @@ public class JobSubmittedEvent implements HistoryEvent {
     }
     return jobAcls;
   }
+  /** Get the id of the workflow */
+  public String getWorkflowId() {
+    if (datum.workflowId != null) {
+      return datum.workflowId.toString();
+    }
+    return null;
+  }
+  /** Get the name of the workflow */
+  public String getWorkflowName() {
+    if (datum.workflowName != null) {
+      return datum.workflowName.toString();
+    }
+    return null;
+  }
+  /** Get the node name of the workflow */
+  public String getWorkflowNodeName() {
+    if (datum.workflowNodeName != null) {
+      return datum.workflowNodeName.toString();
+    }
+    return null;
+  }
+  /** Get the adjacencies of the workflow */
+  public String getWorkflowAdjacencies() {
+    if (datum.workflowAdjacencies != null) {
+      return datum.workflowAdjacencies.toString();
+    }
+    return null;
+  }
   /** Get the event type */
   public EventType getEventType() { return EventType.JOB_SUBMITTED; }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
index 95d28b5c056..9b408c0ff60 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
@@ -46,10 +46,13 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
    * @param httpPort The port number of the tracker
    * @param shufflePort The shuffle port number of the container
    * @param containerId The containerId for the task attempt.
+   * @param locality The locality of the task attempt
+   * @param avataar The avataar of the task attempt
    */
   public TaskAttemptStartedEvent( TaskAttemptID attemptId,
       TaskType taskType, long startTime, String trackerName,
-      int httpPort, int shufflePort, ContainerId containerId) {
+      int httpPort, int shufflePort, ContainerId containerId,
+      String locality, String avataar) {
     datum.attemptId = new Utf8(attemptId.toString());
     datum.taskid = new Utf8(attemptId.getTaskID().toString());
     datum.startTime = startTime;
@@ -58,14 +61,21 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
     datum.httpPort = httpPort;
     datum.shufflePort = shufflePort;
     datum.containerId = new Utf8(containerId.toString());
+    if (locality != null) {
+      datum.locality = new Utf8(locality);
+    }
+    if (avataar != null) {
+      datum.avataar = new Utf8(avataar);
+    }
   }
 
   // TODO Remove after MrV1 is removed.
   // Using a dummy containerId to prevent jobHistory parse failures.
   public TaskAttemptStartedEvent(TaskAttemptID attemptId, TaskType taskType,
-      long startTime, String trackerName, int httpPort, int shufflePort) {
+      long startTime, String trackerName, int httpPort, int shufflePort,
+      String locality, String avataar) {
     this(attemptId, taskType, startTime, trackerName, httpPort, shufflePort,
-        ConverterUtils.toContainerId("container_-1_-1_-1_-1"));
+        ConverterUtils.toContainerId("container_-1_-1_-1_-1"), locality, avataar);
   }
 
   TaskAttemptStartedEvent() {}
@@ -105,4 +115,19 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   public ContainerId getContainerId() {
     return ConverterUtils.toContainerId(datum.containerId.toString());
   }
+  /** Get the locality */
+  public String getLocality() {
+    if (datum.locality != null) {
+      return datum.locality.toString();
+    }
+    return null;
+  }
+  /** Get the avataar */
+  public String getAvataar() {
+    if (datum.avataar != null) {
+      return datum.avataar.toString();
+    }
+    return null;
+  }
+
 }
diff --git a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java
index 86d24af2c09..9cfd85d5ca8 100644
--- a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java
+++ b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java
@@ -67,6 +67,11 @@ public class Hadoop20JHParser implements JobHistoryParser {
     reader = new LineReader(input);
   }
 
+  public Hadoop20JHParser(LineReader reader) throws IOException {
+    super();
+    this.reader = reader;
+  }
+
   Map<String, HistoryEventEmitter> liveEmitters =
       new HashMap<String, HistoryEventEmitter>();
   Queue<HistoryEvent> remainingEvents = new LinkedList<HistoryEvent>();
 
diff --git a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
index a86f49e5a8b..73b6957a084 100644
--- a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
+++ b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
@@ -76,6 +76,23 @@ public class Job20LineHistoryEventEmitter extends HistoryEventEmitter {
       }
       String jobName = line.get("JOBNAME");
       String jobQueueName = line.get("JOB_QUEUE");// could be null
+      String workflowId = line.get("WORKFLOW_ID");
+      if (workflowId == null) {
+        workflowId = "";
+      }
+      String workflowName = line.get("WORKFLOW_NAME");
+      if (workflowName == null) {
+        workflowName = "";
+      }
+      String workflowNodeName = line.get("WORKFLOW_NODE_NAME");
+      if (workflowNodeName == null) {
+        workflowNodeName = "";
+      }
+      String workflowAdjacencies = line.get("WORKFLOW_ADJACENCIES");
+      if (workflowAdjacencies == null) {
+        workflowAdjacencies = "";
+      }
+
       if (submitTime != null) {
 
         Job20LineHistoryEventEmitter that =
@@ -86,7 +103,8 @@ public class Job20LineHistoryEventEmitter extends HistoryEventEmitter {
         Map<JobACL, AccessControlList> jobACLs =
             new HashMap<JobACL, AccessControlList>();
         return new JobSubmittedEvent(jobID, jobName, user,
-            that.originalSubmitTime, jobConf, jobACLs, jobQueueName);
+            that.originalSubmitTime, jobConf, jobACLs, jobQueueName,
+            workflowId, workflowName, workflowNodeName, workflowAdjacencies);
       }
 
       return null;
diff --git a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
index e4da6ccc624..b8cb83b4cc2 100644
--- a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
+++ b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
@@ -65,6 +65,14 @@ public abstract class TaskAttempt20LineEventEmitter extends HistoryEventEmitter
       String taskType = line.get("TASK_TYPE");
       String trackerName = line.get("TRACKER_NAME");
       String httpPort = line.get("HTTP_PORT");
+      String locality = line.get("LOCALITY");
+      if (locality == null) {
+        locality = "";
+      }
+      String avataar = line.get("AVATAAR");
+      if (avataar == null) {
+        avataar = "";
+      }
 
       if (startTime != null && taskType != null) {
         TaskAttempt20LineEventEmitter that =
@@ -79,7 +87,8 @@ public abstract class TaskAttempt20LineEventEmitter extends HistoryEventEmitter
             .parseInt(httpPort);
 
         return new TaskAttemptStartedEvent(taskAttemptID,
-            that.originalTaskType, that.originalStartTime, trackerName, port, -1);
+            that.originalTaskType, that.originalStartTime, trackerName, port, -1,
+            locality, avataar);
       }
 
       return null;
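Editorial note (illustrative sketch, not part of the patch): constructing the extended history events directly, mirroring the arguments that JobImpl and TaskAttemptImpl now pass; the class name WorkflowEventExample and all sample values are made up.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class WorkflowEventExample {  // hypothetical illustration only
  public static void main(String[] args) {
    JobID jobId = new JobID("201212010000", 1);
    Map<JobACL, AccessControlList> acls = new HashMap<JobACL, AccessControlList>();

    // The job-level event now carries the workflow identity plus the
    // flattened adjacency string produced by getWorkflowAdjacencies().
    JobSubmittedEvent jse = new JobSubmittedEvent(jobId, "wordcount", "alice",
        System.currentTimeMillis(), "hdfs:///tmp/job_conf.xml", acls, "default",
        "wf-20121201-0001", "etl-pipeline", "stage-1", "\"stage-1\"=\"stage-2\" ");

    // The attempt-level event now records where the attempt ran (locality)
    // and whether it was the original or a speculative attempt (avataar).
    TaskAttemptID attemptId =
        new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 0);
    TaskAttemptStartedEvent tase = new TaskAttemptStartedEvent(attemptId,
        TaskType.MAP, System.currentTimeMillis(), "host1:0", 0, 5050,
        ConverterUtils.toContainerId("container_-1_-1_-1_-1"),
        "NODE_LOCAL", "VIRGIN");
  }
}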