diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index fe26ee5dd3f..34bd15d3986 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; @@ -55,6 +58,8 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import com.google.common.annotations.VisibleForTesting; + /** * This class is responsible for talking to the task umblical. * It also converts all the old data structures @@ -80,6 +85,11 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap jvmIDToActiveAttemptMap = new ConcurrentHashMap(); + + private ConcurrentMap> attemptIdToStatus + = new ConcurrentHashMap<>(); + private Set launchedJVMs = Collections .newSetFromMap(new ConcurrentHashMap()); @@ -328,6 +338,14 @@ public class TaskAttemptListenerImpl extends CompositeService TaskStatus taskStatus) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = TypeConverter.toYarn(taskAttemptID); + + AtomicReference lastStatusRef = + attemptIdToStatus.get(yarnAttemptID); + if (lastStatusRef == null) { + throw new IllegalStateException("Status update was called" + + " with illegal TaskAttemptId: " + yarnAttemptID); + } + taskHeartbeatHandler.progressing(yarnAttemptID); TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus(); @@ -385,9 +403,8 @@ public class TaskAttemptListenerImpl extends CompositeService // // isn't ever changed by the Task itself. // taskStatus.getIncludeCounters(); - context.getEventHandler().handle( - new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id, - taskAttemptStatus)); + coalesceStatusUpdate(yarnAttemptID, taskAttemptStatus, lastStatusRef); + return true; } @@ -468,6 +485,9 @@ public class TaskAttemptListenerImpl extends CompositeService launchedJVMs.add(jvmId); taskHeartbeatHandler.register(attemptID); + + attemptIdToStatus.put(attemptID, + new AtomicReference()); } @Override @@ -489,6 +509,8 @@ public class TaskAttemptListenerImpl extends CompositeService //unregister this attempt taskHeartbeatHandler.unregister(attemptID); + + attemptIdToStatus.remove(attemptID); } @Override @@ -497,4 +519,47 @@ public class TaskAttemptListenerImpl extends CompositeService return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash); } + + private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID, + TaskAttemptStatus taskAttemptStatus, + AtomicReference lastStatusRef) { + boolean asyncUpdatedNeeded = false; + TaskAttemptStatus lastStatus = lastStatusRef.get(); + + if (lastStatus == null) { + lastStatusRef.set(taskAttemptStatus); + asyncUpdatedNeeded = true; + } else { + List oldFetchFailedMaps = + taskAttemptStatus.fetchFailedMaps; + + // merge fetchFailedMaps from the previous update + if (lastStatus.fetchFailedMaps != null) { + if (taskAttemptStatus.fetchFailedMaps == null) { + taskAttemptStatus.fetchFailedMaps = lastStatus.fetchFailedMaps; + } else { + taskAttemptStatus.fetchFailedMaps.addAll(lastStatus.fetchFailedMaps); + } + } + + if (!lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus)) { + // update failed - async dispatcher has processed it in the meantime + taskAttemptStatus.fetchFailedMaps = oldFetchFailedMaps; + lastStatusRef.set(taskAttemptStatus); + asyncUpdatedNeeded = true; + } + } + + if (asyncUpdatedNeeded) { + context.getEventHandler().handle( + new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id, + lastStatusRef)); + } + } + + @VisibleForTesting + ConcurrentMap> getAttemptIdToStatus() { + return attemptIdToStatus; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java index 715f63d1be5..cef4fd05093 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2.app.job.event; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.v2.api.records.Phase; @@ -26,17 +27,16 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent { - - private TaskAttemptStatus reportedTaskAttemptStatus; + private AtomicReference taskAttemptStatusRef; public TaskAttemptStatusUpdateEvent(TaskAttemptId id, - TaskAttemptStatus taskAttemptStatus) { + AtomicReference taskAttemptStatusRef) { super(id, TaskAttemptEventType.TA_UPDATE); - this.reportedTaskAttemptStatus = taskAttemptStatus; + this.taskAttemptStatusRef = taskAttemptStatusRef; } - public TaskAttemptStatus getReportedTaskAttemptStatus() { - return reportedTaskAttemptStatus; + public AtomicReference getTaskAttemptStatusRef() { + return taskAttemptStatusRef; } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 270fbe33cfe..dfc3adb29cc 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -1680,7 +1681,6 @@ public abstract class TaskAttemptImpl implements taskAttempt.updateProgressSplits(); } - static class RequestContainerTransition implements SingleArcTransition { private final boolean rescheduled; @@ -1865,6 +1865,7 @@ public abstract class TaskAttemptImpl implements // register it to TaskAttemptListener so that it can start monitoring it. taskAttempt.taskAttemptListener .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID); + //TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr? NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress()); @@ -2310,15 +2311,20 @@ public abstract class TaskAttemptImpl implements } private static class StatusUpdater - implements SingleArcTransition { + implements SingleArcTransition { @SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { - // Status update calls don't really change the state of the attempt. + TaskAttemptStatusUpdateEvent statusEvent = + ((TaskAttemptStatusUpdateEvent)event); + + AtomicReference taskAttemptStatusRef = + statusEvent.getTaskAttemptStatusRef(); + TaskAttemptStatus newReportedStatus = - ((TaskAttemptStatusUpdateEvent) event) - .getReportedTaskAttemptStatus(); + taskAttemptStatusRef.getAndSet(null); + // Now switch the information in the reportedStatus taskAttempt.reportedStatus = newReportedStatus; taskAttempt.reportedStatus.taskState = taskAttempt.getState(); @@ -2327,12 +2333,10 @@ public abstract class TaskAttemptImpl implements taskAttempt.eventHandler.handle (new SpeculatorEvent (taskAttempt.reportedStatus, taskAttempt.clock.getTime())); - taskAttempt.updateProgressSplits(); - //if fetch failures are present, send the fetch failure event to job //this only will happen in reduce attempt type - if (taskAttempt.reportedStatus.fetchFailedMaps != null && + if (taskAttempt.reportedStatus.fetchFailedMaps != null && taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) { String hostname = taskAttempt.container == null ? "UNKNOWN" : taskAttempt.container.getNodeId().getHost(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index 2ebfc9709bb..5c0d7453085 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -31,14 +31,15 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Arrays; - -import junit.framework.Assert; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; @@ -46,15 +47,83 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.SystemClock; +import org.junit.After; +import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +/** + * Tests the behavior of TaskAttemptListenerImpl. + */ +@RunWith(MockitoJUnitRunner.class) public class TestTaskAttemptListenerImpl { - public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl { + private static final String ATTEMPT1_ID = + "attempt_123456789012_0001_m_000001_0"; + private static final String ATTEMPT2_ID = + "attempt_123456789012_0001_m_000002_0"; + + private static final TaskAttemptId TASKATTEMPTID1 = + TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT1_ID)); + private static final TaskAttemptId TASKATTEMPTID2 = + TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT2_ID)); + + @Mock + private AppContext appCtx; + + @Mock + private JobTokenSecretManager secret; + + @Mock + private RMHeartbeatHandler rmHeartbeatHandler; + + @Mock + private TaskHeartbeatHandler hbHandler; + + @Mock + private Dispatcher dispatcher; + + @Mock + private Task task; + + @SuppressWarnings("rawtypes") + @Mock + private EventHandler ea; + + @SuppressWarnings("rawtypes") + @Captor + private ArgumentCaptor eventCaptor; + + private JVMId id; + private WrappedJvmID wid; + private TaskAttemptID attemptID; + private TaskAttemptId attemptId; + private ReduceTaskStatus firstReduceStatus; + private ReduceTaskStatus secondReduceStatus; + private ReduceTaskStatus thirdReduceStatus; + + private MockTaskAttemptListenerImpl listener; + + /** + * Extension of the original TaskAttemptImpl + * for testing purposes + */ + public static class MockTaskAttemptListenerImpl + extends TaskAttemptListenerImpl { public MockTaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager, @@ -85,26 +154,24 @@ public class TestTaskAttemptListenerImpl { //Empty } } - + + @After + public void after() throws IOException { + if (listener != null) { + listener.close(); + listener = null; + } + } + @Test (timeout=5000) public void testGetTask() throws IOException { - AppContext appCtx = mock(AppContext.class); - JobTokenSecretManager secret = mock(JobTokenSecretManager.class); - RMHeartbeatHandler rmHeartbeatHandler = - mock(RMHeartbeatHandler.class); - TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); - MockTaskAttemptListenerImpl listener = - new MockTaskAttemptListenerImpl(appCtx, secret, - rmHeartbeatHandler, hbHandler); - Configuration conf = new Configuration(); - listener.init(conf); - listener.start(); - JVMId id = new JVMId("foo",1, true, 1); - WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId()); + configureMocks(); + startListener(false); // Verify ask before registration. //The JVM ID has not been registered yet so we should kill it. JvmContext context = new JvmContext(); + context.jvmId = id; JvmTask result = listener.getTask(context); assertNotNull(result); @@ -112,20 +179,18 @@ public class TestTaskAttemptListenerImpl { // Verify ask after registration but before launch. // Don't kill, should be null. - TaskAttemptId attemptID = mock(TaskAttemptId.class); - Task task = mock(Task.class); //Now put a task with the ID listener.registerPendingTask(task, wid); result = listener.getTask(context); assertNull(result); // Unregister for more testing. - listener.unregister(attemptID, wid); + listener.unregister(attemptId, wid); // Verify ask after registration and launch //Now put a task with the ID listener.registerPendingTask(task, wid); - listener.registerLaunchedTask(attemptID, wid); - verify(hbHandler).register(attemptID); + listener.registerLaunchedTask(attemptId, wid); + verify(hbHandler).register(attemptId); result = listener.getTask(context); assertNotNull(result); assertFalse(result.shouldDie); @@ -136,15 +201,13 @@ public class TestTaskAttemptListenerImpl { assertNotNull(result); assertTrue(result.shouldDie); - listener.unregister(attemptID, wid); + listener.unregister(attemptId, wid); // Verify after unregistration. result = listener.getTask(context); assertNotNull(result); assertTrue(result.shouldDie); - listener.stop(); - // test JVMID JVMId jvmid = JVMId.forName("jvm_001_002_m_004"); assertNotNull(jvmid); @@ -190,14 +253,11 @@ public class TestTaskAttemptListenerImpl { when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn( TypeConverter.fromYarn(empty)); - AppContext appCtx = mock(AppContext.class); + configureMocks(); when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); - JobTokenSecretManager secret = mock(JobTokenSecretManager.class); - RMHeartbeatHandler rmHeartbeatHandler = - mock(RMHeartbeatHandler.class); - final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); - TaskAttemptListenerImpl listener = - new MockTaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) { + + listener = new MockTaskAttemptListenerImpl( + appCtx, secret, rmHeartbeatHandler, hbHandler) { @Override protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = hbHandler; @@ -238,20 +298,18 @@ public class TestTaskAttemptListenerImpl { public void testCommitWindow() throws IOException { SystemClock clock = SystemClock.getInstance(); + configureMocks(); + org.apache.hadoop.mapreduce.v2.app.job.Task mockTask = mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class); when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true); Job mockJob = mock(Job.class); when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask); - AppContext appCtx = mock(AppContext.class); when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); when(appCtx.getClock()).thenReturn(clock); - JobTokenSecretManager secret = mock(JobTokenSecretManager.class); - RMHeartbeatHandler rmHeartbeatHandler = - mock(RMHeartbeatHandler.class); - final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); - TaskAttemptListenerImpl listener = - new MockTaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) { + + listener = new MockTaskAttemptListenerImpl( + appCtx, secret, rmHeartbeatHandler, hbHandler) { @Override protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = hbHandler; @@ -269,11 +327,119 @@ public class TestTaskAttemptListenerImpl { verify(mockTask, never()).canCommit(any(TaskAttemptId.class)); // verify commit allowed when RM heartbeat is recent - when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime()); + when(rmHeartbeatHandler.getLastHeartbeatTime()) + .thenReturn(clock.getTime()); canCommit = listener.canCommit(tid); assertTrue(canCommit); verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class)); + } - listener.stop(); + @Test + public void testSingleStatusUpdate() + throws IOException, InterruptedException { + configureMocks(); + startListener(true); + + listener.statusUpdate(attemptID, firstReduceStatus); + + verify(ea).handle(eventCaptor.capture()); + TaskAttemptStatusUpdateEvent updateEvent = + (TaskAttemptStatusUpdateEvent) eventCaptor.getValue(); + + TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get(); + assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1)); + assertEquals(1, status.fetchFailedMaps.size()); + assertEquals(Phase.SHUFFLE, status.phase); + } + + @Test + public void testStatusUpdateEventCoalescing() + throws IOException, InterruptedException { + configureMocks(); + startListener(true); + + listener.statusUpdate(attemptID, firstReduceStatus); + listener.statusUpdate(attemptID, secondReduceStatus); + + verify(ea).handle(any(Event.class)); + ConcurrentMap> attemptIdToStatus = + listener.getAttemptIdToStatus(); + TaskAttemptStatus status = attemptIdToStatus.get(attemptId).get(); + + assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1)); + assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID2)); + assertEquals(2, status.fetchFailedMaps.size()); + assertEquals(Phase.SORT, status.phase); + } + + @Test + public void testCoalescedStatusUpdatesCleared() + throws IOException, InterruptedException { + // First two events are coalesced, the third is not + configureMocks(); + startListener(true); + + listener.statusUpdate(attemptID, firstReduceStatus); + listener.statusUpdate(attemptID, secondReduceStatus); + ConcurrentMap> attemptIdToStatus = + listener.getAttemptIdToStatus(); + attemptIdToStatus.get(attemptId).set(null); + listener.statusUpdate(attemptID, thirdReduceStatus); + + verify(ea, times(2)).handle(eventCaptor.capture()); + TaskAttemptStatusUpdateEvent updateEvent = + (TaskAttemptStatusUpdateEvent) eventCaptor.getValue(); + + TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get(); + assertNull(status.fetchFailedMaps); + assertEquals(Phase.REDUCE, status.phase); + } + + @Test(expected = IllegalStateException.class) + public void testStatusUpdateFromUnregisteredTask() + throws IOException, InterruptedException{ + configureMocks(); + startListener(false); + + listener.statusUpdate(attemptID, firstReduceStatus); + } + + private void configureMocks() { + firstReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1, + TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SHUFFLE, + new Counters()); + firstReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT1_ID)); + + secondReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1, + TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SORT, + new Counters()); + secondReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT2_ID)); + + thirdReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1, + TaskStatus.State.RUNNING, "", "RUNNING", "", + TaskStatus.Phase.REDUCE, new Counters()); + + when(dispatcher.getEventHandler()).thenReturn(ea); + when(appCtx.getEventHandler()).thenReturn(ea); + listener = new MockTaskAttemptListenerImpl(appCtx, secret, + rmHeartbeatHandler, hbHandler); + id = new JVMId("foo", 1, true, 1); + wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId()); + attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1); + attemptId = TypeConverter.toYarn(attemptID); + } + + private void startListener(boolean registerTask) { + Configuration conf = new Configuration(); + + listener.init(conf); + listener.start(); + + if (registerTask) { + listener.registerPendingTask(task, wid); + listener.registerLaunchedTask(attemptId, wid); + } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java index 8d25079b97d..98773f60115 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.TaskCompletionEvent; @@ -429,7 +430,7 @@ public class TestFetchFailure { status.stateString = "OK"; status.taskState = attempt.getState(); TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(attempt.getID(), - status); + new AtomicReference<>(status)); app.getContext().getEventHandler().handle(event); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java index 77f9a092a1b..ca3c28cbaf5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; @@ -103,7 +104,8 @@ public class TestMRClientService { taskAttemptStatus.phase = Phase.MAP; // send the status update app.getContext().getEventHandler().handle( - new TaskAttemptStatusUpdateEvent(attempt.getID(), taskAttemptStatus)); + new TaskAttemptStatusUpdateEvent(attempt.getID(), + new AtomicReference<>(taskAttemptStatus))); //verify that all object are fully populated by invoking RPCs. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java index e8003c0be73..de171c75125 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -84,7 +85,8 @@ public class TestSpeculativeExecutionWithMRApp { createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.8, TaskAttemptState.RUNNING); TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), + new AtomicReference<>(status)); appEventHandler.handle(event); } } @@ -155,7 +157,8 @@ public class TestSpeculativeExecutionWithMRApp { createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.5, TaskAttemptState.RUNNING); TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), + new AtomicReference<>(status)); appEventHandler.handle(event); } } @@ -180,7 +183,8 @@ public class TestSpeculativeExecutionWithMRApp { TaskAttemptState.RUNNING); speculatedTask = task.getValue(); TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), + new AtomicReference<>(status)); appEventHandler.handle(event); } } @@ -195,7 +199,8 @@ public class TestSpeculativeExecutionWithMRApp { createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75, TaskAttemptState.RUNNING); TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), + new AtomicReference<>(status)); appEventHandler.handle(event); } }