YARN-4051. ContainerKillEvent lost when container is still recovering and application finishes. Contributed by sandflee
(cherry picked from commit a16ba4296e
)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
This commit is contained in:
parent
72fbdf708e
commit
5d86ce7b0d
|
@ -340,15 +340,15 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
LOG.info("Recovering " + containerId + " in state " + rcs.getStatus()
|
LOG.info("Recovering " + containerId + " in state " + rcs.getStatus()
|
||||||
+ " with exit code " + rcs.getExitCode());
|
+ " with exit code " + rcs.getExitCode());
|
||||||
|
|
||||||
if (context.getApplications().containsKey(appId)) {
|
Application app = context.getApplications().get(appId);
|
||||||
|
if (app != null) {
|
||||||
Credentials credentials =
|
Credentials credentials =
|
||||||
YarnServerSecurityUtils.parseCredentials(launchContext);
|
YarnServerSecurityUtils.parseCredentials(launchContext);
|
||||||
Container container = new ContainerImpl(getConfig(), dispatcher,
|
Container container = new ContainerImpl(getConfig(), dispatcher,
|
||||||
req.getContainerLaunchContext(),
|
req.getContainerLaunchContext(),
|
||||||
credentials, metrics, token, context, rcs);
|
credentials, metrics, token, context, rcs);
|
||||||
context.getContainers().put(containerId, container);
|
context.getContainers().put(containerId, container);
|
||||||
dispatcher.getEventHandler().handle(
|
app.handle(new ApplicationContainerInitEvent(container));
|
||||||
new ApplicationContainerInitEvent(container));
|
|
||||||
} else {
|
} else {
|
||||||
if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
|
if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
|
||||||
LOG.warn(containerId + " has no corresponding application!");
|
LOG.warn(containerId + " has no corresponding application!");
|
||||||
|
@ -1168,6 +1168,10 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
+ " is not handled by this NodeManager");
|
+ " is not handled by this NodeManager");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
if (container.isRecovering()) {
|
||||||
|
throw new NMNotYetReadyException("Container " + containerIDStr
|
||||||
|
+ " is recovering, try later");
|
||||||
|
}
|
||||||
context.getNMStateStore().storeContainerKilled(containerID);
|
context.getNMStateStore().storeContainerKilled(containerID);
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
new ContainerKillEvent(containerID,
|
new ContainerKillEvent(containerID,
|
||||||
|
@ -1313,6 +1317,21 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
+ " FINISH_APPS event");
|
+ " FINISH_APPS event");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean shouldDropEvent = false;
|
||||||
|
for (Container container : app.getContainers().values()) {
|
||||||
|
if (container.isRecovering()) {
|
||||||
|
LOG.info("drop FINISH_APPS event to " + appID + " because "
|
||||||
|
+ "container " + container.getContainerId()
|
||||||
|
+ " is recovering");
|
||||||
|
shouldDropEvent = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (shouldDropEvent) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
String diagnostic = "";
|
String diagnostic = "";
|
||||||
if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) {
|
if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) {
|
||||||
diagnostic = "Application killed on shutdown";
|
diagnostic = "Application killed on shutdown";
|
||||||
|
@ -1327,10 +1346,32 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
case FINISH_CONTAINERS:
|
case FINISH_CONTAINERS:
|
||||||
CMgrCompletedContainersEvent containersFinishedEvent =
|
CMgrCompletedContainersEvent containersFinishedEvent =
|
||||||
(CMgrCompletedContainersEvent) event;
|
(CMgrCompletedContainersEvent) event;
|
||||||
for (ContainerId container : containersFinishedEvent
|
for (ContainerId containerId : containersFinishedEvent
|
||||||
.getContainersToCleanup()) {
|
.getContainersToCleanup()) {
|
||||||
|
ApplicationId appId =
|
||||||
|
containerId.getApplicationAttemptId().getApplicationId();
|
||||||
|
Application app = this.context.getApplications().get(appId);
|
||||||
|
if (app == null) {
|
||||||
|
LOG.warn("couldn't find app " + appId + " while processing"
|
||||||
|
+ " FINISH_CONTAINERS event");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
Container container = app.getContainers().get(containerId);
|
||||||
|
if (container == null) {
|
||||||
|
LOG.warn("couldn't find container " + containerId
|
||||||
|
+ " while processing FINISH_CONTAINERS event");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (container.isRecovering()) {
|
||||||
|
LOG.info("drop FINISH_CONTAINERS event to " + containerId
|
||||||
|
+ " because container is recovering");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
this.dispatcher.getEventHandler().handle(
|
this.dispatcher.getEventHandler().handle(
|
||||||
new ContainerKillEvent(container,
|
new ContainerKillEvent(containerId,
|
||||||
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
|
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
|
||||||
"Container Killed by ResourceManager"));
|
"Container Killed by ResourceManager"));
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,8 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||||
|
@ -77,7 +77,7 @@ public class ApplicationImpl implements Application {
|
||||||
private LogAggregationContext logAggregationContext;
|
private LogAggregationContext logAggregationContext;
|
||||||
|
|
||||||
Map<ContainerId, Container> containers =
|
Map<ContainerId, Container> containers =
|
||||||
new HashMap<ContainerId, Container>();
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public ApplicationImpl(Dispatcher dispatcher, String user, ApplicationId appId,
|
public ApplicationImpl(Dispatcher dispatcher, String user, ApplicationId appId,
|
||||||
Credentials credentials, Context context) {
|
Credentials credentials, Context context) {
|
||||||
|
|
|
@ -57,4 +57,5 @@ public interface Container extends EventHandler<ContainerEvent> {
|
||||||
|
|
||||||
String toString();
|
String toString();
|
||||||
|
|
||||||
|
boolean isRecovering();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1188,4 +1188,12 @@ public class ContainerImpl implements Container {
|
||||||
LocalResourceRequest resource) {
|
LocalResourceRequest resource) {
|
||||||
return container.resourcesUploadPolicies.get(resource);
|
return container.resourcesUploadPolicies.get(resource);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRecovering() {
|
||||||
|
boolean isRecovering = (
|
||||||
|
recoveredStatus != RecoveredContainerStatus.REQUESTED &&
|
||||||
|
getContainerState() == ContainerState.NEW);
|
||||||
|
return isRecovering;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
@ -246,8 +247,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
||||||
// simulate application completion
|
// simulate application completion
|
||||||
List<ApplicationId> finishedApps = new ArrayList<ApplicationId>();
|
List<ApplicationId> finishedApps = new ArrayList<ApplicationId>();
|
||||||
finishedApps.add(appId);
|
finishedApps.add(appId);
|
||||||
cm.handle(new CMgrCompletedAppsEvent(finishedApps,
|
app.handle(new ApplicationFinishEvent(
|
||||||
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
|
appId, "Application killed by ResourceManager"));
|
||||||
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
||||||
|
|
||||||
// restart and verify app is marked for finishing
|
// restart and verify app is marked for finishing
|
||||||
|
@ -261,8 +262,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
||||||
assertNotNull(app);
|
assertNotNull(app);
|
||||||
// no longer saving FINISH_APP event in NM stateStore,
|
// no longer saving FINISH_APP event in NM stateStore,
|
||||||
// simulate by resending FINISH_APP event
|
// simulate by resending FINISH_APP event
|
||||||
cm.handle(new CMgrCompletedAppsEvent(finishedApps,
|
app.handle(new ApplicationFinishEvent(
|
||||||
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
|
appId, "Application killed by ResourceManager"));
|
||||||
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
||||||
assertTrue(context.getApplicationACLsManager().checkAccess(
|
assertTrue(context.getApplicationACLsManager().checkAccess(
|
||||||
UserGroupInformation.createRemoteUser(modUser),
|
UserGroupInformation.createRemoteUser(modUser),
|
||||||
|
@ -333,8 +334,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
||||||
// simulate application completion
|
// simulate application completion
|
||||||
List<ApplicationId> finishedApps = new ArrayList<ApplicationId>();
|
List<ApplicationId> finishedApps = new ArrayList<ApplicationId>();
|
||||||
finishedApps.add(appId);
|
finishedApps.add(appId);
|
||||||
cm.handle(new CMgrCompletedAppsEvent(finishedApps,
|
app.handle(new ApplicationFinishEvent(
|
||||||
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
|
appId, "Application killed by ResourceManager"));
|
||||||
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
||||||
|
|
||||||
app.handle(new ApplicationEvent(app.getAppId(),
|
app.handle(new ApplicationEvent(app.getAppId(),
|
||||||
|
@ -355,8 +356,9 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
||||||
|
|
||||||
// no longer saving FINISH_APP event in NM stateStore,
|
// no longer saving FINISH_APP event in NM stateStore,
|
||||||
// simulate by resending FINISH_APP event
|
// simulate by resending FINISH_APP event
|
||||||
cm.handle(new CMgrCompletedAppsEvent(finishedApps,
|
app.handle(new ApplicationFinishEvent(
|
||||||
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
|
appId, "Application killed by ResourceManager"));
|
||||||
|
|
||||||
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
||||||
// TODO need to figure out why additional APPLICATION_RESOURCES_CLEANEDUP
|
// TODO need to figure out why additional APPLICATION_RESOURCES_CLEANEDUP
|
||||||
// is needed.
|
// is needed.
|
||||||
|
|
|
@ -144,4 +144,9 @@ public class MockContainer implements Container {
|
||||||
public NMContainerStatus getNMContainerStatus() {
|
public NMContainerStatus getNMContainerStatus() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRecovering() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue