svn merge -c 1429040 FIXES: MAPREDUCE-4832. MR AM can get in a split brain situation. Contributed by Jason Lowe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1429046 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-01-04 19:27:56 +00:00
parent b1fe072e04
commit ec4fc07c52
15 changed files with 484 additions and 20 deletions

View File

@ -499,6 +499,8 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4279. getClusterStatus() fails with null pointer exception when
running jobs in local mode (Devaraj K via bobby)
MAPREDUCE-4832. MR AM can get in a split brain situation (jlowe)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.PolicyProvider;
@ -73,6 +74,8 @@ public class TaskAttemptListenerImpl extends CompositeService
private AppContext context;
private Server server;
protected TaskHeartbeatHandler taskHeartbeatHandler;
private RMHeartbeatHandler rmHeartbeatHandler;
private long commitWindowMs;
private InetSocketAddress address;
private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task>
jvmIDToActiveAttemptMap
@ -83,15 +86,19 @@ public class TaskAttemptListenerImpl extends CompositeService
private JobTokenSecretManager jobTokenSecretManager = null;
public TaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager) {
JobTokenSecretManager jobTokenSecretManager,
RMHeartbeatHandler rmHeartbeatHandler) {
super(TaskAttemptListenerImpl.class.getName());
this.context = context;
this.jobTokenSecretManager = jobTokenSecretManager;
this.rmHeartbeatHandler = rmHeartbeatHandler;
}
@Override
public void init(Configuration conf) {
registerHeartbeatHandler(conf);
commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
super.init(conf);
}
@ -172,6 +179,13 @@ public class TaskAttemptListenerImpl extends CompositeService
taskHeartbeatHandler.progressing(attemptID);
// tell task to retry later if AM has not heard from RM within the commit
// window to help avoid double-committing in a split-brain situation
long now = context.getClock().getTime();
if (now - rmHeartbeatHandler.getLastHeartbeatTime() > commitWindowMs) {
return false;
}
Job job = context.getJob(attemptID.getTaskId().getJobId());
Task task = job.getTask(attemptID.getTaskId());
return task.canCommit(attemptID);

View File

