diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index f725f0c2427..a4c59dd9a31 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -659,6 +659,8 @@ Release 0.23.6 - UNRELEASED MAPREDUCE-4279. getClusterStatus() fails with null pointer exception when running jobs in local mode (Devaraj K via bobby) + MAPREDUCE-4832. MR AM can get in a split brain situation (jlowe) + Release 0.23.5 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index e9f890a7f00..f32b5d59b7c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authorize.PolicyProvider; @@ -73,6 +74,8 @@ public class TaskAttemptListenerImpl extends CompositeService private AppContext context; private Server server; protected TaskHeartbeatHandler taskHeartbeatHandler; + private RMHeartbeatHandler rmHeartbeatHandler; + private long commitWindowMs; private InetSocketAddress address; private ConcurrentMap jvmIDToActiveAttemptMap @@ -83,15 +86,19 @@ public class TaskAttemptListenerImpl extends CompositeService private JobTokenSecretManager jobTokenSecretManager = null; public TaskAttemptListenerImpl(AppContext context, - JobTokenSecretManager jobTokenSecretManager) { + JobTokenSecretManager jobTokenSecretManager, + RMHeartbeatHandler rmHeartbeatHandler) { super(TaskAttemptListenerImpl.class.getName()); this.context = context; this.jobTokenSecretManager = jobTokenSecretManager; + this.rmHeartbeatHandler = rmHeartbeatHandler; } @Override public void init(Configuration conf) { registerHeartbeatHandler(conf); + commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS, + MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS); super.init(conf); } @@ -172,6 +179,13 @@ public boolean canCommit(TaskAttemptID taskAttemptID) throws IOException { taskHeartbeatHandler.progressing(attemptID); + // tell task to retry later if AM has not heard from RM within the commit + // window to help avoid double-committing in a split-brain situation + long now = context.getClock().getTime(); + if (now - rmHeartbeatHandler.getLastHeartbeatTime() > commitWindowMs) { + return false; + } + Job job = context.getJob(attemptID.getTaskId().getJobId()); Task task = job.getTask(attemptID.getTaskId()); return task.canCommit(attemptID); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 64d3fded251..db0941e7330 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -87,6 +87,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; @@ -264,18 +265,20 @@ public void init(final Configuration conf) { addIfService(dispatcher); } + //service to handle requests from JobClient + clientService = createClientService(context); + addIfService(clientService); + + containerAllocator = createContainerAllocator(clientService, context); + //service to handle requests to TaskUmbilicalProtocol taskAttemptListener = createTaskAttemptListener(context); addIfService(taskAttemptListener); - //service to do the task cleanup + //service to handle the output committer committerEventHandler = createCommitterEventHandler(context, committer); addIfService(committerEventHandler); - //service to handle requests from JobClient - clientService = createClientService(context); - addIfService(clientService); - //service to log job history events EventHandler historyService = createJobHistoryHandler(context); @@ -303,7 +306,6 @@ public void init(final Configuration conf) { speculatorEventDispatcher); // service to allocate containers from RM (if non-uber) or to fake it (uber) - containerAllocator = createContainerAllocator(clientService, context); addIfService(containerAllocator); dispatcher.register(ContainerAllocator.EventType.class, containerAllocator); @@ -582,13 +584,15 @@ protected Speculator createSpeculator(Configuration conf, AppContext context) { protected TaskAttemptListener createTaskAttemptListener(AppContext context) { TaskAttemptListener lis = - new TaskAttemptListenerImpl(context, jobTokenSecretManager); + new TaskAttemptListenerImpl(context, jobTokenSecretManager, + getRMHeartbeatHandler()); return lis; } protected EventHandler createCommitterEventHandler( AppContext context, OutputCommitter committer) { - return new CommitterEventHandler(context, committer); + return new CommitterEventHandler(context, committer, + getRMHeartbeatHandler()); } protected ContainerAllocator createContainerAllocator( @@ -596,6 +600,10 @@ protected ContainerAllocator createContainerAllocator( return new ContainerAllocatorRouter(clientService, context); } + protected RMHeartbeatHandler getRMHeartbeatHandler() { + return (RMHeartbeatHandler) containerAllocator; + } + protected ContainerLauncher createContainerLauncher(final AppContext context) { return new ContainerLauncherRouter(context); @@ -663,7 +671,7 @@ public TaskAttemptListener getTaskAttemptListener() { * happened. 
*/ private final class ContainerAllocatorRouter extends AbstractService - implements ContainerAllocator { + implements ContainerAllocator, RMHeartbeatHandler { private final ClientService clientService; private final AppContext context; private ContainerAllocator containerAllocator; @@ -708,6 +716,16 @@ public void setSignalled(boolean isSignalled) { public void setShouldUnregister(boolean shouldUnregister) { ((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister); } + + @Override + public long getLastHeartbeatTime() { + return ((RMCommunicator) containerAllocator).getLastHeartbeatTime(); + } + + @Override + public void runOnNextHeartbeat(Runnable callback) { + ((RMCommunicator) containerAllocator).runOnNextHeartbeat(callback); + } } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java index 462a434ad76..0b0c5bd78c9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java @@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.event.EventHandler; @@ -54,6 +55,7 @@ public class CommitterEventHandler extends AbstractService private final AppContext context; private final OutputCommitter committer; + private final RMHeartbeatHandler rmHeartbeatHandler; private ThreadPoolExecutor launcherPool; private Thread eventHandlingThread; private BlockingQueue eventQueue = @@ -61,11 +63,14 @@ public class CommitterEventHandler extends AbstractService private final AtomicBoolean stopped; private Thread jobCommitThread = null; private int commitThreadCancelTimeoutMs; + private long commitWindowMs; - public CommitterEventHandler(AppContext context, OutputCommitter committer) { + public CommitterEventHandler(AppContext context, OutputCommitter committer, + RMHeartbeatHandler rmHeartbeatHandler) { super("CommitterEventHandler"); this.context = context; this.committer = committer; + this.rmHeartbeatHandler = rmHeartbeatHandler; this.stopped = new AtomicBoolean(false); } @@ -75,6 +80,8 @@ public void init(Configuration conf) { commitThreadCancelTimeoutMs = conf.getInt( MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS); + commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS, + MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS); } @Override @@ -210,6 +217,7 @@ protected void handleJobSetup(CommitterJobSetupEvent event) { protected void handleJobCommit(CommitterJobCommitEvent event) { try { jobCommitStarted(); + waitForValidCommitWindow(); committer.commitJob(event.getJobContext()); context.getEventHandler().handle( new JobCommitCompletedEvent(event.getJobID())); @@ -248,5 +256,26 @@ protected void 
handleTaskAbort(CommitterTaskAbortEvent event) { new TaskAttemptEvent(event.getAttemptID(), TaskAttemptEventType.TA_CLEANUP_DONE)); } + + private synchronized void waitForValidCommitWindow() + throws InterruptedException { + long lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime(); + long now = context.getClock().getTime(); + + while (now - lastHeartbeatTime > commitWindowMs) { + rmHeartbeatHandler.runOnNextHeartbeat(new Runnable() { + @Override + public void run() { + synchronized (EventProcessor.this) { + EventProcessor.this.notify(); + } + } + }); + + wait(); + lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime(); + now = context.getClock().getTime(); + } + } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 9f594c09e86..1c6d6b8e00e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -62,7 +63,8 @@ /** * Registers/unregisters to RM and sends heartbeats to RM. */ -public abstract class RMCommunicator extends AbstractService { +public abstract class RMCommunicator extends AbstractService + implements RMHeartbeatHandler { private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); private int rmPollInterval;//millis protected ApplicationId applicationId; @@ -77,6 +79,8 @@ public abstract class RMCommunicator extends AbstractService { private Resource minContainerCapability; private Resource maxContainerCapability; protected Map applicationACLs; + private volatile long lastHeartbeatTime; + private ConcurrentLinkedQueue heartbeatCallbacks; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -95,6 +99,7 @@ public RMCommunicator(ClientService clientService, AppContext context) { this.applicationId = context.getApplicationID(); this.applicationAttemptId = context.getApplicationAttemptId(); this.stopped = new AtomicBoolean(false); + this.heartbeatCallbacks = new ConcurrentLinkedQueue(); } @Override @@ -236,8 +241,12 @@ public void run() { return; } catch (Exception e) { LOG.error("ERROR IN CONTACTING RM. ", e); + continue; // TODO: for other exceptions } + + lastHeartbeatTime = context.getClock().getTime(); + executeHeartbeatCallbacks(); } catch (InterruptedException e) { if (!stopped.get()) { LOG.warn("Allocated thread interrupted. 
Returning."); @@ -295,6 +304,23 @@ public AMRMProtocol run() { protected abstract void heartbeat() throws Exception; + private void executeHeartbeatCallbacks() { + Runnable callback = null; + while ((callback = heartbeatCallbacks.poll()) != null) { + callback.run(); + } + } + + @Override + public long getLastHeartbeatTime() { + return lastHeartbeatTime; + } + + @Override + public void runOnNextHeartbeat(Runnable callback) { + heartbeatCallbacks.add(callback); + } + public void setShouldUnregister(boolean shouldUnregister) { this.shouldUnregister = shouldUnregister; LOG.info("RMCommunicator notified that shouldUnregistered is: " diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java new file mode 100644 index 00000000000..ed0d4a6760e --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMHeartbeatHandler.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.v2.app.rm; + +public interface RMHeartbeatHandler { + long getLastHeartbeatTime(); + + void runOnNextHeartbeat(Runnable callback); +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index dc623e5b153..dfeed7f3f49 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -41,7 +43,9 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.junit.Test; @@ -51,8 +55,9 @@ public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl public MockTaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager, + RMHeartbeatHandler rmHeartbeatHandler, TaskHeartbeatHandler hbHandler) { - super(context, jobTokenSecretManager); + super(context, jobTokenSecretManager, rmHeartbeatHandler); this.taskHeartbeatHandler = hbHandler; } @@ -76,9 +81,12 @@ protected void stopRpcServer() { public void testGetTask() throws IOException { AppContext appCtx = mock(AppContext.class); JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + RMHeartbeatHandler rmHeartbeatHandler = + mock(RMHeartbeatHandler.class); TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); MockTaskAttemptListenerImpl listener = - new MockTaskAttemptListenerImpl(appCtx, secret, hbHandler); + new MockTaskAttemptListenerImpl(appCtx, secret, + rmHeartbeatHandler, hbHandler); Configuration conf = new Configuration(); listener.init(conf); listener.start(); @@ -152,9 +160,11 @@ public void testGetMapCompletionEvents() throws IOException { AppContext appCtx = mock(AppContext.class); when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + RMHeartbeatHandler rmHeartbeatHandler = + mock(RMHeartbeatHandler.class); final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); TaskAttemptListenerImpl listener = - new TaskAttemptListenerImpl(appCtx, secret) { + new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) { @Override protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = hbHandler; @@ -191,4 +201,46 @@ private static TaskAttemptCompletionEvent createTce(int eventId, return tce; } + @Test + public void testCommitWindow() throws IOException { + SystemClock clock = new SystemClock(); + + 
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask = + mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class); + when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true); + Job mockJob = mock(Job.class); + when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask); + AppContext appCtx = mock(AppContext.class); + when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); + when(appCtx.getClock()).thenReturn(clock); + JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + RMHeartbeatHandler rmHeartbeatHandler = + mock(RMHeartbeatHandler.class); + final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); + TaskAttemptListenerImpl listener = + new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) { + @Override + protected void registerHeartbeatHandler(Configuration conf) { + taskHeartbeatHandler = hbHandler; + } + }; + + Configuration conf = new Configuration(); + listener.init(conf); + listener.start(); + + // verify commit not allowed when RM heartbeat has not occurred recently + TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0); + boolean canCommit = listener.canCommit(tid); + assertFalse(canCommit); + verify(mockTask, never()).canCommit(any(TaskAttemptId.class)); + + // verify commit allowed when RM heartbeat is recent + when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime()); + canCommit = listener.canCommit(tid); + assertTrue(canCommit); + verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class)); + + listener.stop(); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index b3159a60ad3..3d73ef24628 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -74,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; @@ -489,7 +490,8 @@ protected ContainerAllocator createContainerAllocator( return new MRAppContainerAllocator(); } - protected class MRAppContainerAllocator implements ContainerAllocator { + protected class MRAppContainerAllocator + implements ContainerAllocator, RMHeartbeatHandler { private int containerCount; @Override @@ -514,6 +516,16 @@ public void handle(ContainerAllocatorEvent event) { new TaskAttemptContainerAssignedEvent(event.getAttemptID(), container, null)); } + + @Override + public long getLastHeartbeatTime() { + return getContext().getClock().getTime(); + } + + @Override + public void runOnNextHeartbeat(Runnable callback) { + callback.run(); + } } @Override @@ -566,7 +578,8 @@ public void recoverTask(TaskAttemptContext taskContext) } }; - return new CommitterEventHandler(context, stubbedCommitter); + return new CommitterEventHandler(context, stubbedCommitter, + getRMHeartbeatHandler()); } @Override diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java index 3af570823ae..8248a27df44 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java @@ -252,7 +252,7 @@ protected TaskAttemptListener createTaskAttemptListener(AppContext context) { //task time out is reduced //when attempt times out, heartbeat handler will send the lost event //leading to Attempt failure - return new TaskAttemptListenerImpl(getContext(), null) { + return new TaskAttemptListenerImpl(getContext(), null, null) { @Override public void startRpcServer(){}; @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index cd8e1c5de35..0e50146465f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import junit.framework.Assert; @@ -70,7 +71,9 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.ClusterInfo; +import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -1240,6 +1243,13 @@ public void handle(Event event) { return context; } + private static AppContext createAppContext( + ApplicationAttemptId appAttemptId, Job job, Clock clock) { + AppContext context = createAppContext(appAttemptId, job); + when(context.getClock()).thenReturn(clock); + return context; + } + private static ClientService createMockClientService() { ClientService service = mock(ClientService.class); when(service.getBindAddress()).thenReturn( @@ -1264,6 +1274,15 @@ public MyContainerAllocator(MyResourceManager rm, Configuration conf, super.start(); } + public MyContainerAllocator(MyResourceManager rm, Configuration conf, + ApplicationAttemptId appAttemptId, Job job, Clock clock) { + super(createMockClientService(), + createAppContext(appAttemptId, job, clock)); + this.rm = rm; + super.init(conf); + super.start(); + } + @Override protected AMRMProtocol createSchedulerProxy() { return this.rm.getApplicationMasterService(); @@ -1465,6 +1484,66 @@ public void testCompletedTasksRecalculateSchedule() throws Exception { allocator.recalculatedReduceSchedule); } + @Test + public void testHeartbeatHandler() throws Exception { + LOG.info("Running testHeartbeatHandler"); + + Configuration conf = new Configuration(); 
+ conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1); + ControlledClock clock = new ControlledClock(new SystemClock()); + AppContext appContext = mock(AppContext.class); + when(appContext.getClock()).thenReturn(clock); + when(appContext.getApplicationID()).thenReturn( + BuilderUtils.newApplicationId(1, 1)); + + RMContainerAllocator allocator = new RMContainerAllocator( + mock(ClientService.class), appContext) { + @Override + protected void register() { + } + @Override + protected AMRMProtocol createSchedulerProxy() { + return mock(AMRMProtocol.class); + } + @Override + protected synchronized void heartbeat() throws Exception { + } + }; + allocator.init(conf); + allocator.start(); + + clock.setTime(5); + int timeToWaitMs = 5000; + while (allocator.getLastHeartbeatTime() != 5 && timeToWaitMs > 0) { + Thread.sleep(10); + timeToWaitMs -= 10; + } + Assert.assertEquals(5, allocator.getLastHeartbeatTime()); + clock.setTime(7); + timeToWaitMs = 5000; + while (allocator.getLastHeartbeatTime() != 7 && timeToWaitMs > 0) { + Thread.sleep(10); + timeToWaitMs -= 10; + } + Assert.assertEquals(7, allocator.getLastHeartbeatTime()); + + final AtomicBoolean callbackCalled = new AtomicBoolean(false); + allocator.runOnNextHeartbeat(new Runnable() { + @Override + public void run() { + callbackCalled.set(true); + } + }); + clock.setTime(8); + timeToWaitMs = 5000; + while (allocator.getLastHeartbeatTime() != 8 && timeToWaitMs > 0) { + Thread.sleep(10); + timeToWaitMs -= 10; + } + Assert.assertEquals(8, allocator.getLastHeartbeatTime()); + Assert.assertTrue(callbackCalled.get()); + } + public static void main(String[] args) throws Exception { TestRMContainerAllocator t = new TestRMContainerAllocator(); t.testSimple(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index 27b9fd20e4e..45215c93a98 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -165,6 +166,11 @@ protected ContainerAllocator createContainerAllocator( return allocator; } + @Override + public RMHeartbeatHandler getRMHeartbeatHandler() { + return getStubbedHeartbeatHandler(getContext()); + } + @Override protected void sysexit() { } @@ -177,6 +183,7 @@ public Configuration getConfig() { @Override protected void downloadTokensAndSetupUGI(Configuration conf) { } + } private final class MRAppTestCleanup extends MRApp { @@ -237,6 +244,11 @@ public synchronized void stop() { } } + @Override + public RMHeartbeatHandler getRMHeartbeatHandler() { + return getStubbedHeartbeatHandler(getContext()); + } + @Override public void cleanupStagingDir() throws IOException { 
cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator; @@ -247,6 +259,20 @@ protected void sysexit() { } } + private static RMHeartbeatHandler getStubbedHeartbeatHandler( + final AppContext appContext) { + return new RMHeartbeatHandler() { + @Override + public long getLastHeartbeatTime() { + return appContext.getClock().getTime(); + } + @Override + public void runOnNextHeartbeat(Runnable callback) { + callback.run(); + } + }; + } + @Test public void testStagingCleanupOrder() throws Exception { MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java new file mode 100644 index 00000000000..acbbe52bd1a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.v2.app.commit; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; +import org.apache.hadoop.yarn.SystemClock; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.junit.Test; + +public class TestCommitterEventHandler { + + @Test + public void testCommitWindow() throws Exception { + Configuration conf = new Configuration(); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + + TestingJobEventHandler jeh = new TestingJobEventHandler(); + dispatcher.register(JobEventType.class, jeh); + + SystemClock clock = new SystemClock(); + AppContext appContext = mock(AppContext.class); + when(appContext.getEventHandler()).thenReturn( + dispatcher.getEventHandler()); + when(appContext.getClock()).thenReturn(clock); + OutputCommitter committer = mock(OutputCommitter.class); + TestingRMHeartbeatHandler rmhh = + new TestingRMHeartbeatHandler(); + + CommitterEventHandler ceh = new CommitterEventHandler(appContext, + committer, rmhh); + ceh.init(conf); + ceh.start(); + + // verify trying to commit when RM heartbeats are stale does not commit + ceh.handle(new CommitterJobCommitEvent(null, null)); + long timeToWaitMs = 5000; + while (rmhh.getNumCallbacks() != 1 && timeToWaitMs > 0) { + Thread.sleep(10); + timeToWaitMs -= 10; + } + Assert.assertEquals("committer did not register a heartbeat callback", + 1, rmhh.getNumCallbacks()); + verify(committer, never()).commitJob(any(JobContext.class)); + Assert.assertEquals("committer should not have committed", + 0, jeh.numCommitCompletedEvents); + + // set a fresh heartbeat and verify commit completes + rmhh.setLastHeartbeatTime(clock.getTime()); + timeToWaitMs = 5000; + while (jeh.numCommitCompletedEvents != 1 && timeToWaitMs > 0) { + Thread.sleep(10); + timeToWaitMs -= 10; + } + Assert.assertEquals("committer did not complete commit after RM hearbeat", + 1, jeh.numCommitCompletedEvents); + verify(committer, times(1)).commitJob(any(JobContext.class)); + + // try to commit again and verify it goes through since the heartbeat + // is still fresh + ceh.handle(new CommitterJobCommitEvent(null, null)); + timeToWaitMs = 5000; + while (jeh.numCommitCompletedEvents != 2 && timeToWaitMs > 0) { + Thread.sleep(10); + timeToWaitMs -= 10; + } + Assert.assertEquals("committer did not commit", + 2, jeh.numCommitCompletedEvents); + verify(committer, times(2)).commitJob(any(JobContext.class)); + + ceh.stop(); + dispatcher.stop(); + } + + private static class TestingRMHeartbeatHandler + implements RMHeartbeatHandler { + private long lastHeartbeatTime = 0; + private ConcurrentLinkedQueue callbacks = + new ConcurrentLinkedQueue(); + + @Override + public long getLastHeartbeatTime() { + return lastHeartbeatTime; + } + + @Override + public void 
runOnNextHeartbeat(Runnable callback) { + callbacks.add(callback); + } + + public void setLastHeartbeatTime(long timestamp) { + lastHeartbeatTime = timestamp; + Runnable callback = null; + while ((callback = callbacks.poll()) != null) { + callback.run(); + } + } + + public int getNumCallbacks() { + return callbacks.size(); + } + } + + private static class TestingJobEventHandler + implements EventHandler { + int numCommitCompletedEvents = 0; + + @Override + public void handle(JobEvent event) { + if (event.getType() == JobEventType.JOB_COMMIT_COMPLETED) { + ++numCommitCompletedEvents; + } + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index f5187dcdc98..7d8101079ac 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -502,13 +503,23 @@ public void testTransitionsAtFailed() throws IOException { private static CommitterEventHandler createCommitterEventHandler( Dispatcher dispatcher, OutputCommitter committer) { - SystemClock clock = new SystemClock(); + final SystemClock clock = new SystemClock(); AppContext appContext = mock(AppContext.class); when(appContext.getEventHandler()).thenReturn( dispatcher.getEventHandler()); when(appContext.getClock()).thenReturn(clock); + RMHeartbeatHandler heartbeatHandler = new RMHeartbeatHandler() { + @Override + public long getLastHeartbeatTime() { + return clock.getTime(); + } + @Override + public void runOnNextHeartbeat(Runnable callback) { + callback.run(); + } + }; CommitterEventHandler handler = - new CommitterEventHandler(appContext, committer); + new CommitterEventHandler(appContext, committer, heartbeatHandler); dispatcher.register(CommitterEventType.class, handler); return handler; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 099cf8f530b..71ab51b29fa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -473,6 +473,16 @@ public interface MRJobConfig { public static final int DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS = 60 * 1000; + /** + * Defines a time window in milliseconds for output committer operations. 
+ * If contact with the RM has occurred within this window then commit + * operations are allowed, otherwise the AM will not allow output committer + * operations until contact with the RM has been re-established. + */ + public static final String MR_AM_COMMIT_WINDOW_MS = + MR_AM_PREFIX + "job.committer.commit-window"; + public static final int DEFAULT_MR_AM_COMMIT_WINDOW_MS = 10 * 1000; + /** * Boolean. Create the base dirs in the JobHistoryEventHandler * Set to false for multi-user clusters. This is an internal config that diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 7a586525774..fa17d875069 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -880,6 +880,15 @@ committer to cancel an operation if the job is killed + + yarn.app.mapreduce.am.job.committer.commit-window + 10000 + Defines a time window in milliseconds for output commit + operations. If contact with the RM has occurred within this window then + commits are allowed, otherwise the AM will not allow output commits until + contact with the RM has been re-established. + + yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms 1000
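
The core mechanism this patch adds is a "commit window": before the AM lets a task attempt commit its output, TaskAttemptListenerImpl.canCommit() now checks that the AM has heard from the ResourceManager within yarn.app.mapreduce.am.job.committer.commit-window (default 10000 ms). If the last RM heartbeat is older than that, this AM may have been superseded by a new application attempt (the split-brain case), so it answers "not yet" and the task asks again later. A minimal standalone sketch of that gate follows; CommitWindowGate and its nested Clock/RMHeartbeatHandler interfaces are illustrative stand-ins, not the actual Hadoop classes.

    // Standalone restatement of the commit-window check added to
    // TaskAttemptListenerImpl.canCommit(); names are illustrative only.
    public class CommitWindowGate {
      interface Clock { long getTime(); }
      interface RMHeartbeatHandler { long getLastHeartbeatTime(); }

      private final Clock clock;
      private final RMHeartbeatHandler rmHeartbeatHandler;
      private final long commitWindowMs; // yarn.app.mapreduce.am.job.committer.commit-window

      public CommitWindowGate(Clock clock, RMHeartbeatHandler rmHeartbeatHandler,
          long commitWindowMs) {
        this.clock = clock;
        this.rmHeartbeatHandler = rmHeartbeatHandler;
        this.commitWindowMs = commitWindowMs;
      }

      // False means "ask again later": the RM has not been heard from
      // recently enough to be sure this AM is still the live attempt.
      public boolean withinCommitWindow() {
        long now = clock.getTime();
        return now - rmHeartbeatHandler.getLastHeartbeatTime() <= commitWindowMs;
      }
    }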
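
The job-commit path takes a blocking variant of the same idea. CommitterEventHandler's event processor calls waitForValidCommitWindow() before committer.commitJob(): if the last heartbeat is stale it registers a one-shot callback via RMHeartbeatHandler.runOnNextHeartbeat() and waits, re-checking each time it is woken, so the job commit proceeds only once recent contact with the RM is confirmed. A sketch of that wait/notify pattern, using a simplified stand-in class (CommitWindowWaiter is not a Hadoop class):

    // Mirrors EventProcessor.waitForValidCommitWindow(): block until the most
    // recent RM heartbeat falls inside the commit window.
    public class CommitWindowWaiter {
      interface Clock { long getTime(); }
      interface RMHeartbeatHandler {
        long getLastHeartbeatTime();
        void runOnNextHeartbeat(Runnable callback);
      }

      private final Clock clock;
      private final RMHeartbeatHandler rmHeartbeatHandler;
      private final long commitWindowMs;

      public CommitWindowWaiter(Clock clock, RMHeartbeatHandler rmHeartbeatHandler,
          long commitWindowMs) {
        this.clock = clock;
        this.rmHeartbeatHandler = rmHeartbeatHandler;
        this.commitWindowMs = commitWindowMs;
      }

      public synchronized void waitForValidCommitWindow()
          throws InterruptedException {
        long lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime();
        long now = clock.getTime();
        while (now - lastHeartbeatTime > commitWindowMs) {
          // Wake this waiter after the next successful RM heartbeat ...
          rmHeartbeatHandler.runOnNextHeartbeat(new Runnable() {
            @Override
            public void run() {
              synchronized (CommitWindowWaiter.this) {
                CommitWindowWaiter.this.notify();
              }
            }
          });
          // ... and sleep until then, releasing the lock so the callback can run.
          wait();
          lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime();
          now = clock.getTime();
        }
      }
    }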
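
RMCommunicator supplies the data both checks rely on: its allocator thread records the AM clock time in a volatile field after every successful heartbeat() and then drains a ConcurrentLinkedQueue of one-shot callbacks; the `continue` added to the exception branch means a failed RM call never refreshes the timestamp. A simplified sketch of that bookkeeping (HeartbeatBookkeeper and its nested Clock interface are illustrative, not the real RMCommunicator):

    import java.util.concurrent.ConcurrentLinkedQueue;

    // Heartbeat bookkeeping in the spirit of RMCommunicator: a volatile
    // timestamp plus callbacks that run only after a successful heartbeat.
    public class HeartbeatBookkeeper {
      interface Clock { long getTime(); }

      private final Clock clock;
      private volatile long lastHeartbeatTime;
      private final ConcurrentLinkedQueue<Runnable> heartbeatCallbacks =
          new ConcurrentLinkedQueue<Runnable>();

      public HeartbeatBookkeeper(Clock clock) {
        this.clock = clock;
      }

      public long getLastHeartbeatTime() {
        return lastHeartbeatTime;
      }

      public void runOnNextHeartbeat(Runnable callback) {
        heartbeatCallbacks.add(callback);
      }

      // Called from the allocator loop only when heartbeat() did not throw;
      // a failed heartbeat skips this, leaving the timestamp stale.
      public void onSuccessfulHeartbeat() {
        lastHeartbeatTime = clock.getTime();
        Runnable callback;
        while ((callback = heartbeatCallbacks.poll()) != null) {
          callback.run();
        }
      }
    }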
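
The window is tunable per job through the new MRJobConfig key. A short usage example (standard Configuration API; the key and its 10000 ms default come from this patch, while the 30000 ms value is only an illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class CommitWindowConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // yarn.app.mapreduce.am.job.committer.commit-window, default 10000 ms
        conf.setLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS, 30000L);
      }
    }

A wider window tolerates a slower RM heartbeat cadence but leaves commits permitted for longer after RM contact is lost; a narrower window defers commits sooner.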