diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 38eb636f746..7d5525a8893 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -367,6 +368,13 @@ private void recover() throws IOException, URISyntaxException { } recoverContainer(rcs); } + + //Dispatching the RECOVERY_COMPLETED event through the dispatcher + //so that all the paused, scheduled and queued containers will + //be scheduled for execution on availability of resources. + dispatcher.getEventHandler().handle( + new ContainerSchedulerEvent(null, + ContainerSchedulerEventType.RECOVERY_COMPLETED)); } else { LOG.info("Not a recoverable state store. Nothing to recover."); } @@ -480,6 +488,7 @@ protected void recoverActiveContainer(Application app, Container container = new ContainerImpl(getConfig(), dispatcher, launchContext, credentials, metrics, token, context, rcs); context.getContainers().put(token.getContainerID(), container); + containerScheduler.recoverActiveContainer(container, rcs.getStatus()); app.handle(new ApplicationContainerInitEvent(container)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java index e28b37d7bf9..75e32e482cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java @@ -46,5 +46,7 @@ public enum ContainerEventType { CONTAINER_RESUMED, // Producer: ContainerScheduler - CONTAINER_TOKEN_UPDATED + CONTAINER_TOKEN_UPDATED, + + RECOVER_PAUSED_CONTAINER } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 580dba7966a..8e2d66e6515 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -359,6 +359,9 @@ ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition()) // From SCHEDULED State .addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING, ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) + .addTransition(ContainerState.SCHEDULED, ContainerState.PAUSED, + ContainerEventType.RECOVER_PAUSED_CONTAINER, + new RecoveredContainerTransition()) .addTransition(ContainerState.SCHEDULED, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, new ExitedWithFailureTransition(true)) @@ -949,7 +952,10 @@ public void sendLaunchEvent() { if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) { // try to recover a container that was previously launched launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER; + } else if (recoveredStatus == RecoveredContainerStatus.PAUSED) { + launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER; } + containerLaunchStartTime = clock.getTime(); dispatcher.getEventHandler().handle( new ContainersLauncherEvent(this, launcherEvent)); @@ -960,9 +966,6 @@ public void sendLaunchEvent() { @SuppressWarnings("unchecked") // dispatcher not typed private void sendScheduleEvent() { if (recoveredStatus == RecoveredContainerStatus.PAUSED) { - // Recovery is not supported for paused container so we raise the - // launch event which will proceed to kill the paused container instead - // of raising the schedule event. ContainersLauncherEventType launcherEvent; launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER; dispatcher.getEventHandler() @@ -1057,17 +1060,15 @@ public void transition( UpdateContainerTokenEvent updateEvent = (UpdateContainerTokenEvent)event; // Update the container token container.setContainerTokenIdentifier(updateEvent.getUpdatedToken()); - if (updateEvent.isResourceChange()) { - try { - // Persist change in the state store. - container.context.getNMStateStore().storeContainerResourceChanged( - container.containerId, - container.getContainerTokenIdentifier().getVersion(), - container.getResource()); - } catch (IOException e) { - LOG.warn("Could not store container [" + container.containerId - + "] resource change..", e); - } + + try { + // Persist change in the state store. + container.context.getNMStateStore() + .storeContainerUpdateToken(container.containerId, + container.getContainerTokenIdentifier()); + } catch (IOException e) { + LOG.warn("Could not store container [" + container.containerId + + "] update..", e); } } } @@ -1112,6 +1113,8 @@ public ContainerState transition(ContainerImpl container, if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) { container.sendFinishedEvents(); return ContainerState.DONE; + } else if (container.recoveredStatus == RecoveredContainerStatus.QUEUED) { + return ContainerState.SCHEDULED; } else if (container.recoveredAsKilled && container.recoveredStatus == RecoveredContainerStatus.REQUESTED) { // container was killed but never launched @@ -1470,6 +1473,18 @@ public void transition(ContainerImpl container, ContainerEvent event) { } } + /** + * Transition from SCHEDULED state to PAUSED state on recovery + */ + static class RecoveredContainerTransition extends ContainerTransition { + @SuppressWarnings("unchecked") + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + container.sendContainerMonitorStartEvent(); + container.wasLaunched = true; + } + } + /** * Transition from RUNNING or KILLING state to * EXITED_WITH_SUCCESS state upon EXITED_WITH_SUCCESS message. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index 9f6ef743d9b..cfd5d6a95f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -140,8 +140,6 @@ public void handle(ContainersLauncherEvent event) { running.put(containerId, launch); break; case RECOVER_PAUSED_CONTAINER: - // Recovery for paused containers is not supported, thus here - // we locate any paused containers, and terminate them. app = context.getApplications().get( containerId.getApplicationAttemptId().getApplicationId()); launch = new RecoverPausedContainerLaunch(context, getConfig(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java index 14cab9a2978..761fe3b11e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java @@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.*; -import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext; import java.io.File; import java.io.IOException; @@ -66,6 +66,8 @@ public Integer call() { containerId.getApplicationAttemptId().getApplicationId().toString(); String containerIdStr = containerId.toString(); + dispatcher.getEventHandler().handle(new ContainerEvent(containerId, + ContainerEventType.RECOVER_PAUSED_CONTAINER)); boolean notInterrupted = true; try { File pidFile = locatePidFile(appIdStr, containerIdStr); @@ -73,16 +75,17 @@ public Integer call() { String pidPathStr = pidFile.getPath(); pidFilePath = new Path(pidPathStr); exec.activateContainer(containerId, pidFilePath); - exec.signalContainer(new ContainerSignalContext.Builder() - .setContainer(container) - .setUser(container.getUser()) - .setSignal(ContainerExecutor.Signal.KILL) - .build()); + retCode = exec.reacquireContainer( + new ContainerReacquisitionContext.Builder() + .setContainer(container) + .setUser(container.getUser()) + .setContainerId(containerId) + .build()); } else { LOG.warn("Unable to locate pid file for container " + containerIdStr); } - } catch (InterruptedIOException e) { + } catch (InterruptedException | InterruptedIOException e) { LOG.warn("Interrupted while waiting for exit code from " + containerId); notInterrupted = false; } catch (IOException e) { @@ -100,14 +103,21 @@ public Integer call() { } } - LOG.warn("Recovered container exited with a non-zero exit code " - + retCode); - this.dispatcher.getEventHandler().handle(new ContainerExitEvent( - containerId, - ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode, - "Container exited with a non-zero exit code " + retCode)); + if (retCode != 0) { + LOG.warn("Recovered container exited with a non-zero exit code " + + retCode); + this.dispatcher.getEventHandler().handle(new ContainerExitEvent( + containerId, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode, + "Container exited with a non-zero exit code " + retCode)); + return retCode; + } - return retCode; + LOG.info("Recovered container " + containerId + " succeeded"); + dispatcher.getEventHandler().handle( + new ContainerEvent(containerId, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)); + return 0; } private File locatePidFile(String appIdStr, String containerIdStr) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java index 17ddd77857f..a3ccf00de2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java @@ -72,7 +72,7 @@ public Integer call() { String containerIdStr = containerId.toString(); dispatcher.getEventHandler().handle(new ContainerEvent(containerId, - ContainerEventType.CONTAINER_LAUNCHED)); + ContainerEventType.RECOVER_PAUSED_CONTAINER)); boolean notInterrupted = true; try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index e43682214e6..76da37c5c34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,6 +169,8 @@ public void handle(ContainerSchedulerEvent event) { case SHED_QUEUED_CONTAINERS: shedQueuedOpportunisticContainers(); break; + case RECOVERY_COMPLETED: + startPendingContainers(maxOppQueueLength <= 0); default: LOG.error("Unknown event arrived at ContainerScheduler: " + event.toString()); @@ -218,6 +221,34 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) { } } + /** + * Populates auxiliary data structures used by the ContainerScheduler on + * recovery. + * @param container container recovered + * @param rcs Recovered Container status + */ + public void recoverActiveContainer(Container container, + RecoveredContainerStatus rcs) { + ExecutionType execType = + container.getContainerTokenIdentifier().getExecutionType(); + if (rcs == RecoveredContainerStatus.QUEUED + || rcs == RecoveredContainerStatus.PAUSED) { + if (execType == ExecutionType.GUARANTEED) { + queuedGuaranteedContainers.put(container.getContainerId(), container); + } else if (execType == ExecutionType.OPPORTUNISTIC) { + queuedOpportunisticContainers + .put(container.getContainerId(), container); + } else { + LOG.error( + "UnKnown execution type received " + container.getContainerId() + + ", execType " + execType); + } + } else if (rcs == RecoveredContainerStatus.LAUNCHED) { + runningContainers.put(container.getContainerId(), container); + utilizationTracker.addContainerResources(container); + } + } + /** * Return number of queued containers. * @return Number of queued containers. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java index a9cbf745a81..294eddf88e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java @@ -27,5 +27,6 @@ public enum ContainerSchedulerEventType { UPDATE_CONTAINER, // Producer: Node HB response - RM has asked to shed the queue SHED_QUEUED_CONTAINERS, - CONTAINER_PAUSED + CONTAINER_PAUSED, + RECOVERY_COMPLETED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 631af880831..8865d389fea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -20,6 +20,9 @@ import static org.fusesource.leveldbjni.JniDBFactory.asString; import static org.fusesource.leveldbjni.JniDBFactory.bytes; + +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.slf4j.LoggerFactory; import java.io.File; @@ -45,11 +48,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto; +import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.records.Version; @@ -117,8 +118,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued"; private static final String CONTAINER_PAUSED_KEY_SUFFIX = "/paused"; - private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX = - "/resourceChanged"; + private static final String CONTAINER_UPDATE_TOKEN_SUFFIX = + "/updateToken"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX = @@ -284,9 +285,17 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, } else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) { rcs.status = RecoveredContainerStatus.COMPLETED; rcs.exitCode = Integer.parseInt(asString(entry.getValue())); - } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) { - rcs.capability = new ResourcePBImpl( - ResourceProto.parseFrom(entry.getValue())); + } else if (suffix.equals(CONTAINER_UPDATE_TOKEN_SUFFIX)) { + ContainerTokenIdentifierProto tokenIdentifierProto = + ContainerTokenIdentifierProto.parseFrom(entry.getValue()); + Token currentToken = rcs.getStartRequest().getContainerToken(); + Token updatedToken = Token + .newInstance(tokenIdentifierProto.toByteArray(), + ContainerTokenIdentifier.KIND.toString(), + currentToken.getPassword().array(), currentToken.getService()); + rcs.startRequest.setContainerToken(updatedToken); + rcs.capability = new ResourcePBImpl(tokenIdentifierProto.getResource()); + rcs.version = tokenIdentifierProto.getVersion(); } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) { rcs.setRemainingRetryAttempts( Integer.parseInt(asString(entry.getValue()))); @@ -361,6 +370,21 @@ public void storeContainerQueued(ContainerId containerId) throws IOException { } } + private void removeContainerQueued(ContainerId containerId) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("removeContainerQueued: containerId=" + containerId); + } + + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_QUEUED_KEY_SUFFIX; + try { + db.delete(bytes(key)); + } catch (DBException e) { + throw new IOException(e); + } + } + @Override public void storeContainerPaused(ContainerId containerId) throws IOException { if (LOG.isDebugEnabled()) { @@ -416,6 +440,8 @@ public void storeContainerLaunched(ContainerId containerId) LOG.debug("storeContainerLaunched: containerId=" + containerId); } + // Removing the container if queued for backward compatibility reasons + removeContainerQueued(containerId); String key = CONTAINERS_KEY_PREFIX + containerId.toString() + CONTAINER_LAUNCHED_KEY_SUFFIX; try { @@ -426,24 +452,25 @@ public void storeContainerLaunched(ContainerId containerId) } @Override - public void storeContainerResourceChanged(ContainerId containerId, - int containerVersion, Resource capability) throws IOException { + public void storeContainerUpdateToken(ContainerId containerId, + ContainerTokenIdentifier containerTokenIdentifier) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("storeContainerResourceChanged: containerId=" + containerId - + ", capability=" + capability); + LOG.debug("storeContainerUpdateToken: containerId=" + containerId); } - String keyResChng = CONTAINERS_KEY_PREFIX + containerId.toString() - + CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX; + String keyUpdateToken = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_UPDATE_TOKEN_SUFFIX; String keyVersion = CONTAINERS_KEY_PREFIX + containerId.toString() + CONTAINER_VERSION_KEY_SUFFIX; + try { WriteBatch batch = db.createWriteBatch(); try { // New value will overwrite old values for the same key - batch.put(bytes(keyResChng), - ProtoUtils.convertToProtoFormat(capability).toByteArray()); - batch.put(bytes(keyVersion), bytes(Integer.toString(containerVersion))); + batch.put(bytes(keyUpdateToken), + containerTokenIdentifier.getProto().toByteArray()); + batch.put(bytes(keyVersion), + bytes(Integer.toString(containerTokenIdentifier.getVersion()))); db.write(batch); } finally { batch.close(); @@ -539,6 +566,7 @@ public void removeContainer(ContainerId containerId) batch.delete(bytes(keyPrefix + CONTAINER_PAUSED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_UPDATE_TOKEN_SUFFIX)); List unknownKeysForContainer = containerUnknownKeySuffixes .removeAll(containerId); for (String unknownKeySuffix : unknownKeysForContainer) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 155ee8fad2e..81b8c79e94a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -27,12 +27,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; // The state store to use when state isn't being stored @@ -98,8 +98,8 @@ public void storeContainerLaunched(ContainerId containerId) } @Override - public void storeContainerResourceChanged(ContainerId containerId, - int version, Resource capability) throws IOException { + public void storeContainerUpdateToken(ContainerId containerId, + ContainerTokenIdentifier containerTokenIdentifier) throws IOException { } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 7e2debf5778..bd73b486e69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; @Private @@ -418,14 +419,13 @@ public abstract void storeContainerLaunched(ContainerId containerId) throws IOException; /** - * Record that a container resource has been changed + * Record that a container has been updated * @param containerId the container ID - * @param containerVersion the container version - * @param capability the container resource capability + * @param containerTokenIdentifier container token identifier * @throws IOException */ - public abstract void storeContainerResourceChanged(ContainerId containerId, - int containerVersion, Resource capability) throws IOException; + public abstract void storeContainerUpdateToken(ContainerId containerId, + ContainerTokenIdentifier containerTokenIdentifier) throws IOException; /** * Record that a container has completed diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 224e99cf9f8..d6cb0bc2f1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -402,7 +402,7 @@ public void testContainerResizeRecovery() throws Exception { stateStore.start(); Context context = createContext(conf, stateStore); ContainerManagerImpl cm = createContainerManager(context, delSrvc); - cm.dispatcher.disableExitOnDispatchException(); + ((NMContext) context).setContainerManager(cm); cm.init(conf); cm.start(); // add an application by starting a container @@ -471,6 +471,7 @@ public void testContainerResizeRecovery() throws Exception { cm.stop(); context = createContext(conf, stateStore); cm = createContainerManager(context); + ((NMContext) context).setContainerManager(cm); cm.init(conf); cm.start(); assertEquals(1, context.getApplications().size()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 96893a07032..9f8c0cb24fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -32,12 +32,13 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -172,12 +173,17 @@ public synchronized void storeContainerLaunched(ContainerId containerId) } @Override - public synchronized void storeContainerResourceChanged( - ContainerId containerId, int version, Resource capability) - throws IOException { + public void storeContainerUpdateToken(ContainerId containerId, + ContainerTokenIdentifier containerTokenIdentifier) throws IOException { RecoveredContainerState rcs = getRecoveredContainerState(containerId); - rcs.capability = capability; - rcs.version = version; + rcs.capability = containerTokenIdentifier.getResource(); + rcs.version = containerTokenIdentifier.getVersion(); + Token currentToken = rcs.getStartRequest().getContainerToken(); + Token updatedToken = Token + .newInstance(containerTokenIdentifier.getBytes(), + ContainerTokenIdentifier.KIND.toString(), + currentToken.getPassword().array(), currentToken.getService()); + rcs.startRequest.setContainerToken(updatedToken); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 76fe130f799..3ab02ebe246 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -306,13 +306,18 @@ public void testContainerStorage() throws IOException { assertEquals(1, recoveredContainers.size()); // increase the container size, and verify recovered - stateStore.storeContainerResourceChanged(containerId, 2, - Resource.newInstance(2468, 4)); + ContainerTokenIdentifier updateTokenIdentifier = + new ContainerTokenIdentifier(containerId, "host", "user", + Resource.newInstance(2468, 4), 9876543210L, 42, 2468, + Priority.newInstance(7), 13579); + + stateStore + .storeContainerUpdateToken(containerId, updateTokenIdentifier); restartStateStore(); recoveredContainers = stateStore.loadContainersState(); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); - assertEquals(2, rcs.getVersion()); + assertEquals(0, rcs.getVersion()); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); @@ -329,7 +334,9 @@ public void testContainerStorage() throws IOException { assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertTrue(rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); + ContainerTokenIdentifier tokenReadFromRequest = BuilderUtils + .newContainerTokenIdentifier(rcs.getStartRequest().getContainerToken()); + assertEquals(updateTokenIdentifier, tokenReadFromRequest); assertEquals(diags.toString(), rcs.getDiagnostics()); // add yet more diags, mark container completed, and verify recovered @@ -343,7 +350,6 @@ public void testContainerStorage() throws IOException { assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); assertEquals(21, rcs.getExitCode()); assertTrue(rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); assertEquals(diags.toString(), rcs.getDiagnostics()); // store remainingRetryAttempts, workDir and logDir