MAPREDUCE-4832. MR AM can get in a split brain situation. Contributed by Jason Lowe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1429040 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-01-04 19:15:21 +00:00
parent 251b485af5
commit 78ab699fe9
15 changed files with 484 additions and 20 deletions

View File

@ -659,6 +659,8 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4279. getClusterStatus() fails with null pointer exception when MAPREDUCE-4279. getClusterStatus() fails with null pointer exception when
running jobs in local mode (Devaraj K via bobby) running jobs in local mode (Devaraj K via bobby)
MAPREDUCE-4832. MR AM can get in a split brain situation (jlowe)
Release 0.23.5 - UNRELEASED Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider; import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.PolicyProvider;
@ -73,6 +74,8 @@ public class TaskAttemptListenerImpl extends CompositeService
private AppContext context; private AppContext context;
private Server server; private Server server;
protected TaskHeartbeatHandler taskHeartbeatHandler; protected TaskHeartbeatHandler taskHeartbeatHandler;
private RMHeartbeatHandler rmHeartbeatHandler;
private long commitWindowMs;
private InetSocketAddress address; private InetSocketAddress address;
private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task> private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task>
jvmIDToActiveAttemptMap jvmIDToActiveAttemptMap
@ -83,15 +86,19 @@ public class TaskAttemptListenerImpl extends CompositeService
private JobTokenSecretManager jobTokenSecretManager = null; private JobTokenSecretManager jobTokenSecretManager = null;
public TaskAttemptListenerImpl(AppContext context, public TaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager) { JobTokenSecretManager jobTokenSecretManager,
RMHeartbeatHandler rmHeartbeatHandler) {
super(TaskAttemptListenerImpl.class.getName()); super(TaskAttemptListenerImpl.class.getName());
this.context = context; this.context = context;
this.jobTokenSecretManager = jobTokenSecretManager; this.jobTokenSecretManager = jobTokenSecretManager;
this.rmHeartbeatHandler = rmHeartbeatHandler;
} }
@Override @Override
public void init(Configuration conf) { public void init(Configuration conf) {
registerHeartbeatHandler(conf); registerHeartbeatHandler(conf);
commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
super.init(conf); super.init(conf);
} }
@ -172,6 +179,13 @@ public class TaskAttemptListenerImpl extends CompositeService
taskHeartbeatHandler.progressing(attemptID); taskHeartbeatHandler.progressing(attemptID);
// tell task to retry later if AM has not heard from RM within the commit
// window to help avoid double-committing in a split-brain situation
long now = context.getClock().getTime();
if (now - rmHeartbeatHandler.getLastHeartbeatTime() > commitWindowMs) {
return false;
}
Job job = context.getJob(attemptID.getTaskId().getJobId()); Job job = context.getJob(attemptID.getTaskId().getJobId());
Task task = job.getTask(attemptID.getTaskId()); Task task = job.getTask(attemptID.getTaskId());
return task.canCommit(attemptID); return task.canCommit(attemptID);

View File

