YARN-2821. Fixed a problem that DistributedShell AM may hang if restarted. Contributed by Varun Vasudev

(cherry picked from commit 7438966586)
This commit is contained in:
Jian He 2015-05-19 14:20:31 -07:00
parent 7af9a78fe8
commit adb90c7f52
4 changed files with 187 additions and 5 deletions

View File

@ -375,6 +375,9 @@ Release 2.8.0 - UNRELEASED
YARN-3302. TestDockerContainerExecutor should run automatically if it can YARN-3302. TestDockerContainerExecutor should run automatically if it can
detect docker in the usual place (Ravindra Kumar Naik via raviprak) detect docker in the usual place (Ravindra Kumar Naik via raviprak)
YARN-2821. Fixed a problem that DistributedShell AM may hang if restarted.
(Varun Vasudev via jianhe)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -116,6 +116,11 @@
<type>test-jar</type> <type>test-jar</type>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -30,10 +30,12 @@ import java.net.URISyntaxException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -277,6 +279,10 @@ public class ApplicationMaster {
private final String linux_bash_command = "bash"; private final String linux_bash_command = "bash";
private final String windows_command = "cmd /c"; private final String windows_command = "cmd /c";
@VisibleForTesting
protected final Set<ContainerId> launchedContainers =
Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
/** /**
* @param args Command line args * @param args Command line args
*/ */
@ -601,8 +607,12 @@ public class ApplicationMaster {
response.getContainersFromPreviousAttempts(); response.getContainersFromPreviousAttempts();
LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
+ " previous attempts' running containers on AM registration."); + " previous attempts' running containers on AM registration.");
for(Container container: previousAMRunningContainers) {
launchedContainers.add(container.getId());
}
numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
int numTotalContainersToRequest = int numTotalContainersToRequest =
numTotalContainers - previousAMRunningContainers.size(); numTotalContainers - previousAMRunningContainers.size();
// Setup ask for containers from RM // Setup ask for containers from RM
@ -716,7 +726,8 @@ public class ApplicationMaster {
return success; return success;
} }
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { @VisibleForTesting
class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public void onContainersCompleted(List<ContainerStatus> completedContainers) { public void onContainersCompleted(List<ContainerStatus> completedContainers) {
@ -731,6 +742,14 @@ public class ApplicationMaster {
// non complete containers should not be here // non complete containers should not be here
assert (containerStatus.getState() == ContainerState.COMPLETE); assert (containerStatus.getState() == ContainerState.COMPLETE);
// ignore containers we know nothing about - probably from a previous
// attempt
if (!launchedContainers.contains(containerStatus.getContainerId())) {
LOG.info("Ignoring completed status of "
+ containerStatus.getContainerId()
+ "; unknown container(probably launched by previous attempt)");
continue;
}
// increment counters for completed/failed containers // increment counters for completed/failed containers
int exitStatus = containerStatus.getExitStatus(); int exitStatus = containerStatus.getExitStatus();
@ -796,14 +815,13 @@ public class ApplicationMaster {
// + ", containerToken" // + ", containerToken"
// +allocatedContainer.getContainerToken().getIdentifier().toString()); // +allocatedContainer.getContainerToken().getIdentifier().toString());
LaunchContainerRunnable runnableLaunchContainer = Thread launchThread = createLaunchContainerThread(allocatedContainer);
new LaunchContainerRunnable(allocatedContainer, containerListener);
Thread launchThread = new Thread(runnableLaunchContainer);
// launch and start the container on a separate thread to keep // launch and start the container on a separate thread to keep
// the main thread unblocked // the main thread unblocked
// as all containers may not be allocated at one go. // as all containers may not be allocated at one go.
launchThreads.add(launchThread); launchThreads.add(launchThread);
launchedContainers.add(allocatedContainer.getId());
launchThread.start(); launchThread.start();
} }
} }
@ -1150,4 +1168,30 @@ public class ApplicationMaster {
+ appAttemptId.toString(), e); + appAttemptId.toString(), e);
} }
} }
RMCallbackHandler getRMCallbackHandler() {
return new RMCallbackHandler();
}
@VisibleForTesting
void setAmRMClient(AMRMClientAsync client) {
this.amRMClient = client;
}
@VisibleForTesting
int getNumCompletedContainers() {
return numCompletedContainers.get();
}
@VisibleForTesting
boolean getDone() {
return done;
}
@VisibleForTesting
Thread createLaunchContainerThread(Container allocatedContainer) {
LaunchContainerRunnable runnableLaunchContainer =
new LaunchContainerRunnable(allocatedContainer, containerListener);
return new Thread(runnableLaunchContainer);
}
} }

