Merge r1502914 from trunk to branch-2 for YARN-763. AMRMClientAsync should stop heartbeating after receiving shutdown from RM (Xuan Gong via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1502916 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-07-13 23:38:27 +00:00
parent c1b0106711
commit 469e65c3aa
3 changed files with 112 additions and 18 deletions

View File

@ -669,6 +669,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-541. getAllocatedContainers() is not returning all the allocated
containers (bikas)
YARN-763. AMRMClientAsync should stop heartbeating after receiving
shutdown from RM (Xuan Gong via bikas)
BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
YARN-158. Yarn creating package-info.java must not depend on sh.

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -92,6 +93,7 @@ extends AMRMClientAsync<T> {
@Override
protected void serviceStart() throws Exception {
handlerThread.setDaemon(true);
handlerThread.start();
client.start();
super.serviceStart();
@ -99,27 +101,19 @@ extends AMRMClientAsync<T> {
/**
* Tells the heartbeat and handler threads to stop and waits for them to
* terminate. Calling this method from the callback handler thread would cause
* deadlock, and thus should be avoided.
* terminate.
*/
@Override
protected void serviceStop() throws Exception {
if (Thread.currentThread() == handlerThread) {
throw new YarnRuntimeException("Cannot call stop from callback handler thread!");
}
keepRunning = false;
heartbeatThread.interrupt();
try {
heartbeatThread.join();
} catch (InterruptedException ex) {
LOG.error("Error joining with heartbeat thread", ex);
}
client.stop();
try {
handlerThread.interrupt();
handlerThread.join();
} catch (InterruptedException ex) {
LOG.error("Error joining with hander thread", ex);
}
handlerThread.interrupt();
super.serviceStop();
}
@ -248,6 +242,10 @@ extends AMRMClientAsync<T> {
while (true) {
try {
responseQueue.put(response);
if (response.getAMCommand() == AMCommand.AM_RESYNC
|| response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
return;
}
break;
} catch (InterruptedException ex) {
LOG.info("Interrupted while waiting to put on response queue", ex);
@ -285,24 +283,18 @@ extends AMRMClientAsync<T> {
}
if (response.getAMCommand() != null) {
boolean stop = false;
switch(response.getAMCommand()) {
case AM_RESYNC:
case AM_SHUTDOWN:
handler.onShutdownRequest();
LOG.info("Shutdown requested. Stopping callback.");
stop = true;
break;
return;
default:
String msg =
"Unhandled value of AMCommand: " + response.getAMCommand();
LOG.error(msg);
throw new YarnRuntimeException(msg);
}
if(stop) {
// should probably stop heartbeating also YARN-763
break;
}
}
List<NodeReport> updatedNodes = response.getUpdatedNodes();
if (!updatedNodes.isEmpty()) {

View File

@ -23,7 +23,10 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -218,6 +221,65 @@ public class TestAMRMClientAsync {
Assert.assertTrue(callbackHandler.callbackCount == 0);
}
@Test (timeout = 10000)
public void testAMRMClientAsyncShutDown() throws Exception {
Configuration conf = new Configuration();
TestCallbackHandler callbackHandler = new TestCallbackHandler();
@SuppressWarnings("unchecked")
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
final AllocateResponse shutDownResponse = createAllocateResponse(
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN);
when(client.allocate(anyFloat())).thenReturn(shutDownResponse);
AMRMClientAsync<ContainerRequest> asyncClient =
AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
asyncClient.registerApplicationMaster("localhost", 1234, null);
Thread.sleep(50);
verify(client, times(1)).allocate(anyFloat());
asyncClient.stop();
}
@Test (timeout = 5000)
public void testCallAMRMClientAsyncStopFromCallbackHandler()
throws YarnException, IOException, InterruptedException {
Configuration conf = new Configuration();
TestCallbackHandler2 callbackHandler = new TestCallbackHandler2();
@SuppressWarnings("unchecked")
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
List<ContainerStatus> completed = Arrays.asList(
ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
ContainerState.COMPLETE, "", 0));
final AllocateResponse response = createAllocateResponse(completed,
new ArrayList<Container>(), null);
when(client.allocate(anyFloat())).thenReturn(response);
AMRMClientAsync<ContainerRequest> asyncClient =
AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
callbackHandler.registerAsyncClient(asyncClient);
asyncClient.init(conf);
asyncClient.start();
synchronized (callbackHandler.notifier) {
asyncClient.registerApplicationMaster("localhost", 1234, null);
while(callbackHandler.stop == false) {
try {
callbackHandler.notifier.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private AllocateResponse createAllocateResponse(
List<ContainerStatus> completed, List<Container> allocated,
List<NMToken> nmTokens) {
@ -323,4 +385,41 @@ public class TestAMRMClientAsync {
}
}
}
private class TestCallbackHandler2 implements AMRMClientAsync.CallbackHandler {
Object notifier = new Object();
@SuppressWarnings("rawtypes")
AMRMClientAsync asynClient;
boolean stop = false;
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {}
@Override
public void onContainersAllocated(List<Container> containers) {}
@Override
public void onShutdownRequest() {}
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
@Override
public float getProgress() {
asynClient.stop();
stop = true;
synchronized (notifier) {
notifier.notifyAll();
}
return 0;
}
@Override
public void onError(Exception e) {}
public void registerAsyncClient(
AMRMClientAsync<ContainerRequest> asyncClient) {
this.asynClient = asyncClient;
}
}
}