MAPREDUCE-6439. AM may fail instead of retrying if RM shuts down during the allocate call. (Anubhav Dhoot via kasha)

(cherry picked from commit 8dfec7a197)
(cherry picked from commit 20734320a1)
This commit is contained in:
Karthik Kambatla 2015-08-15 00:52:11 -07:00
parent 4d94bc1d95
commit d610144d45
6 changed files with 202 additions and 34 deletions

View File

@ -18,6 +18,9 @@ Release 2.7.2 - UNRELEASED
MAPREDUCE-6426. TestShuffleHandler#testGetMapOutputInfo is failing.
(zhihai xu via devaraj)
MAPREDUCE-6439. AM may fail instead of retrying if RM shuts down during the
allocate call. (Anubhav Dhoot via kasha)
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES

View File

@ -270,8 +270,8 @@ public abstract class RMCommunicator extends AbstractService
super.serviceStop();
}
protected void startAllocatorThread() {
allocatorThread = new Thread(new Runnable() {
@VisibleForTesting
public class AllocatorRunnable implements Runnable {
@Override
public void run() {
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
@ -279,7 +279,7 @@ public abstract class RMCommunicator extends AbstractService
Thread.sleep(rmPollInterval);
try {
heartbeat();
} catch (YarnRuntimeException e) {
} catch (RMContainerAllocationException e) {
LOG.error("Error communicating with RM: " + e.getMessage() , e);
return;
} catch (Exception e) {
@ -298,7 +298,10 @@ public abstract class RMCommunicator extends AbstractService
}
}
}
});
}
protected void startAllocatorThread() {
allocatorThread = new Thread(new AllocatorRunnable());
allocatorThread.setName("RMCommunicator Allocator");
allocatorThread.start();
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.rm;
/**
* Exception to denote fatal failure in allocating containers from RM.
*/
public class RMContainerAllocationException extends Exception {
private static final long serialVersionUID = 1L;
public RMContainerAllocationException(Throwable cause) { super(cause); }
public RMContainerAllocationException(String message) { super(message); }
public RMContainerAllocationException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -681,7 +681,7 @@ public class RMContainerAllocator extends RMContainerRequestor
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException(
throw new RMContainerAllocationException(
"Resource Manager doesn't recognize AttemptId: "
+ this.getContext().getApplicationAttemptId(), e);
} catch (ApplicationMasterNotRegisteredException e) {
@ -699,7 +699,7 @@ public class RMContainerAllocator extends RMContainerRequestor
LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException("Could not contact RM after " +
throw new RMContainerAllocationException("Could not contact RM after " +
retryInterval + " milliseconds.");
}
// Throw this up to the caller, which may decide to ignore it and

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.rm;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator.AllocatorRunnable;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestRMCommunicator {
class MockRMCommunicator extends RMCommunicator {
public MockRMCommunicator(ClientService clientService, AppContext context) {
super(clientService, context);
}
@Override
protected void heartbeat() throws Exception {
}
}
@Test(timeout = 2000)
public void testRMContainerAllocatorExceptionIsHandled() throws Exception {
ClientService mockClientService = mock(ClientService.class);
AppContext mockContext = mock(AppContext.class);
MockRMCommunicator mockRMCommunicator =
new MockRMCommunicator(mockClientService, mockContext);
RMCommunicator communicator = spy(mockRMCommunicator);
Clock mockClock = mock(Clock.class);
when(mockContext.getClock()).thenReturn(mockClock);
doThrow(new RMContainerAllocationException("Test")).doNothing()
.when(communicator).heartbeat();
when(mockClock.getTime()).thenReturn(1L).thenThrow(new AssertionError(
"GetClock called second time, when it should not have since the " +
"thread should have quit"));
AllocatorRunnable testRunnable = communicator.new AllocatorRunnable();
testRunnable.run();
}
@Test(timeout = 2000)
public void testRMContainerAllocatorYarnRuntimeExceptionIsHandled()
throws Exception {
ClientService mockClientService = mock(ClientService.class);
AppContext mockContext = mock(AppContext.class);
MockRMCommunicator mockRMCommunicator =
new MockRMCommunicator(mockClientService, mockContext);
final RMCommunicator communicator = spy(mockRMCommunicator);
Clock mockClock = mock(Clock.class);
when(mockContext.getClock()).thenReturn(mockClock);
doThrow(new YarnRuntimeException("Test")).doNothing()
.when(communicator).heartbeat();
when(mockClock.getTime()).thenReturn(1L).thenAnswer(new Answer<Integer>() {
@Override
public Integer answer(InvocationOnMock invocation) throws Throwable {
communicator.stop();
return 2;
}
}).thenThrow(new AssertionError(
"GetClock called second time, when it should not have since the thread " +
"should have quit"));
AllocatorRunnable testRunnable = communicator.new AllocatorRunnable();
testRunnable.run();
verify(mockClock, times(2)).getTime();
}
}

View File

@ -1740,12 +1740,7 @@ public class TestRMContainerAllocator {
}
}, 100, 10000);
// run the scheduler
try {
super.heartbeat();
} catch (Exception e) {
LOG.error("error in heartbeat ", e);
throw new YarnRuntimeException(e);
}
List<TaskAttemptContainerAssignedEvent> result
= new ArrayList<TaskAttemptContainerAssignedEvent>(events);
@ -1795,7 +1790,7 @@ public class TestRMContainerAllocator {
@Override
protected AllocateResponse makeRemoteRequest() throws IOException,
YarnException {
throw new YarnRuntimeException("for testing");
throw new IOException("for testing");
}
}
@ -2358,7 +2353,7 @@ public class TestRMContainerAllocator {
try {
allocator.schedule();
Assert.fail("Should Have Exception");
} catch (YarnRuntimeException e) {
} catch (RMContainerAllocationException e) {
Assert.assertTrue(e.getMessage().contains("Could not contact RM after"));
}
dispatcher.await();
@ -2579,6 +2574,43 @@ public class TestRMContainerAllocator {
allocator.close();
}
@Test(expected = RMContainerAllocationException.class)
public void testAttemptNotFoundCausesRMCommunicatorException()
throws Exception {
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
dispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
// Now kill the application
rm.killApp(app.getApplicationId());
allocator.schedule();
}
private static class MockScheduler implements ApplicationMasterProtocol {
ApplicationAttemptId attemptId;
long nextContainerId = 10;