View File

@ -20,13 +20,143 @@ package org.apache.hadoop.yarn.applications.distributedshell;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* A bunch of tests to make sure that the container allocations
* and releases occur correctly.
*/
public class TestDSAppMaster { public class TestDSAppMaster {
static class TestAppMaster extends ApplicationMaster {
private int threadsLaunched = 0;
@Override
protected Thread createLaunchContainerThread(Container allocatedContainer) {
threadsLaunched++;
launchedContainers.add(allocatedContainer.getId());
return new Thread();
}
void setNumTotalContainers(int numTotalContainers) {
this.numTotalContainers = numTotalContainers;
}
int getAllocatedContainers() {
return this.numAllocatedContainers.get();
}
@Override
void startTimelineClient(final Configuration conf) throws YarnException,
IOException, InterruptedException {
timelineClient = null;
}
}
@SuppressWarnings("unchecked")
@Test
public void testDSAppMasterAllocateHandler() throws Exception {
TestAppMaster master = new TestAppMaster();
int targetContainers = 2;
AMRMClientAsync mockClient = Mockito.mock(AMRMClientAsync.class);
master.setAmRMClient(mockClient);
master.setNumTotalContainers(targetContainers);
Mockito.doNothing().when(mockClient)
.addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
ApplicationMaster.RMCallbackHandler handler = master.getRMCallbackHandler();
List<Container> containers = new ArrayList<>(1);
ContainerId id1 = BuilderUtils.newContainerId(1, 1, 1, 1);
containers.add(generateContainer(id1));
master.numRequestedContainers.set(targetContainers);
// first allocate a single container, everything should be fine
handler.onContainersAllocated(containers);
Assert.assertEquals("Wrong container allocation count", 1,
master.getAllocatedContainers());
Mockito.verifyZeroInteractions(mockClient);
Assert.assertEquals("Incorrect number of threads launched", 1,
master.threadsLaunched);
// now send 3 extra containers
containers.clear();
ContainerId id2 = BuilderUtils.newContainerId(1, 1, 1, 2);
containers.add(generateContainer(id2));
ContainerId id3 = BuilderUtils.newContainerId(1, 1, 1, 3);
containers.add(generateContainer(id3));
ContainerId id4 = BuilderUtils.newContainerId(1, 1, 1, 4);
containers.add(generateContainer(id4));
handler.onContainersAllocated(containers);
Assert.assertEquals("Wrong final container allocation count", 4,
master.getAllocatedContainers());
Assert.assertEquals("Incorrect number of threads launched", 4,
master.threadsLaunched);
// make sure we handle completion events correctly
List<ContainerStatus> status = new ArrayList<>();
status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS));
status.add(generateContainerStatus(id2, ContainerExitStatus.SUCCESS));
status.add(generateContainerStatus(id3, ContainerExitStatus.ABORTED));
status.add(generateContainerStatus(id4, ContainerExitStatus.ABORTED));
handler.onContainersCompleted(status);
Assert.assertEquals("Unexpected number of completed containers",
targetContainers, master.getNumCompletedContainers());
Assert.assertTrue("Master didn't finish containers as expected",
master.getDone());
// test for events from containers we know nothing about
// these events should be ignored
status = new ArrayList<>();
ContainerId id5 = BuilderUtils.newContainerId(1, 1, 1, 5);
status.add(generateContainerStatus(id5, ContainerExitStatus.ABORTED));
Assert.assertEquals("Unexpected number of completed containers",
targetContainers, master.getNumCompletedContainers());
Assert.assertTrue("Master didn't finish containers as expected",
master.getDone());
status.add(generateContainerStatus(id5, ContainerExitStatus.SUCCESS));
Assert.assertEquals("Unexpected number of completed containers",
targetContainers, master.getNumCompletedContainers());
Assert.assertTrue("Master didn't finish containers as expected",
master.getDone());
}
private Container generateContainer(ContainerId cid) {
return Container.newInstance(cid, NodeId.newInstance("host", 5000),
"host:80", Resource.newInstance(1024, 1), Priority.newInstance(0), null);
}
private ContainerStatus
generateContainerStatus(ContainerId id, int exitStatus) {
return ContainerStatus.newInstance(id, ContainerState.COMPLETE, "",
exitStatus);
}
@Test @Test
public void testTimelineClientInDSAppMaster() throws Exception { public void testTimelineClientInDSAppMaster() throws Exception {
ApplicationMaster appMaster = new ApplicationMaster(); ApplicationMaster appMaster = new ApplicationMaster();