From 8dfec7a1979e8f70f8355c096874921d368342ef Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Sat, 15 Aug 2015 00:52:11 -0700 Subject: [PATCH] MAPREDUCE-6439. AM may fail instead of retrying if RM shuts down during the allocate call. (Anubhav Dhoot via kasha) --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/v2/app/rm/RMCommunicator.java | 51 +++++----- .../rm/RMContainerAllocationException.java | 31 ++++++ .../v2/app/rm/RMContainerAllocator.java | 4 +- .../v2/app/rm/TestRMCommunicator.java | 99 +++++++++++++++++++ .../v2/app/rm/TestRMContainerAllocator.java | 48 +++++++-- 6 files changed, 202 insertions(+), 34 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocationException.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMCommunicator.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 19bd697dd4e..d2eef32cc8b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -573,6 +573,9 @@ Release 2.7.2 - UNRELEASED MAPREDUCE-6426. TestShuffleHandler#testGetMapOutputInfo is failing. (zhihai xu via devaraj) + MAPREDUCE-6439. AM may fail instead of retrying if RM shuts down during the + allocate call. (Anubhav Dhoot via kasha) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 5d4fa12ead2..6cec2f3abfc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -270,35 +270,38 @@ public abstract class RMCommunicator extends AbstractService super.serviceStop(); } - protected void startAllocatorThread() { - allocatorThread = new Thread(new Runnable() { - @Override - public void run() { - while (!stopped.get() && !Thread.currentThread().isInterrupted()) { + @VisibleForTesting + public class AllocatorRunnable implements Runnable { + @Override + public void run() { + while (!stopped.get() && !Thread.currentThread().isInterrupted()) { + try { + Thread.sleep(rmPollInterval); try { - Thread.sleep(rmPollInterval); - try { - heartbeat(); - } catch (YarnRuntimeException e) { - LOG.error("Error communicating with RM: " + e.getMessage() , e); - return; - } catch (Exception e) { - LOG.error("ERROR IN CONTACTING RM. ", e); - continue; - // TODO: for other exceptions - } - - lastHeartbeatTime = context.getClock().getTime(); - executeHeartbeatCallbacks(); - } catch (InterruptedException e) { - if (!stopped.get()) { - LOG.warn("Allocated thread interrupted. Returning."); - } + heartbeat(); + } catch (RMContainerAllocationException e) { + LOG.error("Error communicating with RM: " + e.getMessage() , e); return; + } catch (Exception e) { + LOG.error("ERROR IN CONTACTING RM. ", e); + continue; + // TODO: for other exceptions } + + lastHeartbeatTime = context.getClock().getTime(); + executeHeartbeatCallbacks(); + } catch (InterruptedException e) { + if (!stopped.get()) { + LOG.warn("Allocated thread interrupted. Returning."); + } + return; } } - }); + } + } + + protected void startAllocatorThread() { + allocatorThread = new Thread(new AllocatorRunnable()); allocatorThread.setName("RMCommunicator Allocator"); allocatorThread.start(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocationException.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocationException.java new file mode 100644 index 00000000000..3c10944497d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocationException.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app.rm; + +/** + * Exception to denote fatal failure in allocating containers from RM. + */ +public class RMContainerAllocationException extends Exception { + private static final long serialVersionUID = 1L; + public RMContainerAllocationException(Throwable cause) { super(cause); } + public RMContainerAllocationException(String message) { super(message); } + public RMContainerAllocationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 1aeee2c9865..ac4c586bcc0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -699,7 +699,7 @@ public class RMContainerAllocator extends RMContainerRequestor // this application must clean itself up. eventHandler.handle(new JobEvent(this.getJob().getID(), JobEventType.JOB_AM_REBOOT)); - throw new YarnRuntimeException( + throw new RMContainerAllocationException( "Resource Manager doesn't recognize AttemptId: " + this.getContext().getApplicationAttemptId(), e); } catch (ApplicationMasterNotRegisteredException e) { @@ -717,7 +717,7 @@ public class RMContainerAllocator extends RMContainerRequestor LOG.error("Could not contact RM after " + retryInterval + " milliseconds."); eventHandler.handle(new JobEvent(this.getJob().getID(), JobEventType.JOB_AM_REBOOT)); - throw new YarnRuntimeException("Could not contact RM after " + + throw new RMContainerAllocationException("Could not contact RM after " + retryInterval + " milliseconds."); } // Throw this up to the caller, which may decide to ignore it and diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMCommunicator.java new file mode 100644 index 00000000000..a7c63c27c09 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMCommunicator.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app.rm; + +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.client.ClientService; +import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator.AllocatorRunnable; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.util.Clock; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestRMCommunicator { + + class MockRMCommunicator extends RMCommunicator { + public MockRMCommunicator(ClientService clientService, AppContext context) { + super(clientService, context); + } + + @Override + protected void heartbeat() throws Exception { + } + } + + @Test(timeout = 2000) + public void testRMContainerAllocatorExceptionIsHandled() throws Exception { + ClientService mockClientService = mock(ClientService.class); + AppContext mockContext = mock(AppContext.class); + MockRMCommunicator mockRMCommunicator = + new MockRMCommunicator(mockClientService, mockContext); + RMCommunicator communicator = spy(mockRMCommunicator); + Clock mockClock = mock(Clock.class); + when(mockContext.getClock()).thenReturn(mockClock); + + doThrow(new RMContainerAllocationException("Test")).doNothing() + .when(communicator).heartbeat(); + + when(mockClock.getTime()).thenReturn(1L).thenThrow(new AssertionError( + "GetClock called second time, when it should not have since the " + + "thread should have quit")); + + AllocatorRunnable testRunnable = communicator.new AllocatorRunnable(); + testRunnable.run(); + } + + @Test(timeout = 2000) + public void testRMContainerAllocatorYarnRuntimeExceptionIsHandled() + throws Exception { + ClientService mockClientService = mock(ClientService.class); + AppContext mockContext = mock(AppContext.class); + MockRMCommunicator mockRMCommunicator = + new MockRMCommunicator(mockClientService, mockContext); + final RMCommunicator communicator = spy(mockRMCommunicator); + Clock mockClock = mock(Clock.class); + when(mockContext.getClock()).thenReturn(mockClock); + + doThrow(new YarnRuntimeException("Test")).doNothing() + .when(communicator).heartbeat(); + + when(mockClock.getTime()).thenReturn(1L).thenAnswer(new Answer() { + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + communicator.stop(); + return 2; + } + }).thenThrow(new AssertionError( + "GetClock called second time, when it should not have since the thread " + + "should have quit")); + + AllocatorRunnable testRunnable = communicator.new AllocatorRunnable(); + testRunnable.run(); + + verify(mockClock, times(2)).getTime(); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index a096a68fd22..e148c32f1b7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -1830,12 +1830,7 @@ public class TestRMContainerAllocator { } }, 100, 10000); // run the scheduler - try { - super.heartbeat(); - } catch (Exception e) { - LOG.error("error in heartbeat ", e); - throw new YarnRuntimeException(e); - } + super.heartbeat(); List result = new ArrayList(events); @@ -1885,7 +1880,7 @@ public class TestRMContainerAllocator { @Override protected AllocateResponse makeRemoteRequest() throws IOException, YarnException { - throw new YarnRuntimeException("for testing"); + throw new IOException("for testing"); } } @@ -2450,7 +2445,7 @@ public class TestRMContainerAllocator { try { allocator.schedule(); Assert.fail("Should Have Exception"); - } catch (YarnRuntimeException e) { + } catch (RMContainerAllocationException e) { Assert.assertTrue(e.getMessage().contains("Could not contact RM after")); } dispatcher.await(); @@ -2671,6 +2666,43 @@ public class TestRMContainerAllocator { allocator.close(); } + @Test(expected = RMContainerAllocationException.class) + public void testAttemptNotFoundCausesRMCommunicatorException() + throws Exception { + + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + + // Now kill the application + rm.killApp(app.getApplicationId()); + + allocator.schedule(); + } + private static class MockScheduler implements ApplicationMasterProtocol { ApplicationAttemptId attemptId; long nextContainerId = 10;