@ -87,6 +87,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
@ -264,18 +265,20 @@ public class MRAppMaster extends CompositeService {
addIfService(dispatcher); addIfService(dispatcher);
} }
//service to handle requests from JobClient
clientService = createClientService(context);
addIfService(clientService);
containerAllocator = createContainerAllocator(clientService, context);
//service to handle requests to TaskUmbilicalProtocol //service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context); taskAttemptListener = createTaskAttemptListener(context);
addIfService(taskAttemptListener); addIfService(taskAttemptListener);
//service to do the task cleanup //service to handle the output committer
committerEventHandler = createCommitterEventHandler(context, committer); committerEventHandler = createCommitterEventHandler(context, committer);
addIfService(committerEventHandler); addIfService(committerEventHandler);
//service to handle requests from JobClient
clientService = createClientService(context);
addIfService(clientService);
//service to log job history events //service to log job history events
EventHandler<JobHistoryEvent> historyService = EventHandler<JobHistoryEvent> historyService =
createJobHistoryHandler(context); createJobHistoryHandler(context);
@ -303,7 +306,6 @@ public class MRAppMaster extends CompositeService {
speculatorEventDispatcher); speculatorEventDispatcher);
// service to allocate containers from RM (if non-uber) or to fake it (uber) // service to allocate containers from RM (if non-uber) or to fake it (uber)
containerAllocator = createContainerAllocator(clientService, context);
addIfService(containerAllocator); addIfService(containerAllocator);
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator); dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
@ -582,13 +584,15 @@ public class MRAppMaster extends CompositeService {
protected TaskAttemptListener createTaskAttemptListener(AppContext context) { protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
TaskAttemptListener lis = TaskAttemptListener lis =
new TaskAttemptListenerImpl(context, jobTokenSecretManager); new TaskAttemptListenerImpl(context, jobTokenSecretManager,
getRMHeartbeatHandler());
return lis; return lis;
} }
protected EventHandler<CommitterEvent> createCommitterEventHandler( protected EventHandler<CommitterEvent> createCommitterEventHandler(
AppContext context, OutputCommitter committer) { AppContext context, OutputCommitter committer) {
return new CommitterEventHandler(context, committer); return new CommitterEventHandler(context, committer,
getRMHeartbeatHandler());
} }
protected ContainerAllocator createContainerAllocator( protected ContainerAllocator createContainerAllocator(
@ -596,6 +600,10 @@ public class MRAppMaster extends CompositeService {
return new ContainerAllocatorRouter(clientService, context); return new ContainerAllocatorRouter(clientService, context);
} }
protected RMHeartbeatHandler getRMHeartbeatHandler() {
return (RMHeartbeatHandler) containerAllocator;
}
protected ContainerLauncher protected ContainerLauncher
createContainerLauncher(final AppContext context) { createContainerLauncher(final AppContext context) {
return new ContainerLauncherRouter(context); return new ContainerLauncherRouter(context);
@ -663,7 +671,7 @@ public class MRAppMaster extends CompositeService {
* happened. * happened.
*/ */
private final class ContainerAllocatorRouter extends AbstractService private final class ContainerAllocatorRouter extends AbstractService
implements ContainerAllocator { implements ContainerAllocator, RMHeartbeatHandler {
private final ClientService clientService; private final ClientService clientService;
private final AppContext context; private final AppContext context;
private ContainerAllocator containerAllocator; private ContainerAllocator containerAllocator;
@ -708,6 +716,16 @@ public class MRAppMaster extends CompositeService {
public void setShouldUnregister(boolean shouldUnregister) { public void setShouldUnregister(boolean shouldUnregister) {
((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister); ((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister);
} }
@Override
public long getLastHeartbeatTime() {
return ((RMCommunicator) containerAllocator).getLastHeartbeatTime();
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
((RMCommunicator) containerAllocator).runOnNextHeartbeat(callback);
}
} }
/** /**

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -54,6 +55,7 @@ public class CommitterEventHandler extends AbstractService
private final AppContext context; private final AppContext context;
private final OutputCommitter committer; private final OutputCommitter committer;
private final RMHeartbeatHandler rmHeartbeatHandler;
private ThreadPoolExecutor launcherPool; private ThreadPoolExecutor launcherPool;
private Thread eventHandlingThread; private Thread eventHandlingThread;
private BlockingQueue<CommitterEvent> eventQueue = private BlockingQueue<CommitterEvent> eventQueue =
@ -61,11 +63,14 @@ public class CommitterEventHandler extends AbstractService
private final AtomicBoolean stopped; private final AtomicBoolean stopped;
private Thread jobCommitThread = null; private Thread jobCommitThread = null;
private int commitThreadCancelTimeoutMs; private int commitThreadCancelTimeoutMs;
private long commitWindowMs;
public CommitterEventHandler(AppContext context, OutputCommitter committer) { public CommitterEventHandler(AppContext context, OutputCommitter committer,
RMHeartbeatHandler rmHeartbeatHandler) {
super("CommitterEventHandler"); super("CommitterEventHandler");
this.context = context; this.context = context;
this.committer = committer; this.committer = committer;
this.rmHeartbeatHandler = rmHeartbeatHandler;
this.stopped = new AtomicBoolean(false); this.stopped = new AtomicBoolean(false);
} }
@ -75,6 +80,8 @@ public class CommitterEventHandler extends AbstractService
commitThreadCancelTimeoutMs = conf.getInt( commitThreadCancelTimeoutMs = conf.getInt(
MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS); MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS);
commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
} }
@Override @Override
@ -210,6 +217,7 @@ public class CommitterEventHandler extends AbstractService
protected void handleJobCommit(CommitterJobCommitEvent event) { protected void handleJobCommit(CommitterJobCommitEvent event) {
try { try {
jobCommitStarted(); jobCommitStarted();
waitForValidCommitWindow();
committer.commitJob(event.getJobContext()); committer.commitJob(event.getJobContext());
context.getEventHandler().handle( context.getEventHandler().handle(
new JobCommitCompletedEvent(event.getJobID())); new JobCommitCompletedEvent(event.getJobID()));
@ -248,5 +256,26 @@ public class CommitterEventHandler extends AbstractService
new TaskAttemptEvent(event.getAttemptID(), new TaskAttemptEvent(event.getAttemptID(),
TaskAttemptEventType.TA_CLEANUP_DONE)); TaskAttemptEventType.TA_CLEANUP_DONE));
} }
private synchronized void waitForValidCommitWindow()
throws InterruptedException {
long lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime();
long now = context.getClock().getTime();
while (now - lastHeartbeatTime > commitWindowMs) {
rmHeartbeatHandler.runOnNextHeartbeat(new Runnable() {
@Override
public void run() {
synchronized (EventProcessor.this) {
EventProcessor.this.notify();
}
}
});
wait();
lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime();
now = context.getClock().getTime();
}
}
} }
} }

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -62,7 +63,8 @@ import org.apache.hadoop.yarn.service.AbstractService;
/** /**
* Registers/unregisters to RM and sends heartbeats to RM. * Registers/unregisters to RM and sends heartbeats to RM.
*/ */
public abstract class RMCommunicator extends AbstractService { public abstract class RMCommunicator extends AbstractService
implements RMHeartbeatHandler {
private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
private int rmPollInterval;//millis private int rmPollInterval;//millis
protected ApplicationId applicationId; protected ApplicationId applicationId;
@ -77,6 +79,8 @@ public abstract class RMCommunicator extends AbstractService {
private Resource minContainerCapability; private Resource minContainerCapability;
private Resource maxContainerCapability; private Resource maxContainerCapability;
protected Map<ApplicationAccessType, String> applicationACLs; protected Map<ApplicationAccessType, String> applicationACLs;
private volatile long lastHeartbeatTime;
private ConcurrentLinkedQueue<Runnable> heartbeatCallbacks;
private final RecordFactory recordFactory = private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
@ -95,6 +99,7 @@ public abstract class RMCommunicator extends AbstractService {
this.applicationId = context.getApplicationID(); this.applicationId = context.getApplicationID();
this.applicationAttemptId = context.getApplicationAttemptId(); this.applicationAttemptId = context.getApplicationAttemptId();
this.stopped = new AtomicBoolean(false); this.stopped = new AtomicBoolean(false);
this.heartbeatCallbacks = new ConcurrentLinkedQueue<Runnable>();
} }
@Override @Override
@ -236,8 +241,12 @@ public abstract class RMCommunicator extends AbstractService {
return; return;
} catch (Exception e) { } catch (Exception e) {
LOG.error("ERROR IN CONTACTING RM. ", e); LOG.error("ERROR IN CONTACTING RM. ", e);
continue;
// TODO: for other exceptions // TODO: for other exceptions
} }
lastHeartbeatTime = context.getClock().getTime();
executeHeartbeatCallbacks();
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (!stopped.get()) { if (!stopped.get()) {
LOG.warn("Allocated thread interrupted. Returning."); LOG.warn("Allocated thread interrupted. Returning.");
@ -295,6 +304,23 @@ public abstract class RMCommunicator extends AbstractService {
protected abstract void heartbeat() throws Exception; protected abstract void heartbeat() throws Exception;
private void executeHeartbeatCallbacks() {
Runnable callback = null;
while ((callback = heartbeatCallbacks.poll()) != null) {
callback.run();
}
}
@Override
public long getLastHeartbeatTime() {
return lastHeartbeatTime;
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
heartbeatCallbacks.add(callback);
}
public void setShouldUnregister(boolean shouldUnregister) { public void setShouldUnregister(boolean shouldUnregister) {
this.shouldUnregister = shouldUnregister; this.shouldUnregister = shouldUnregister;
LOG.info("RMCommunicator notified that shouldUnregistered is: " LOG.info("RMCommunicator notified that shouldUnregistered is: "

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.rm;
public interface RMHeartbeatHandler {
long getLastHeartbeatTime();
void runOnNextHeartbeat(Runnable callback);
}

View File

@ -24,6 +24,8 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -41,7 +43,9 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.Test; import org.junit.Test;
@ -51,8 +55,9 @@ public class TestTaskAttemptListenerImpl {
public MockTaskAttemptListenerImpl(AppContext context, public MockTaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager, JobTokenSecretManager jobTokenSecretManager,
RMHeartbeatHandler rmHeartbeatHandler,
TaskHeartbeatHandler hbHandler) { TaskHeartbeatHandler hbHandler) {
super(context, jobTokenSecretManager); super(context, jobTokenSecretManager, rmHeartbeatHandler);
this.taskHeartbeatHandler = hbHandler; this.taskHeartbeatHandler = hbHandler;
} }
@ -76,9 +81,12 @@ public class TestTaskAttemptListenerImpl {
public void testGetTask() throws IOException { public void testGetTask() throws IOException {
AppContext appCtx = mock(AppContext.class); AppContext appCtx = mock(AppContext.class);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class); JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
MockTaskAttemptListenerImpl listener = MockTaskAttemptListenerImpl listener =
new MockTaskAttemptListenerImpl(appCtx, secret, hbHandler); new MockTaskAttemptListenerImpl(appCtx, secret,
rmHeartbeatHandler, hbHandler);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
listener.init(conf); listener.init(conf);
listener.start(); listener.start();
@ -152,9 +160,11 @@ public class TestTaskAttemptListenerImpl {
AppContext appCtx = mock(AppContext.class); AppContext appCtx = mock(AppContext.class);
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class); JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptListenerImpl listener = TaskAttemptListenerImpl listener =
new TaskAttemptListenerImpl(appCtx, secret) { new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
@Override @Override
protected void registerHeartbeatHandler(Configuration conf) { protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler; taskHeartbeatHandler = hbHandler;
@ -191,4 +201,46 @@ public class TestTaskAttemptListenerImpl {
return tce; return tce;
} }
@Test
public void testCommitWindow() throws IOException {
SystemClock clock = new SystemClock();
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
Job mockJob = mock(Job.class);
when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
AppContext appCtx = mock(AppContext.class);
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
when(appCtx.getClock()).thenReturn(clock);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptListenerImpl listener =
new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
}
};
Configuration conf = new Configuration();
listener.init(conf);
listener.start();
// verify commit not allowed when RM heartbeat has not occurred recently
TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
boolean canCommit = listener.canCommit(tid);
assertFalse(canCommit);
verify(mockTask, never()).canCommit(any(TaskAttemptId.class));
// verify commit allowed when RM heartbeat is recent
when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime());
canCommit = listener.canCommit(tid);
assertTrue(canCommit);
verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
listener.stop();
}
} }

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -489,7 +490,8 @@ public class MRApp extends MRAppMaster {
return new MRAppContainerAllocator(); return new MRAppContainerAllocator();
} }
protected class MRAppContainerAllocator implements ContainerAllocator { protected class MRAppContainerAllocator
implements ContainerAllocator, RMHeartbeatHandler {
private int containerCount; private int containerCount;
@Override @Override
@ -514,6 +516,16 @@ public class MRApp extends MRAppMaster {
new TaskAttemptContainerAssignedEvent(event.getAttemptID(), new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
container, null)); container, null));
} }
@Override
public long getLastHeartbeatTime() {
return getContext().getClock().getTime();
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
callback.run();
}
} }
@Override @Override
@ -566,7 +578,8 @@ public class MRApp extends MRAppMaster {
} }
}; };
return new CommitterEventHandler(context, stubbedCommitter); return new CommitterEventHandler(context, stubbedCommitter,
getRMHeartbeatHandler());
} }
@Override @Override

View File

@ -252,7 +252,7 @@ public class TestFail {
//task time out is reduced //task time out is reduced
//when attempt times out, heartbeat handler will send the lost event //when attempt times out, heartbeat handler will send the lost event
//leading to Attempt failure //leading to Attempt failure
return new TaskAttemptListenerImpl(getContext(), null) { return new TaskAttemptListenerImpl(getContext(), null, null) {
@Override @Override
public void startRpcServer(){}; public void startRpcServer(){};
@Override @Override

View File

@ -38,6 +38,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert; import junit.framework.Assert;
@ -70,7 +71,9 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo; import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -1240,6 +1243,13 @@ public class TestRMContainerAllocator {
return context; return context;
} }
private static AppContext createAppContext(
ApplicationAttemptId appAttemptId, Job job, Clock clock) {
AppContext context = createAppContext(appAttemptId, job);
when(context.getClock()).thenReturn(clock);
return context;
}
private static ClientService createMockClientService() { private static ClientService createMockClientService() {
ClientService service = mock(ClientService.class); ClientService service = mock(ClientService.class);
when(service.getBindAddress()).thenReturn( when(service.getBindAddress()).thenReturn(
@ -1264,6 +1274,15 @@ public class TestRMContainerAllocator {
super.start(); super.start();
} }
public MyContainerAllocator(MyResourceManager rm, Configuration conf,
ApplicationAttemptId appAttemptId, Job job, Clock clock) {
super(createMockClientService(),
createAppContext(appAttemptId, job, clock));
this.rm = rm;
super.init(conf);
super.start();
}
@Override @Override
protected AMRMProtocol createSchedulerProxy() { protected AMRMProtocol createSchedulerProxy() {
return this.rm.getApplicationMasterService(); return this.rm.getApplicationMasterService();
@ -1465,6 +1484,66 @@ public class TestRMContainerAllocator {
allocator.recalculatedReduceSchedule); allocator.recalculatedReduceSchedule);
} }
@Test
public void testHeartbeatHandler() throws Exception {
LOG.info("Running testHeartbeatHandler");
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1);
ControlledClock clock = new ControlledClock(new SystemClock());
AppContext appContext = mock(AppContext.class);
when(appContext.getClock()).thenReturn(clock);
when(appContext.getApplicationID()).thenReturn(
BuilderUtils.newApplicationId(1, 1));
RMContainerAllocator allocator = new RMContainerAllocator(
mock(ClientService.class), appContext) {
@Override
protected void register() {
}
@Override
protected AMRMProtocol createSchedulerProxy() {
return mock(AMRMProtocol.class);
}
@Override
protected synchronized void heartbeat() throws Exception {
}
};
allocator.init(conf);
allocator.start();
clock.setTime(5);
int timeToWaitMs = 5000;
while (allocator.getLastHeartbeatTime() != 5 && timeToWaitMs > 0) {
Thread.sleep(10);
timeToWaitMs -= 10;
}
Assert.assertEquals(5, allocator.getLastHeartbeatTime());
clock.setTime(7);
timeToWaitMs = 5000;
while (allocator.getLastHeartbeatTime() != 7 && timeToWaitMs > 0) {
Thread.sleep(10);
timeToWaitMs -= 10;
}
Assert.assertEquals(7, allocator.getLastHeartbeatTime());
final AtomicBoolean callbackCalled = new AtomicBoolean(false);
allocator.runOnNextHeartbeat(new Runnable() {
@Override
public void run() {
callbackCalled.set(true);
}
});
clock.setTime(8);
timeToWaitMs = 5000;
while (allocator.getLastHeartbeatTime() != 8 && timeToWaitMs > 0) {
Thread.sleep(10);
timeToWaitMs -= 10;
}
Assert.assertEquals(8, allocator.getLastHeartbeatTime());
Assert.assertTrue(callbackCalled.get());
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator(); TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple(); t.testSimple();

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -165,6 +166,11 @@ import org.junit.Test;
return allocator; return allocator;
} }
@Override
public RMHeartbeatHandler getRMHeartbeatHandler() {
return getStubbedHeartbeatHandler(getContext());
}
@Override @Override
protected void sysexit() { protected void sysexit() {
} }
@ -177,6 +183,7 @@ import org.junit.Test;
@Override @Override
protected void downloadTokensAndSetupUGI(Configuration conf) { protected void downloadTokensAndSetupUGI(Configuration conf) {
} }
} }
private final class MRAppTestCleanup extends MRApp { private final class MRAppTestCleanup extends MRApp {
@ -237,6 +244,11 @@ import org.junit.Test;
} }
} }
@Override
public RMHeartbeatHandler getRMHeartbeatHandler() {
return getStubbedHeartbeatHandler(getContext());
}
@Override @Override
public void cleanupStagingDir() throws IOException { public void cleanupStagingDir() throws IOException {
cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator; cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
@ -247,6 +259,20 @@ import org.junit.Test;
} }
} }
private static RMHeartbeatHandler getStubbedHeartbeatHandler(
final AppContext appContext) {
return new RMHeartbeatHandler() {
@Override
public long getLastHeartbeatTime() {
return appContext.getClock().getTime();
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
callback.run();
}
};
}
@Test @Test
public void testStagingCleanupOrder() throws Exception { public void testStagingCleanupOrder() throws Exception {
MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true, MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.commit;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.ConcurrentLinkedQueue;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
public class TestCommitterEventHandler {
@Test
public void testCommitWindow() throws Exception {
Configuration conf = new Configuration();
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
TestingJobEventHandler jeh = new TestingJobEventHandler();
dispatcher.register(JobEventType.class, jeh);
SystemClock clock = new SystemClock();
AppContext appContext = mock(AppContext.class);
when(appContext.getEventHandler()).thenReturn(
dispatcher.getEventHandler());
when(appContext.getClock()).thenReturn(clock);
OutputCommitter committer = mock(OutputCommitter.class);
TestingRMHeartbeatHandler rmhh =
new TestingRMHeartbeatHandler();
CommitterEventHandler ceh = new CommitterEventHandler(appContext,
committer, rmhh);
ceh.init(conf);
ceh.start();
// verify trying to commit when RM heartbeats are stale does not commit
ceh.handle(new CommitterJobCommitEvent(null, null));
long timeToWaitMs = 5000;
while (rmhh.getNumCallbacks() != 1 && timeToWaitMs > 0) {
Thread.sleep(10);
timeToWaitMs -= 10;
}
Assert.assertEquals("committer did not register a heartbeat callback",
1, rmhh.getNumCallbacks());
verify(committer, never()).commitJob(any(JobContext.class));
Assert.assertEquals("committer should not have committed",
0, jeh.numCommitCompletedEvents);
// set a fresh heartbeat and verify commit completes
rmhh.setLastHeartbeatTime(clock.getTime());
timeToWaitMs = 5000;
while (jeh.numCommitCompletedEvents != 1 && timeToWaitMs > 0) {
Thread.sleep(10);
timeToWaitMs -= 10;
}
Assert.assertEquals("committer did not complete commit after RM hearbeat",
1, jeh.numCommitCompletedEvents);
verify(committer, times(1)).commitJob(any(JobContext.class));
// try to commit again and verify it goes through since the heartbeat
// is still fresh
ceh.handle(new CommitterJobCommitEvent(null, null));
timeToWaitMs = 5000;
while (jeh.numCommitCompletedEvents != 2 && timeToWaitMs > 0) {
Thread.sleep(10);
timeToWaitMs -= 10;
}
Assert.assertEquals("committer did not commit",
2, jeh.numCommitCompletedEvents);
verify(committer, times(2)).commitJob(any(JobContext.class));
ceh.stop();
dispatcher.stop();
}
private static class TestingRMHeartbeatHandler
implements RMHeartbeatHandler {
private long lastHeartbeatTime = 0;
private ConcurrentLinkedQueue<Runnable> callbacks =
new ConcurrentLinkedQueue<Runnable>();
@Override
public long getLastHeartbeatTime() {
return lastHeartbeatTime;
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
callbacks.add(callback);
}
public void setLastHeartbeatTime(long timestamp) {
lastHeartbeatTime = timestamp;
Runnable callback = null;
while ((callback = callbacks.poll()) != null) {
callback.run();
}
}
public int getNumCallbacks() {
return callbacks.size();
}
}
private static class TestingJobEventHandler
implements EventHandler<JobEvent> {
int numCommitCompletedEvents = 0;
@Override
public void handle(JobEvent event) {
if (event.getType() == JobEventType.JOB_COMMIT_COMPLETED) {
++numCommitCompletedEvents;
}
}
}
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -502,13 +503,23 @@ public class TestJobImpl {
private static CommitterEventHandler createCommitterEventHandler( private static CommitterEventHandler createCommitterEventHandler(
Dispatcher dispatcher, OutputCommitter committer) { Dispatcher dispatcher, OutputCommitter committer) {
SystemClock clock = new SystemClock(); final SystemClock clock = new SystemClock();
AppContext appContext = mock(AppContext.class); AppContext appContext = mock(AppContext.class);
when(appContext.getEventHandler()).thenReturn( when(appContext.getEventHandler()).thenReturn(
dispatcher.getEventHandler()); dispatcher.getEventHandler());
when(appContext.getClock()).thenReturn(clock); when(appContext.getClock()).thenReturn(clock);
RMHeartbeatHandler heartbeatHandler = new RMHeartbeatHandler() {
@Override
public long getLastHeartbeatTime() {
return clock.getTime();
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
callback.run();
}
};
CommitterEventHandler handler = CommitterEventHandler handler =
new CommitterEventHandler(appContext, committer); new CommitterEventHandler(appContext, committer, heartbeatHandler);
dispatcher.register(CommitterEventType.class, handler); dispatcher.register(CommitterEventType.class, handler);
return handler; return handler;
} }

View File

@ -473,6 +473,16 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS = public static final int DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS =
60 * 1000; 60 * 1000;
/**
* Defines a time window in milliseconds for output committer operations.
* If contact with the RM has occurred within this window then commit
* operations are allowed, otherwise the AM will not allow output committer
* operations until contact with the RM has been re-established.
*/
public static final String MR_AM_COMMIT_WINDOW_MS =
MR_AM_PREFIX + "job.committer.commit-window";
public static final int DEFAULT_MR_AM_COMMIT_WINDOW_MS = 10 * 1000;
/** /**
* Boolean. Create the base dirs in the JobHistoryEventHandler * Boolean. Create the base dirs in the JobHistoryEventHandler
* Set to false for multi-user clusters. This is an internal config that * Set to false for multi-user clusters. This is an internal config that

View File

@ -880,6 +880,15 @@
committer to cancel an operation if the job is killed</description> committer to cancel an operation if the job is killed</description>
</property> </property>
<property>
<name>yarn.app.mapreduce.am.job.committer.commit-window</name>
<value>10000</value>
<description>Defines a time window in milliseconds for output commit
operations. If contact with the RM has occurred within this window then
commits are allowed, otherwise the AM will not allow output commits until
contact with the RM has been re-established.</description>
</property>
<property> <property>
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name> <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
<value>1000</value> <value>1000</value>