@ -87,6 +87,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
@ -264,18 +265,20 @@ public class MRAppMaster extends CompositeService {
addIfService(dispatcher);
}
//service to handle requests from JobClient
clientService = createClientService(context);
addIfService(clientService);
containerAllocator = createContainerAllocator(clientService, context);
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context);
addIfService(taskAttemptListener);
//service to do the task cleanup
//service to handle the output committer
committerEventHandler = createCommitterEventHandler(context, committer);
addIfService(committerEventHandler);
//service to handle requests from JobClient
clientService = createClientService(context);
addIfService(clientService);
//service to log job history events
EventHandler<JobHistoryEvent> historyService =
createJobHistoryHandler(context);
@ -303,7 +306,6 @@ public class MRAppMaster extends CompositeService {
speculatorEventDispatcher);
// service to allocate containers from RM (if non-uber) or to fake it (uber)
containerAllocator = createContainerAllocator(clientService, context);
addIfService(containerAllocator);
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
@ -582,13 +584,15 @@ public class MRAppMaster extends CompositeService {
protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
TaskAttemptListener lis =
new TaskAttemptListenerImpl(context, jobTokenSecretManager);
new TaskAttemptListenerImpl(context, jobTokenSecretManager,
getRMHeartbeatHandler());
return lis;
}
protected EventHandler<CommitterEvent> createCommitterEventHandler(
AppContext context, OutputCommitter committer) {
return new CommitterEventHandler(context, committer);
return new CommitterEventHandler(context, committer,
getRMHeartbeatHandler());
}
protected ContainerAllocator createContainerAllocator(
@ -596,6 +600,10 @@ public class MRAppMaster extends CompositeService {
return new ContainerAllocatorRouter(clientService, context);
}
protected RMHeartbeatHandler getRMHeartbeatHandler() {
return (RMHeartbeatHandler) containerAllocator;
}
protected ContainerLauncher
createContainerLauncher(final AppContext context) {
return new ContainerLauncherRouter(context);
@ -663,7 +671,7 @@ public class MRAppMaster extends CompositeService {
* happened.
*/
private final class ContainerAllocatorRouter extends AbstractService
implements ContainerAllocator {
implements ContainerAllocator, RMHeartbeatHandler {
private final ClientService clientService;
private final AppContext context;
private ContainerAllocator containerAllocator;
@ -708,6 +716,16 @@ public class MRAppMaster extends CompositeService {
public void setShouldUnregister(boolean shouldUnregister) {
((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister);
}
@Override
public long getLastHeartbeatTime() {
return ((RMCommunicator) containerAllocator).getLastHeartbeatTime();
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
((RMCommunicator) containerAllocator).runOnNextHeartbeat(callback);
}
}
/**

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.EventHandler;
@ -54,6 +55,7 @@ public class CommitterEventHandler extends AbstractService
private final AppContext context;
private final OutputCommitter committer;
private final RMHeartbeatHandler rmHeartbeatHandler;
private ThreadPoolExecutor launcherPool;
private Thread eventHandlingThread;
private BlockingQueue<CommitterEvent> eventQueue =
@ -61,11 +63,14 @@ public class CommitterEventHandler extends AbstractService
private final AtomicBoolean stopped;
private Thread jobCommitThread = null;
private int commitThreadCancelTimeoutMs;
private long commitWindowMs;
public CommitterEventHandler(AppContext context, OutputCommitter committer) {
public CommitterEventHandler(AppContext context, OutputCommitter committer,
RMHeartbeatHandler rmHeartbeatHandler) {
super("CommitterEventHandler");
this.context = context;
this.committer = committer;
this.rmHeartbeatHandler = rmHeartbeatHandler;
this.stopped = new AtomicBoolean(false);
}
@ -75,6 +80,8 @@ public class CommitterEventHandler extends AbstractService
commitThreadCancelTimeoutMs = conf.getInt(
MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS);
commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
}
@Override
@ -210,6 +217,7 @@ public class CommitterEventHandler extends AbstractService
protected void handleJobCommit(CommitterJobCommitEvent event) {
try {
jobCommitStarted();
waitForValidCommitWindow();
committer.commitJob(event.getJobContext());
context.getEventHandler().handle(
new JobCommitCompletedEvent(event.getJobID()));
@ -248,5 +256,26 @@ public class CommitterEventHandler extends AbstractService
new TaskAttemptEvent(event.getAttemptID(),
TaskAttemptEventType.TA_CLEANUP_DONE));
}
private synchronized void waitForValidCommitWindow()
throws InterruptedException {
long lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime();
long now = context.getClock().getTime();
while (now - lastHeartbeatTime > commitWindowMs) {
rmHeartbeatHandler.runOnNextHeartbeat(new Runnable() {
@Override
public void run() {
synchronized (EventProcessor.this) {
EventProcessor.this.notify();
}
}
});
wait();
lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime();
now = context.getClock().getTime();
}
}
}
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@ -62,7 +63,8 @@ import org.apache.hadoop.yarn.service.AbstractService;
/**
* Registers/unregisters to RM and sends heartbeats to RM.
*/
public abstract class RMCommunicator extends AbstractService {
public abstract class RMCommunicator extends AbstractService
implements RMHeartbeatHandler {
private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
private int rmPollInterval;//millis
protected ApplicationId applicationId;
@ -77,6 +79,8 @@ public abstract class RMCommunicator extends AbstractService {
private Resource minContainerCapability;
private Resource maxContainerCapability;
protected Map<ApplicationAccessType, String> applicationACLs;
private volatile long lastHeartbeatTime;
private ConcurrentLinkedQueue<Runnable> heartbeatCallbacks;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@ -95,6 +99,7 @@ public abstract class RMCommunicator extends AbstractService {
this.applicationId = context.getApplicationID();
this.applicationAttemptId = context.getApplicationAttemptId();
this.stopped = new AtomicBoolean(false);
this.heartbeatCallbacks = new ConcurrentLinkedQueue<Runnable>();
}
@Override
@ -236,8 +241,12 @@ public abstract class RMCommunicator extends AbstractService {
return;
} catch (Exception e) {
LOG.error("ERROR IN CONTACTING RM. ", e);
continue;
// TODO: for other exceptions
}
lastHeartbeatTime = context.getClock().getTime();
executeHeartbeatCallbacks();
} catch (InterruptedException e) {
if (!stopped.get()) {
LOG.warn("Allocated thread interrupted. Returning.");
@ -295,6 +304,23 @@ public abstract class RMCommunicator extends AbstractService {
protected abstract void heartbeat() throws Exception;
private void executeHeartbeatCallbacks() {
Runnable callback = null;
while ((callback = heartbeatCallbacks.poll()) != null) {
callback.run();
}
}
@Override
public long getLastHeartbeatTime() {
return lastHeartbeatTime;
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
heartbeatCallbacks.add(callback);
}
public void setShouldUnregister(boolean shouldUnregister) {
this.shouldUnregister = shouldUnregister;
LOG.info("RMCommunicator notified that shouldUnregistered is: "

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.rm;
public interface RMHeartbeatHandler {
long getLastHeartbeatTime();
void runOnNextHeartbeat(Runnable callback);
}

View File

@ -24,6 +24,8 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -41,7 +43,9 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.Test;
@ -51,8 +55,9 @@ public class TestTaskAttemptListenerImpl {
public MockTaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
RMHeartbeatHandler rmHeartbeatHandler,
TaskHeartbeatHandler hbHandler) {
super(context, jobTokenSecretManager);
super(context, jobTokenSecretManager, rmHeartbeatHandler);
this.taskHeartbeatHandler = hbHandler;
}
@ -76,9 +81,12 @@ public class TestTaskAttemptListenerImpl {
public void testGetTask() throws IOException {
AppContext appCtx = mock(AppContext.class);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
MockTaskAttemptListenerImpl listener =
new MockTaskAttemptListenerImpl(appCtx, secret, hbHandler);
new MockTaskAttemptListenerImpl(appCtx, secret,
rmHeartbeatHandler, hbHandler);
Configuration conf = new Configuration();
listener.init(conf);
listener.start();
@ -152,9 +160,11 @@ public class TestTaskAttemptListenerImpl {
AppContext appCtx = mock(AppContext.class);
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptListenerImpl listener =
new TaskAttemptListenerImpl(appCtx, secret) {
new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
@ -191,4 +201,46 @@ public class TestTaskAttemptListenerImpl {
return tce;
}
@Test
public void testCommitWindow() throws IOException {
SystemClock clock = new SystemClock();
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
Job mockJob = mock(Job.class);
when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
AppContext appCtx = mock(AppContext.class);
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
when(appCtx.getClock()).thenReturn(clock);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptListenerImpl listener =
new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
}
};
Configuration conf = new Configuration();
listener.init(conf);
listener.start();
// verify commit not allowed when RM heartbeat has not occurred recently
TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
boolean canCommit = listener.canCommit(tid);
assertFalse(canCommit);
verify(mockTask, never()).canCommit(any(TaskAttemptId.class));
// verify commit allowed when RM heartbeat is recent
when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime());
canCommit = listener.canCommit(tid);
assertTrue(canCommit);
verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
listener.stop();
}
}

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
@ -489,7 +490,8 @@ public class MRApp extends MRAppMaster {
return new MRAppContainerAllocator();
}
protected class MRAppContainerAllocator implements ContainerAllocator {
protected class MRAppContainerAllocator
implements ContainerAllocator, RMHeartbeatHandler {
private int containerCount;
@Override
@ -514,6 +516,16 @@ public class MRApp extends MRAppMaster {
new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
container, null));
}
@Override
public long getLastHeartbeatTime() {
return getContext().getClock().getTime();
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
callback.run();
}
}
@Override
@ -566,7 +578,8 @@ public class MRApp extends MRAppMaster {
}
};
return new CommitterEventHandler(context, stubbedCommitter);
return new CommitterEventHandler(context, stubbedCommitter,
getRMHeartbeatHandler());
}
@Override

View File

@ -252,7 +252,7 @@ public class TestFail {
//task time out is reduced
//when attempt times out, heartbeat handler will send the lost event
//leading to Attempt failure
return new TaskAttemptListenerImpl(getContext(), null) {
return new TaskAttemptListenerImpl(getContext(), null, null) {
@Override
public void startRpcServer(){};
@Override

View File

@ -38,6 +38,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
@ -70,7 +71,9 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -1240,6 +1243,13 @@ public class TestRMContainerAllocator {
return context;
}
private static AppContext createAppContext(
ApplicationAttemptId appAttemptId, Job job, Clock clock) {
AppContext context = createAppContext(appAttemptId, job);
when(context.getClock()).thenReturn(clock);
return context;
}
private static ClientService createMockClientService() {
ClientService service = mock(ClientService.class);
when(service.getBindAddress()).thenReturn(
@ -1264,6 +1274,15 @@ public class TestRMContainerAllocator {
super.start();
}
public MyContainerAllocator(MyResourceManager rm, Configuration conf,
ApplicationAttemptId appAttemptId, Job job, Clock clock) {
super(createMockClientService(),
createAppContext(appAttemptId, job, clock));
this.rm = rm;
super.init(conf);
super.start();
}
@Override
protected AMRMProtocol createSchedulerProxy() {
return this.rm.getApplicationMasterService();
@ -1465,6 +1484,66 @@ public class TestRMContainerAllocator {
allocator.recalculatedReduceSchedule);
}
@Test
public void testHeartbeatHandler() throws Exception {
LOG.info("Running testHeartbeatHandler");
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1);
ControlledClock clock = new ControlledClock(new SystemClock());
AppContext appContext = mock(AppContext.class);
when(appContext.getClock()).thenReturn(clock);
when(appContext.getApplicationID()).thenReturn(
BuilderUtils.newApplicationId(1, 1));
RMContainerAllocator allocator = new RMContainerAllocator(
mock(ClientService.class), appContext) {
@Override
protected void register() {
}
@Override
protected AMRMProtocol createSchedulerProxy() {
return mock(AMRMProtocol.class);
}
@Override
protected synchronized void heartbeat() throws Exception {
}
};
allocator.init(conf);
allocator.start();
clock.setTime(5);
int timeToWaitMs = 5000;
while (allocator.getLastHeartbeatTime() != 5 && timeToWaitMs > 0) {
Thread.sleep(10);
timeToWaitMs -= 10;
}
Assert.assertEquals(5, allocator.getLastHeartbeatTime());
clock.setTime(7);
timeToWaitMs = 5000;
while (allocator.getLastHeartbeatTime() != 7 && timeToWaitMs > 0) {
Thread.sleep(10);
timeToWaitMs -= 10;
}
Assert.assertEquals(7, allocator.getLastHeartbeatTime());
final AtomicBoolean callbackCalled = new AtomicBoolean(false);
allocator.runOnNextHeartbeat(new Runnable() {
@Override
public void run() {
callbackCalled.set(true);
}
});
clock.setTime(8);
timeToWaitMs = 5000;
while (allocator.getLastHeartbeatTime() != 8 && timeToWaitMs > 0) {
Thread.sleep(10);
timeToWaitMs -= 10;
}
Assert.assertEquals(8, allocator.getLastHeartbeatTime());
Assert.assertTrue(callbackCalled.get());
}
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -165,6 +166,11 @@ import org.junit.Test;
return allocator;
}
@Override
public RMHeartbeatHandler getRMHeartbeatHandler() {
return getStubbedHeartbeatHandler(getContext());
}
@Override
protected void sysexit() {
}
@ -177,6 +183,7 @@ import org.junit.Test;
@Override
protected void downloadTokensAndSetupUGI(Configuration conf) {
}
}
private final class MRAppTestCleanup extends MRApp {
@ -237,6 +244,11 @@ import org.junit.Test;
}
}
@Override
public RMHeartbeatHandler getRMHeartbeatHandler() {
return getStubbedHeartbeatHandler(getContext());
}
@Override
public void cleanupStagingDir() throws IOException {
cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
@ -247,6 +259,20 @@ import org.junit.Test;
}
}
private static RMHeartbeatHandler getStubbedHeartbeatHandler(
final AppContext appContext) {
return new RMHeartbeatHandler() {
@Override
public long getLastHeartbeatTime() {
return appContext.getClock().getTime();
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
callback.run();
}
};
}
@Test
public void testStagingCleanupOrder() throws Exception {
MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.commit;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.ConcurrentLinkedQueue;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
public class TestCommitterEventHandler {
@Test
public void testCommitWindow() throws Exception {
Configuration conf = new Configuration();
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
TestingJobEventHandler jeh = new TestingJobEventHandler();
dispatcher.register(JobEventType.class, jeh);
SystemClock clock = new SystemClock();
AppContext appContext = mock(AppContext.class);
when(appContext.getEventHandler()).thenReturn(
dispatcher.getEventHandler());
when(appContext.getClock()).thenReturn(clock);
OutputCommitter committer = mock(OutputCommitter.class);
TestingRMHeartbeatHandler rmhh =
new TestingRMHeartbeatHandler();
CommitterEventHandler ceh = new CommitterEventHandler(appContext,
committer, rmhh);
ceh.init(conf);
ceh.start();
// verify trying to commit when RM heartbeats are stale does not commit
ceh.handle(new CommitterJobCommitEvent(null, null));
long timeToWaitMs = 5000;
while (rmhh.getNumCallbacks() != 1 && timeToWaitMs > 0) {
Thread.sleep(10);
timeToWaitMs -= 10;
}
Assert.assertEquals("committer did not register a heartbeat callback",
1, rmhh.getNumCallbacks());
verify(committer, never()).commitJob(any(JobContext.class));
Assert.assertEquals("committer should not have committed",
0, jeh.numCommitCompletedEvents);
// set a fresh heartbeat and verify commit completes
rmhh.setLastHeartbeatTime(clock.getTime());
timeToWaitMs = 5000;
while (jeh.numCommitCompletedEvents != 1 && timeToWaitMs > 0) {
Thread.sleep(10);
timeToWaitMs -= 10;
}
Assert.assertEquals("committer did not complete commit after RM hearbeat",
1, jeh.numCommitCompletedEvents);
verify(committer, times(1)).commitJob(any(JobContext.class));
// try to commit again and verify it goes through since the heartbeat
// is still fresh
ceh.handle(new CommitterJobCommitEvent(null, null));
timeToWaitMs = 5000;
while (jeh.numCommitCompletedEvents != 2 && timeToWaitMs > 0) {
Thread.sleep(10);
timeToWaitMs -= 10;
}
Assert.assertEquals("committer did not commit",
2, jeh.numCommitCompletedEvents);
verify(committer, times(2)).commitJob(any(JobContext.class));
ceh.stop();
dispatcher.stop();
}
private static class TestingRMHeartbeatHandler
implements RMHeartbeatHandler {
private long lastHeartbeatTime = 0;
private ConcurrentLinkedQueue<Runnable> callbacks =
new ConcurrentLinkedQueue<Runnable>();
@Override
public long getLastHeartbeatTime() {
return lastHeartbeatTime;
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
callbacks.add(callback);
}
public void setLastHeartbeatTime(long timestamp) {
lastHeartbeatTime = timestamp;
Runnable callback = null;
while ((callback = callbacks.poll()) != null) {
callback.run();
}
}
public int getNumCallbacks() {
return callbacks.size();
}
}
private static class TestingJobEventHandler
implements EventHandler<JobEvent> {
int numCommitCompletedEvents = 0;
@Override
public void handle(JobEvent event) {
if (event.getType() == JobEventType.JOB_COMMIT_COMPLETED) {
++numCommitCompletedEvents;
}
}
}
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@ -502,13 +503,23 @@ public class TestJobImpl {
private static CommitterEventHandler createCommitterEventHandler(
Dispatcher dispatcher, OutputCommitter committer) {
SystemClock clock = new SystemClock();
final SystemClock clock = new SystemClock();
AppContext appContext = mock(AppContext.class);
when(appContext.getEventHandler()).thenReturn(
dispatcher.getEventHandler());
when(appContext.getClock()).thenReturn(clock);
RMHeartbeatHandler heartbeatHandler = new RMHeartbeatHandler() {
@Override
public long getLastHeartbeatTime() {
return clock.getTime();
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
callback.run();
}
};
CommitterEventHandler handler =
new CommitterEventHandler(appContext, committer);
new CommitterEventHandler(appContext, committer, heartbeatHandler);
dispatcher.register(CommitterEventType.class, handler);
return handler;
}

View File

@ -470,6 +470,16 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS =
60 * 1000;
/**
* Defines a time window in milliseconds for output committer operations.
* If contact with the RM has occurred within this window then commit
* operations are allowed, otherwise the AM will not allow output committer
* operations until contact with the RM has been re-established.
*/
public static final String MR_AM_COMMIT_WINDOW_MS =
MR_AM_PREFIX + "job.committer.commit-window";
public static final int DEFAULT_MR_AM_COMMIT_WINDOW_MS = 10 * 1000;
/**
* Boolean. Create the base dirs in the JobHistoryEventHandler
* Set to false for multi-user clusters. This is an internal config that

View File

@ -1304,6 +1304,15 @@
committer to cancel an operation if the job is killed</description>
</property>
<property>
<name>yarn.app.mapreduce.am.job.committer.commit-window</name>
<value>10000</value>
<description>Defines a time window in milliseconds for output commit
operations. If contact with the RM has occurred within this window then
commits are allowed, otherwise the AM will not allow output commits until
contact with the RM has been re-established.</description>
</property>
<property>
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
<value>1000</value>