YARN-2674. Fix distributed shell AM container relaunch during RM work preserving restart. Contributed by Shane Kumpf

This commit is contained in:
Billie Rinaldi 2018-04-30 14:34:51 -07:00
parent d6139c5106
commit 4e1382aca4
2 changed files with 44 additions and 28 deletions

View File

@ -31,6 +31,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -105,6 +106,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
@ -1060,32 +1062,48 @@ public class ApplicationMaster {
public void onContainersAllocated(List<Container> allocatedContainers) {
LOG.info("Got response from RM for container ask, allocatedCnt="
+ allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
String yarnShellId = Integer.toString(yarnShellIdCounter);
yarnShellIdCounter++;
LOG.info("Launching shell command on a new container."
+ ", containerId=" + allocatedContainer.getId()
+ ", yarnShellId=" + yarnShellId
+ ", containerNode=" + allocatedContainer.getNodeId().getHost()
+ ":" + allocatedContainer.getNodeId().getPort()
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+ ", containerResourceMemory"
+ allocatedContainer.getResource().getMemorySize()
+ ", containerResourceVirtualCores"
+ allocatedContainer.getResource().getVirtualCores());
// + ", containerToken"
// +allocatedContainer.getContainerToken().getIdentifier().toString());
if (numAllocatedContainers.get() == numTotalContainers) {
LOG.info("The requested number of containers have been allocated."
+ " Releasing the extra container allocation from the RM.");
amRMClient.releaseAssignedContainer(allocatedContainer.getId());
} else {
numAllocatedContainers.addAndGet(1);
String yarnShellId = Integer.toString(yarnShellIdCounter);
yarnShellIdCounter++;
LOG.info(
"Launching shell command on a new container."
+ ", containerId=" + allocatedContainer.getId()
+ ", yarnShellId=" + yarnShellId
+ ", containerNode="
+ allocatedContainer.getNodeId().getHost()
+ ":" + allocatedContainer.getNodeId().getPort()
+ ", containerNodeURI="
+ allocatedContainer.getNodeHttpAddress()
+ ", containerResourceMemory"
+ allocatedContainer.getResource().getMemorySize()
+ ", containerResourceVirtualCores"
+ allocatedContainer.getResource().getVirtualCores());
Thread launchThread = createLaunchContainerThread(allocatedContainer,
yarnShellId);
Thread launchThread =
createLaunchContainerThread(allocatedContainer, yarnShellId);
// launch and start the container on a separate thread to keep
// the main thread unblocked
// as all containers may not be allocated at one go.
launchThreads.add(launchThread);
launchedContainers.add(allocatedContainer.getId());
launchThread.start();
// launch and start the container on a separate thread to keep
// the main thread unblocked
// as all containers may not be allocated at one go.
launchThreads.add(launchThread);
launchedContainers.add(allocatedContainer.getId());
launchThread.start();
// Remove the corresponding request
Collection<AMRMClient.ContainerRequest> requests =
amRMClient.getMatchingRequests(
allocatedContainer.getAllocationRequestId());
if (requests.iterator().hasNext()) {
AMRMClient.ContainerRequest request = requests.iterator().next();
amRMClient.removeContainerRequest(request);
}
}
}
}

View File

@ -106,7 +106,6 @@ public class TestDSAppMaster {
handler.onContainersAllocated(containers);
Assert.assertEquals("Wrong container allocation count", 1,
master.getAllocatedContainers());
Mockito.verifyZeroInteractions(mockClient);
Assert.assertEquals("Incorrect number of threads launched", 1,
master.threadsLaunched);
Assert.assertEquals("Incorrect YARN Shell IDs",
@ -121,15 +120,14 @@ public class TestDSAppMaster {
ContainerId id4 = BuilderUtils.newContainerId(1, 1, 1, 4);
containers.add(generateContainer(id4));
handler.onContainersAllocated(containers);
Assert.assertEquals("Wrong final container allocation count", 4,
Assert.assertEquals("Wrong final container allocation count", 2,
master.getAllocatedContainers());
Assert.assertEquals("Incorrect number of threads launched", 4,
Assert.assertEquals("Incorrect number of threads launched", 2,
master.threadsLaunched);
Assert.assertEquals("Incorrect YARN Shell IDs",
Arrays.asList("1", "2", "3", "4"), master.yarnShellIds);
Arrays.asList("1", "2"), master.yarnShellIds);
// make sure we handle completion events correctly
List<ContainerStatus> status = new ArrayList<>();
status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS));