MAPREDUCE-7407. Avoid stopContainer() on dead node (#4779)

This commit is contained in:
Ashutosh Gupta 2022-09-15 18:30:36 +01:00 committed by GitHub
parent 6422eaf301
commit 59d3c20118
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 14 deletions

View File

@ -379,27 +379,38 @@ public class ContainerLauncherImpl extends AbstractService implements
@Override @Override
public void run() { public void run() {
LOG.info("Processing the event " + event.toString()); LOG.info("Processing the event {}", event);
// Load ContainerManager tokens before creating a connection. // Load ContainerManager tokens before creating a connection.
// TODO: Do it only once per NodeManager. // TODO: Do it only once per NodeManager.
ContainerId containerID = event.getContainerID(); ContainerId containerID = event.getContainerID();
Container c = getContainer(event);
switch(event.getType()) { switch(event.getType()) {
case CONTAINER_REMOTE_LAUNCH: case CONTAINER_REMOTE_LAUNCH:
ContainerRemoteLaunchEvent launchEvent ContainerRemoteLaunchEvent launchEvent
= (ContainerRemoteLaunchEvent) event; = (ContainerRemoteLaunchEvent) event;
c.launch(launchEvent); getContainer(event).launch(launchEvent);
break; break;
case CONTAINER_REMOTE_CLEANUP: case CONTAINER_REMOTE_CLEANUP:
c.kill(event.getDumpContainerThreads()); // If the container failed to launch earlier (due to dead node for example),
// it has been marked as FAILED and removed from containers during
// CONTAINER_REMOTE_LAUNCH event handling.
// Skip kill() for such a container during CONTAINER_REMOTE_CLEANUP, as
// it is unnecessary and could cost a 15-minute delay if the node is dead.
if (!containers.containsKey(containerID)) {
LOG.info("Skip cleanup of already-removed container {}", containerID);
// Send the TA_CONTAINER_CLEANED event to the task attempt regardless, as kill() does.
context.getEventHandler().handle(new TaskAttemptEvent(event.getTaskAttemptID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED));
return;
}
getContainer(event).kill(event.getDumpContainerThreads());
break; break;
case CONTAINER_COMPLETED: case CONTAINER_COMPLETED:
c.done(); getContainer(event).done();
break; break;
} }

View File

@ -209,14 +209,11 @@ public class TestContainerLauncherImpl {
ut.waitForPoolToIdle(); ut.waitForPoolToIdle();
verify(mockCM).startContainers(any(StartContainersRequest.class)); verify(mockCM).startContainers(any(StartContainersRequest.class));
LOG.info("inserting cleanup event"); LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent = ContainerLauncherEvent mockCleanupEvent = mock(ContainerLauncherEvent.class);
mock(ContainerLauncherEvent.class); when(mockCleanupEvent.getType()).thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent.getType()) when(mockCleanupEvent.getContainerID()).thenReturn(contId);
.thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent.getContainerID())
.thenReturn(contId);
when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId); when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress); when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
ut.handle(mockCleanupEvent); ut.handle(mockCleanupEvent);
@ -283,8 +280,21 @@ public class TestContainerLauncherImpl {
ut.handle(mockLaunchEvent); ut.handle(mockLaunchEvent);
ut.waitForPoolToIdle(); ut.waitForPoolToIdle();
verify(mockCM, never()).startContainers(any(StartContainersRequest.class)); verify(mockCM).startContainers(any(StartContainersRequest.class));
LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent2 = mock(ContainerLauncherEvent.class);
when(mockCleanupEvent2.getType()).thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent2.getContainerID()).thenReturn(contId);
when(mockCleanupEvent2.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockCleanupEvent2.getContainerMgrAddress()).thenReturn(cmAddress);
ut.handle(mockCleanupEvent2);
ut.waitForPoolToIdle();
// Verifies stopContainers is called on existing container
verify(mockCM).stopContainers(any(StopContainersRequest.class));
} finally { } finally {
ut.stop(); ut.stop();
} }