MAPREDUCE-6439. AM may fail instead of retrying if RM shuts down during the allocate call. (Anubhav Dhoot via kasha)
(cherry picked from commit 8dfec7a197
)
This commit is contained in:
parent
2edfc3ce71
commit
20734320a1
|
@ -304,6 +304,9 @@ Release 2.7.2 - UNRELEASED
|
|||
MAPREDUCE-6426. TestShuffleHandler#testGetMapOutputInfo is failing.
|
||||
(zhihai xu via devaraj)
|
||||
|
||||
MAPREDUCE-6439. AM may fail instead of retrying if RM shuts down during the
|
||||
allocate call. (Anubhav Dhoot via kasha)
|
||||
|
||||
Release 2.7.1 - 2015-07-06
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -270,35 +270,38 @@ public abstract class RMCommunicator extends AbstractService
|
|||
super.serviceStop();
|
||||
}
|
||||
|
||||
protected void startAllocatorThread() {
|
||||
allocatorThread = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
|
||||
@VisibleForTesting
|
||||
public class AllocatorRunnable implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
Thread.sleep(rmPollInterval);
|
||||
try {
|
||||
Thread.sleep(rmPollInterval);
|
||||
try {
|
||||
heartbeat();
|
||||
} catch (YarnRuntimeException e) {
|
||||
LOG.error("Error communicating with RM: " + e.getMessage() , e);
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOG.error("ERROR IN CONTACTING RM. ", e);
|
||||
continue;
|
||||
// TODO: for other exceptions
|
||||
}
|
||||
|
||||
lastHeartbeatTime = context.getClock().getTime();
|
||||
executeHeartbeatCallbacks();
|
||||
} catch (InterruptedException e) {
|
||||
if (!stopped.get()) {
|
||||
LOG.warn("Allocated thread interrupted. Returning.");
|
||||
}
|
||||
heartbeat();
|
||||
} catch (RMContainerAllocationException e) {
|
||||
LOG.error("Error communicating with RM: " + e.getMessage() , e);
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOG.error("ERROR IN CONTACTING RM. ", e);
|
||||
continue;
|
||||
// TODO: for other exceptions
|
||||
}
|
||||
|
||||
lastHeartbeatTime = context.getClock().getTime();
|
||||
executeHeartbeatCallbacks();
|
||||
} catch (InterruptedException e) {
|
||||
if (!stopped.get()) {
|
||||
LOG.warn("Allocated thread interrupted. Returning.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
protected void startAllocatorThread() {
|
||||
allocatorThread = new Thread(new AllocatorRunnable());
|
||||
allocatorThread.setName("RMCommunicator Allocator");
|
||||
allocatorThread.start();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.app.rm;
|
||||
|
||||
/**
|
||||
* Exception to denote fatal failure in allocating containers from RM.
|
||||
*/
|
||||
public class RMContainerAllocationException extends Exception {
|
||||
private static final long serialVersionUID = 1L;
|
||||
public RMContainerAllocationException(Throwable cause) { super(cause); }
|
||||
public RMContainerAllocationException(String message) { super(message); }
|
||||
public RMContainerAllocationException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
|
@ -689,7 +689,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
// this application must clean itself up.
|
||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||
JobEventType.JOB_AM_REBOOT));
|
||||
throw new YarnRuntimeException(
|
||||
throw new RMContainerAllocationException(
|
||||
"Resource Manager doesn't recognize AttemptId: "
|
||||
+ this.getContext().getApplicationAttemptId(), e);
|
||||
} catch (ApplicationMasterNotRegisteredException e) {
|
||||
|
@ -707,7 +707,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
|
||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||
JobEventType.JOB_AM_REBOOT));
|
||||
throw new YarnRuntimeException("Could not contact RM after " +
|
||||
throw new RMContainerAllocationException("Could not contact RM after " +
|
||||
retryInterval + " milliseconds.");
|
||||
}
|
||||
// Throw this up to the caller, which may decide to ignore it and
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.app.rm;
|
||||
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator.AllocatorRunnable;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestRMCommunicator {
|
||||
|
||||
class MockRMCommunicator extends RMCommunicator {
|
||||
public MockRMCommunicator(ClientService clientService, AppContext context) {
|
||||
super(clientService, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void heartbeat() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 2000)
|
||||
public void testRMContainerAllocatorExceptionIsHandled() throws Exception {
|
||||
ClientService mockClientService = mock(ClientService.class);
|
||||
AppContext mockContext = mock(AppContext.class);
|
||||
MockRMCommunicator mockRMCommunicator =
|
||||
new MockRMCommunicator(mockClientService, mockContext);
|
||||
RMCommunicator communicator = spy(mockRMCommunicator);
|
||||
Clock mockClock = mock(Clock.class);
|
||||
when(mockContext.getClock()).thenReturn(mockClock);
|
||||
|
||||
doThrow(new RMContainerAllocationException("Test")).doNothing()
|
||||
.when(communicator).heartbeat();
|
||||
|
||||
when(mockClock.getTime()).thenReturn(1L).thenThrow(new AssertionError(
|
||||
"GetClock called second time, when it should not have since the " +
|
||||
"thread should have quit"));
|
||||
|
||||
AllocatorRunnable testRunnable = communicator.new AllocatorRunnable();
|
||||
testRunnable.run();
|
||||
}
|
||||
|
||||
@Test(timeout = 2000)
|
||||
public void testRMContainerAllocatorYarnRuntimeExceptionIsHandled()
|
||||
throws Exception {
|
||||
ClientService mockClientService = mock(ClientService.class);
|
||||
AppContext mockContext = mock(AppContext.class);
|
||||
MockRMCommunicator mockRMCommunicator =
|
||||
new MockRMCommunicator(mockClientService, mockContext);
|
||||
final RMCommunicator communicator = spy(mockRMCommunicator);
|
||||
Clock mockClock = mock(Clock.class);
|
||||
when(mockContext.getClock()).thenReturn(mockClock);
|
||||
|
||||
doThrow(new YarnRuntimeException("Test")).doNothing()
|
||||
.when(communicator).heartbeat();
|
||||
|
||||
when(mockClock.getTime()).thenReturn(1L).thenAnswer(new Answer<Integer>() {
|
||||
@Override
|
||||
public Integer answer(InvocationOnMock invocation) throws Throwable {
|
||||
communicator.stop();
|
||||
return 2;
|
||||
}
|
||||
}).thenThrow(new AssertionError(
|
||||
"GetClock called second time, when it should not have since the thread " +
|
||||
"should have quit"));
|
||||
|
||||
AllocatorRunnable testRunnable = communicator.new AllocatorRunnable();
|
||||
testRunnable.run();
|
||||
|
||||
verify(mockClock, times(2)).getTime();
|
||||
}
|
||||
}
|
|
@ -1827,12 +1827,7 @@ public class TestRMContainerAllocator {
|
|||
}
|
||||
}, 100, 10000);
|
||||
// run the scheduler
|
||||
try {
|
||||
super.heartbeat();
|
||||
} catch (Exception e) {
|
||||
LOG.error("error in heartbeat ", e);
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
super.heartbeat();
|
||||
|
||||
List<TaskAttemptContainerAssignedEvent> result
|
||||
= new ArrayList<TaskAttemptContainerAssignedEvent>(events);
|
||||
|
@ -1882,7 +1877,7 @@ public class TestRMContainerAllocator {
|
|||
@Override
|
||||
protected AllocateResponse makeRemoteRequest() throws IOException,
|
||||
YarnException {
|
||||
throw new YarnRuntimeException("for testing");
|
||||
throw new IOException("for testing");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2445,7 +2440,7 @@ public class TestRMContainerAllocator {
|
|||
try {
|
||||
allocator.schedule();
|
||||
Assert.fail("Should Have Exception");
|
||||
} catch (YarnRuntimeException e) {
|
||||
} catch (RMContainerAllocationException e) {
|
||||
Assert.assertTrue(e.getMessage().contains("Could not contact RM after"));
|
||||
}
|
||||
dispatcher.await();
|
||||
|
@ -2666,6 +2661,43 @@ public class TestRMContainerAllocator {
|
|||
allocator.close();
|
||||
}
|
||||
|
||||
@Test(expected = RMContainerAllocationException.class)
|
||||
public void testAttemptNotFoundCausesRMCommunicatorException()
|
||||
throws Exception {
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
MyResourceManager rm = new MyResourceManager(conf);
|
||||
rm.start();
|
||||
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
|
||||
.getDispatcher();
|
||||
|
||||
// Submit the application
|
||||
RMApp app = rm.submitApp(1024);
|
||||
dispatcher.await();
|
||||
|
||||
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
|
||||
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
|
||||
.getAppAttemptId();
|
||||
rm.sendAMLaunched(appAttemptId);
|
||||
dispatcher.await();
|
||||
|
||||
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
|
||||
Job mockJob = mock(Job.class);
|
||||
when(mockJob.getReport()).thenReturn(
|
||||
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
|
||||
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
|
||||
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
|
||||
appAttemptId, mockJob);
|
||||
|
||||
// Now kill the application
|
||||
rm.killApp(app.getApplicationId());
|
||||
|
||||
allocator.schedule();
|
||||
}
|
||||
|
||||
private static class MockScheduler implements ApplicationMasterProtocol {
|
||||
ApplicationAttemptId attemptId;
|
||||
long nextContainerId = 10;
|
||||
|
|
Loading…
Reference in New Issue