MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko

Jason Lowe 2017-12-01 14:15:25 -06:00
parent 3c57defaa4
commit d0fc1cd0c8
7 changed files with 307 additions and 64 deletions

TaskAttemptListenerImpl.java

@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

@@ -37,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;

@@ -55,6 +58,8 @@ import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import com.google.common.annotations.VisibleForTesting;

 /**
  * This class is responsible for talking to the task umblical.
  * It also converts all the old data structures

@@ -80,6 +85,11 @@ public class TaskAttemptListenerImpl extends CompositeService
   private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task>
     jvmIDToActiveAttemptMap
       = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>();
+
+  private ConcurrentMap<TaskAttemptId,
+      AtomicReference<TaskAttemptStatus>> attemptIdToStatus
+        = new ConcurrentHashMap<>();

   private Set<WrappedJvmID> launchedJVMs = Collections
       .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());

@@ -328,6 +338,14 @@ public class TaskAttemptListenerImpl extends CompositeService
       TaskStatus taskStatus) throws IOException, InterruptedException {
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
         TypeConverter.toYarn(taskAttemptID);
+
+    AtomicReference<TaskAttemptStatus> lastStatusRef =
+        attemptIdToStatus.get(yarnAttemptID);
+    if (lastStatusRef == null) {
+      throw new IllegalStateException("Status update was called"
+          + " with illegal TaskAttemptId: " + yarnAttemptID);
+    }
+
     taskHeartbeatHandler.progressing(yarnAttemptID);
     TaskAttemptStatus taskAttemptStatus =
         new TaskAttemptStatus();

@@ -385,9 +403,8 @@ public class TaskAttemptListenerImpl extends CompositeService
     // // isn't ever changed by the Task itself.
     // taskStatus.getIncludeCounters();

-    context.getEventHandler().handle(
-        new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
-            taskAttemptStatus));
+    coalesceStatusUpdate(yarnAttemptID, taskAttemptStatus, lastStatusRef);
+
     return true;
   }

@@ -468,6 +485,9 @@ public class TaskAttemptListenerImpl extends CompositeService
     launchedJVMs.add(jvmId);

     taskHeartbeatHandler.register(attemptID);
+
+    attemptIdToStatus.put(attemptID,
+        new AtomicReference<TaskAttemptStatus>());
   }

   @Override

@@ -489,6 +509,8 @@ public class TaskAttemptListenerImpl extends CompositeService
     //unregister this attempt
     taskHeartbeatHandler.unregister(attemptID);
+
+    attemptIdToStatus.remove(attemptID);
   }

   @Override

@@ -497,4 +519,47 @@ public class TaskAttemptListenerImpl extends CompositeService
     return ProtocolSignature.getProtocolSignature(this,
         protocol, clientVersion, clientMethodsHash);
   }
+
+  private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID,
+      TaskAttemptStatus taskAttemptStatus,
+      AtomicReference<TaskAttemptStatus> lastStatusRef) {
+    boolean asyncUpdatedNeeded = false;
+    TaskAttemptStatus lastStatus = lastStatusRef.get();
+
+    if (lastStatus == null) {
+      lastStatusRef.set(taskAttemptStatus);
+      asyncUpdatedNeeded = true;
+    } else {
+      List<TaskAttemptId> oldFetchFailedMaps =
+          taskAttemptStatus.fetchFailedMaps;
+
+      // merge fetchFailedMaps from the previous update
+      if (lastStatus.fetchFailedMaps != null) {
+        if (taskAttemptStatus.fetchFailedMaps == null) {
+          taskAttemptStatus.fetchFailedMaps = lastStatus.fetchFailedMaps;
+        } else {
+          taskAttemptStatus.fetchFailedMaps.addAll(lastStatus.fetchFailedMaps);
+        }
+      }
+
+      if (!lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus)) {
+        // update failed - async dispatcher has processed it in the meantime
+        taskAttemptStatus.fetchFailedMaps = oldFetchFailedMaps;
+        lastStatusRef.set(taskAttemptStatus);
+        asyncUpdatedNeeded = true;
+      }
+    }
+
+    if (asyncUpdatedNeeded) {
+      context.getEventHandler().handle(
+          new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
+              lastStatusRef));
+    }
+  }
+
+  @VisibleForTesting
+  ConcurrentMap<TaskAttemptId,
+      AtomicReference<TaskAttemptStatus>> getAttemptIdToStatus() {
+    return attemptIdToStatus;
+  }
 }
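Note: the coalescing logic above can be easier to follow outside of the surrounding MapReduce types. The following standalone sketch (illustrative names only: Coalescer, Status, and the queue standing in for the AM's async dispatcher are not part of this patch, and it assumes one reporter thread per slot, as with the umbilical protocol) shows the same flow-control idea: the RPC side publishes each status into an AtomicReference slot and enqueues a dispatcher event only when the slot was empty or was just drained, so a backed-up dispatcher holds at most one pending status event per task attempt.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Standalone sketch of status-update coalescing; names are illustrative.
public class Coalescer {

  static class Status {
    final String message;
    Status(String message) { this.message = message; }
  }

  // Latest unconsumed status; null means the dispatcher has drained it.
  private final AtomicReference<Status> slot = new AtomicReference<>();

  // Stands in for the AM's async dispatcher queue.
  private final BlockingQueue<AtomicReference<Status>> dispatcherQueue =
      new ArrayBlockingQueue<>(16);

  // RPC side: called for every task heartbeat (one reporter per slot assumed).
  void statusUpdate(Status latest) throws InterruptedException {
    Status previous = slot.get();
    if (previous == null) {
      // Slot was empty: publish the status and wake the dispatcher once.
      slot.set(latest);
      dispatcherQueue.put(slot);
    } else if (!slot.compareAndSet(previous, latest)) {
      // CAS lost: the dispatcher drained the slot in the meantime,
      // so publish again and enqueue a fresh event.
      slot.set(latest);
      dispatcherQueue.put(slot);
    }
    // CAS won: the still-pending event will deliver this newer status,
    // so no additional event is queued (this is the coalescing).
  }

  // Dispatcher side: getAndSet(null) marks the slot drained.
  void drainOne() throws InterruptedException {
    Status newest = dispatcherQueue.take().getAndSet(null);
    System.out.println("processing " + newest.message);
  }
}

Unlike this sketch, the real coalesceStatusUpdate() also merges fetchFailedMaps from the superseded status into the newer one before the compareAndSet, so fetch-failure reports are not lost when updates collapse.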

TaskAttemptStatusUpdateEvent.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app.job.event;

 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;

 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;

@@ -26,17 +27,16 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;

 public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
-  private TaskAttemptStatus reportedTaskAttemptStatus;
+  private AtomicReference<TaskAttemptStatus> taskAttemptStatusRef;

   public TaskAttemptStatusUpdateEvent(TaskAttemptId id,
-      TaskAttemptStatus taskAttemptStatus) {
+      AtomicReference<TaskAttemptStatus> taskAttemptStatusRef) {
     super(id, TaskAttemptEventType.TA_UPDATE);
-    this.reportedTaskAttemptStatus = taskAttemptStatus;
+    this.taskAttemptStatusRef = taskAttemptStatusRef;
   }

-  public TaskAttemptStatus getReportedTaskAttemptStatus() {
-    return reportedTaskAttemptStatus;
+  public AtomicReference<TaskAttemptStatus> getTaskAttemptStatusRef() {
+    return taskAttemptStatusRef;
   }

   /**

TaskAttemptImpl.java

@@ -35,6 +35,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

@@ -1680,7 +1681,6 @@ public abstract class TaskAttemptImpl implements
     taskAttempt.updateProgressSplits();
   }
-
   static class RequestContainerTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     private final boolean rescheduled;

@@ -1865,6 +1865,7 @@ public abstract class TaskAttemptImpl implements
     // register it to TaskAttemptListener so that it can start monitoring it.
     taskAttempt.taskAttemptListener
       .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
+
     //TODO Resolve to host / IP in case of a local address.
     InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
         NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());

@@ -2310,15 +2311,20 @@
   }

   private static class StatusUpdater
       implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @SuppressWarnings("unchecked")
     @Override
     public void transition(TaskAttemptImpl taskAttempt,
         TaskAttemptEvent event) {
-      // Status update calls don't really change the state of the attempt.
+      TaskAttemptStatusUpdateEvent statusEvent =
+          ((TaskAttemptStatusUpdateEvent)event);
+
+      AtomicReference<TaskAttemptStatus> taskAttemptStatusRef =
+          statusEvent.getTaskAttemptStatusRef();
+
       TaskAttemptStatus newReportedStatus =
-          ((TaskAttemptStatusUpdateEvent) event)
-              .getReportedTaskAttemptStatus();
+          taskAttemptStatusRef.getAndSet(null);
+
       // Now switch the information in the reportedStatus
       taskAttempt.reportedStatus = newReportedStatus;
       taskAttempt.reportedStatus.taskState = taskAttempt.getState();

@@ -2327,12 +2333,10 @@
       taskAttempt.eventHandler.handle
           (new SpeculatorEvent
               (taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
-
       taskAttempt.updateProgressSplits();
-
       //if fetch failures are present, send the fetch failure event to job
       //this only will happen in reduce attempt type
       if (taskAttempt.reportedStatus.fetchFailedMaps != null &&
           taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) {
         String hostname = taskAttempt.container == null ? "UNKNOWN"
             : taskAttempt.container.getNodeId().getHost();
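Note: on the dispatcher side, the essential move in the rewritten StatusUpdater transition is the getAndSet(null) on the shared reference: it retrieves the freshest coalesced status and, at the same time, signals the listener that the next heartbeat must enqueue a new event. A minimal, hypothetical illustration of that hand-off (the Status and consumer names below are not Hadoop API):

import java.util.concurrent.atomic.AtomicReference;

// Hypothetical consumer mirroring the shape of StatusUpdater.transition().
class StatusEventConsumer {

  static class Status {
    float progress;
    Status(float progress) { this.progress = progress; }
  }

  void onStatusUpdate(AtomicReference<Status> statusRef) {
    // Drain the slot: fetch the newest snapshot and reset it to null so the
    // producer knows a fresh dispatcher event is needed for later updates.
    Status newest = statusRef.getAndSet(null);
    // Coalesced intermediate updates are never observed here; only the
    // latest snapshot per event is processed.
    System.out.println("progress = " + newest.progress);
  }
}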

TestTaskAttemptListenerImpl.java

@@ -31,14 +31,15 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
-import junit.framework.Assert;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;

@@ -46,15 +47,83 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;

+/**
+ * Tests the behavior of TaskAttemptListenerImpl.
+ */
+@RunWith(MockitoJUnitRunner.class)
 public class TestTaskAttemptListenerImpl {
-  public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl {
+  private static final String ATTEMPT1_ID =
+      "attempt_123456789012_0001_m_000001_0";
+  private static final String ATTEMPT2_ID =
+      "attempt_123456789012_0001_m_000002_0";
+
+  private static final TaskAttemptId TASKATTEMPTID1 =
+      TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT1_ID));
+  private static final TaskAttemptId TASKATTEMPTID2 =
+      TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT2_ID));
+
+  @Mock
+  private AppContext appCtx;
+  @Mock
+  private JobTokenSecretManager secret;
+  @Mock
+  private RMHeartbeatHandler rmHeartbeatHandler;
+  @Mock
+  private TaskHeartbeatHandler hbHandler;
+  @Mock
+  private Dispatcher dispatcher;
+  @Mock
+  private Task task;
+
+  @SuppressWarnings("rawtypes")
+  @Mock
+  private EventHandler<Event> ea;
+
+  @SuppressWarnings("rawtypes")
+  @Captor
+  private ArgumentCaptor<Event> eventCaptor;
+
+  private JVMId id;
+  private WrappedJvmID wid;
+  private TaskAttemptID attemptID;
+  private TaskAttemptId attemptId;
+  private ReduceTaskStatus firstReduceStatus;
+  private ReduceTaskStatus secondReduceStatus;
+  private ReduceTaskStatus thirdReduceStatus;
+
+  private MockTaskAttemptListenerImpl listener;
+
+  /**
+   * Extension of the original TaskAttemptImpl
+   * for testing purposes
+   */
+  public static class MockTaskAttemptListenerImpl
+      extends TaskAttemptListenerImpl {

     public MockTaskAttemptListenerImpl(AppContext context,
         JobTokenSecretManager jobTokenSecretManager,

@@ -85,26 +154,24 @@
       //Empty
     }
   }

+  @After
+  public void after() throws IOException {
+    if (listener != null) {
+      listener.close();
+      listener = null;
+    }
+  }
+
   @Test (timeout=5000)
   public void testGetTask() throws IOException {
-    AppContext appCtx = mock(AppContext.class);
-    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
-    RMHeartbeatHandler rmHeartbeatHandler =
-        mock(RMHeartbeatHandler.class);
-    TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
-    MockTaskAttemptListenerImpl listener =
-        new MockTaskAttemptListenerImpl(appCtx, secret,
-          rmHeartbeatHandler, hbHandler);
-    Configuration conf = new Configuration();
-    listener.init(conf);
-    listener.start();
-    JVMId id = new JVMId("foo",1, true, 1);
-    WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
+    configureMocks();
+    startListener(false);

     // Verify ask before registration.
     //The JVM ID has not been registered yet so we should kill it.
     JvmContext context = new JvmContext();
     context.jvmId = id;
     JvmTask result = listener.getTask(context);
     assertNotNull(result);

@@ -112,20 +179,18 @@
     // Verify ask after registration but before launch.
     // Don't kill, should be null.
-    TaskAttemptId attemptID = mock(TaskAttemptId.class);
-    Task task = mock(Task.class);
     //Now put a task with the ID
     listener.registerPendingTask(task, wid);
     result = listener.getTask(context);
     assertNull(result);
     // Unregister for more testing.
-    listener.unregister(attemptID, wid);
+    listener.unregister(attemptId, wid);

     // Verify ask after registration and launch
     //Now put a task with the ID
     listener.registerPendingTask(task, wid);
-    listener.registerLaunchedTask(attemptID, wid);
-    verify(hbHandler).register(attemptID);
+    listener.registerLaunchedTask(attemptId, wid);
+    verify(hbHandler).register(attemptId);
     result = listener.getTask(context);
     assertNotNull(result);
     assertFalse(result.shouldDie);

@@ -136,15 +201,13 @@
     assertNotNull(result);
     assertTrue(result.shouldDie);
-    listener.unregister(attemptID, wid);
+    listener.unregister(attemptId, wid);

     // Verify after unregistration.
     result = listener.getTask(context);
     assertNotNull(result);
     assertTrue(result.shouldDie);

-    listener.stop();
-
     // test JVMID
     JVMId jvmid = JVMId.forName("jvm_001_002_m_004");
     assertNotNull(jvmid);

@@ -190,14 +253,11 @@
     when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(
         TypeConverter.fromYarn(empty));
-    AppContext appCtx = mock(AppContext.class);
+    configureMocks();
     when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
-    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
-    RMHeartbeatHandler rmHeartbeatHandler =
-        mock(RMHeartbeatHandler.class);
-    final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
-    TaskAttemptListenerImpl listener =
-        new MockTaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
+
+    listener = new MockTaskAttemptListenerImpl(
+        appCtx, secret, rmHeartbeatHandler, hbHandler) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
         taskHeartbeatHandler = hbHandler;

@@ -238,20 +298,18 @@
   public void testCommitWindow() throws IOException {
     SystemClock clock = SystemClock.getInstance();

+    configureMocks();
+
     org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
         mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
     when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
     Job mockJob = mock(Job.class);
     when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
-    AppContext appCtx = mock(AppContext.class);
     when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
     when(appCtx.getClock()).thenReturn(clock);
-    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
-    RMHeartbeatHandler rmHeartbeatHandler =
-        mock(RMHeartbeatHandler.class);
-    final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
-    TaskAttemptListenerImpl listener =
-        new MockTaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
+
+    listener = new MockTaskAttemptListenerImpl(
+        appCtx, secret, rmHeartbeatHandler, hbHandler) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
         taskHeartbeatHandler = hbHandler;

@@ -269,11 +327,119 @@
     verify(mockTask, never()).canCommit(any(TaskAttemptId.class));

     // verify commit allowed when RM heartbeat is recent
-    when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime());
+    when(rmHeartbeatHandler.getLastHeartbeatTime())
+        .thenReturn(clock.getTime());
     canCommit = listener.canCommit(tid);
     assertTrue(canCommit);
     verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
-
-    listener.stop();
+  }
+
+  @Test
+  public void testSingleStatusUpdate()
+      throws IOException, InterruptedException {
+    configureMocks();
+    startListener(true);
+
+    listener.statusUpdate(attemptID, firstReduceStatus);
+
+    verify(ea).handle(eventCaptor.capture());
+    TaskAttemptStatusUpdateEvent updateEvent =
+        (TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
+
+    TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get();
+    assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1));
+    assertEquals(1, status.fetchFailedMaps.size());
+    assertEquals(Phase.SHUFFLE, status.phase);
+  }
+
+  @Test
+  public void testStatusUpdateEventCoalescing()
+      throws IOException, InterruptedException {
+    configureMocks();
+    startListener(true);
+
+    listener.statusUpdate(attemptID, firstReduceStatus);
+    listener.statusUpdate(attemptID, secondReduceStatus);
+
+    verify(ea).handle(any(Event.class));
+    ConcurrentMap<TaskAttemptId,
+        AtomicReference<TaskAttemptStatus>> attemptIdToStatus =
+        listener.getAttemptIdToStatus();
+    TaskAttemptStatus status = attemptIdToStatus.get(attemptId).get();
+
+    assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1));
+    assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID2));
+    assertEquals(2, status.fetchFailedMaps.size());
+    assertEquals(Phase.SORT, status.phase);
+  }
+
+  @Test
+  public void testCoalescedStatusUpdatesCleared()
+      throws IOException, InterruptedException {
+    // First two events are coalesced, the third is not
+    configureMocks();
+    startListener(true);
+
+    listener.statusUpdate(attemptID, firstReduceStatus);
+    listener.statusUpdate(attemptID, secondReduceStatus);
+    ConcurrentMap<TaskAttemptId,
+        AtomicReference<TaskAttemptStatus>> attemptIdToStatus =
+        listener.getAttemptIdToStatus();
+    attemptIdToStatus.get(attemptId).set(null);
+
+    listener.statusUpdate(attemptID, thirdReduceStatus);
+
+    verify(ea, times(2)).handle(eventCaptor.capture());
+    TaskAttemptStatusUpdateEvent updateEvent =
+        (TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
+
+    TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get();
+    assertNull(status.fetchFailedMaps);
+    assertEquals(Phase.REDUCE, status.phase);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testStatusUpdateFromUnregisteredTask()
+      throws IOException, InterruptedException{
+    configureMocks();
+    startListener(false);
+
+    listener.statusUpdate(attemptID, firstReduceStatus);
+  }
+
+  private void configureMocks() {
+    firstReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
+        TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SHUFFLE,
+        new Counters());
+    firstReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT1_ID));
+
+    secondReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
+        TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SORT,
+        new Counters());
+    secondReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT2_ID));
+
+    thirdReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
+        TaskStatus.State.RUNNING, "", "RUNNING", "",
+        TaskStatus.Phase.REDUCE, new Counters());
+
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    listener = new MockTaskAttemptListenerImpl(appCtx, secret,
+        rmHeartbeatHandler, hbHandler);
+    id = new JVMId("foo", 1, true, 1);
+    wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
+    attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
+    attemptId = TypeConverter.toYarn(attemptID);
+  }
+
+  private void startListener(boolean registerTask) {
+    Configuration conf = new Configuration();
+
+    listener.init(conf);
+    listener.start();
+
+    if (registerTask) {
+      listener.registerPendingTask(task, wid);
+      listener.registerLaunchedTask(attemptId, wid);
+    }
   }
 }

TestFetchFailure.java

@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicReference;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.TaskCompletionEvent;

@@ -429,7 +430,7 @@ public class TestFetchFailure {
     status.stateString = "OK";
     status.taskState = attempt.getState();
     TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(attempt.getID(),
-        status);
+        new AtomicReference<>(status));
     app.getContext().getEventHandler().handle(event);
   }

TestMRClientService.java

@@ -24,6 +24,7 @@
 import java.security.PrivilegedExceptionAction;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;

 import org.junit.Assert;

@@ -103,7 +104,8 @@ public class TestMRClientService {
     taskAttemptStatus.phase = Phase.MAP;
     // send the status update
     app.getContext().getEventHandler().handle(
-        new TaskAttemptStatusUpdateEvent(attempt.getID(), taskAttemptStatus));
+        new TaskAttemptStatusUpdateEvent(attempt.getID(),
+            new AtomicReference<>(taskAttemptStatus)));

     //verify that all object are fully populated by invoking RPCs.

TestSpeculativeExecutionWithMRApp.java

@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;

 import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;

@@ -84,7 +85,8 @@ public class TestSpeculativeExecutionWithMRApp {
         createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.8,
             TaskAttemptState.RUNNING);
     TaskAttemptStatusUpdateEvent event =
-        new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+        new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
+            new AtomicReference<>(status));
     appEventHandler.handle(event);
   }

@@ -155,7 +157,8 @@
         createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.5,
             TaskAttemptState.RUNNING);
     TaskAttemptStatusUpdateEvent event =
-        new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+        new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
+            new AtomicReference<>(status));
     appEventHandler.handle(event);
   }

@@ -180,7 +183,8 @@
         TaskAttemptState.RUNNING);
     speculatedTask = task.getValue();
     TaskAttemptStatusUpdateEvent event =
-        new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+        new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
+            new AtomicReference<>(status));
     appEventHandler.handle(event);
   }

@@ -195,7 +199,8 @@
         createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75,
             TaskAttemptState.RUNNING);
     TaskAttemptStatusUpdateEvent event =
-        new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+        new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(),
+            new AtomicReference<>(status));
     appEventHandler.handle(event);
   }