diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index e497f62264b..d12892e325f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -144,7 +145,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; @@ -1251,29 +1251,6 @@ private void updateContainerInternal(ContainerId containerId, + " [" + containerTokenIdentifier.getVersion() + "]"); } - // Check container state - org.apache.hadoop.yarn.server.nodemanager. 
- containermanager.container.ContainerState currentState = - container.getContainerState(); - EnumSet<org.apache.hadoop.yarn.server.nodemanager.containermanager.container - .ContainerState> allowedStates = EnumSet.of( - org.apache.hadoop.yarn.server.nodemanager.containermanager.container - .ContainerState.RUNNING, - org.apache.hadoop.yarn.server.nodemanager.containermanager.container - .ContainerState.SCHEDULED, - org.apache.hadoop.yarn.server.nodemanager.containermanager.container - .ContainerState.LOCALIZING, - org.apache.hadoop.yarn.server.nodemanager.containermanager.container - .ContainerState.REINITIALIZING, - org.apache.hadoop.yarn.server.nodemanager.containermanager.container - .ContainerState.RELAUNCHING); - if (!allowedStates.contains(currentState)) { - throw RPCUtil.getRemoteException("Container " + containerId.toString() - + " is in " + currentState.name() + " state." - + " Resource can only be changed when a container is in" - + " RUNNING or SCHEDULED state"); - } - // Check validity of the target resource. Resource currentResource = container.getResource(); ExecutionType currentExecType = @@ -1313,11 +1290,11 @@ private void updateContainerInternal(ContainerId containerId, this.readLock.lock(); try { if (!serviceStopped) { - // Dispatch message to ContainerScheduler to actually + // Dispatch message to Container to actually // make the change.
- dispatcher.getEventHandler().handle(new UpdateContainerSchedulerEvent( - container, containerTokenIdentifier, isResourceChange, - isExecTypeUpdate, isIncrease)); + dispatcher.getEventHandler().handle(new UpdateContainerTokenEvent( + container.getContainerId(), containerTokenIdentifier, + isResourceChange, isExecTypeUpdate, isIncrease)); } else { throw new YarnException( "Unable to change container resource as the NodeManager is " @@ -1816,10 +1793,14 @@ private Container preReInitializeOrLocalizeCheck(ContainerId containerId, if (container == null) { throw new YarnException("Specified " + containerId + " does not exist!"); } - if (!container.isRunning() || container.isReInitializing()) { + if (!container.isRunning() || container.isReInitializing() + || container.getContainerTokenIdentifier().getExecutionType() + == ExecutionType.OPPORTUNISTIC) { throw new YarnException("Cannot perform " + op + " on [" + containerId + "]. Current state is [" + container.getContainerState() + ", " + - "isReInitializing=" + container.isReInitializing() + "]."); + "isReInitializing=" + container.isReInitializing() + "]. 
Container" + + " Execution Type is [" + container.getContainerTokenIdentifier() + .getExecutionType() + "]."); } return container; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java index 147543567d0..e28b37d7bf9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java @@ -29,6 +29,7 @@ public enum ContainerEventType { ROLLBACK_REINIT, PAUSE_CONTAINER, RESUME_CONTAINER, + UPDATE_CONTAINER_TOKEN, // DownloadManager CONTAINER_INITED, @@ -42,5 +43,8 @@ public enum ContainerEventType { CONTAINER_EXITED_WITH_FAILURE, CONTAINER_KILLED_ON_REQUEST, CONTAINER_PAUSED, - CONTAINER_RESUMED + CONTAINER_RESUMED, + + // Producer: ContainerScheduler + CONTAINER_TOKEN_UPDATED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 836e70e68f0..705087b63ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -33,6 +33,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -308,8 +310,8 @@ ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition()) UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.NEW, ContainerState.DONE, ContainerEventType.KILL_CONTAINER, new KillOnNewTransition()) - .addTransition(ContainerState.NEW, ContainerState.DONE, - ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) + .addTransition(ContainerState.NEW, ContainerState.NEW, + ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition()) // From LOCALIZING State .addTransition(ContainerState.LOCALIZING, @@ -325,8 +327,9 @@ ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition()) .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillBeforeRunningTransition()) - .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING, - ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) + .addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING, + ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition()) + // From LOCALIZATION_FAILED State .addTransition(ContainerState.LOCALIZATION_FAILED, @@ -351,6 +354,9 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) .addTransition(ContainerState.LOCALIZATION_FAILED, ContainerState.LOCALIZATION_FAILED, ContainerEventType.RESOURCE_FAILED) + .addTransition(ContainerState.LOCALIZATION_FAILED, + ContainerState.LOCALIZATION_FAILED, + 
ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition()) // From SCHEDULED State .addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING, @@ -364,6 +370,9 @@ ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) .addTransition(ContainerState.SCHEDULED, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillBeforeRunningTransition()) + .addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED, + ContainerEventType.UPDATE_CONTAINER_TOKEN, + new NotifyContainerSchedulerOfUpdateTransition()) // From RUNNING State .addTransition(ContainerState.RUNNING, @@ -376,10 +385,16 @@ ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) ContainerState.EXITED_WITH_FAILURE), ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, new RetryFailureTransition()) - .addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING, + .addTransition(ContainerState.RUNNING, + EnumSet.of(ContainerState.RUNNING, + ContainerState.REINITIALIZING, + ContainerState.REINITIALIZING_AWAITING_KILL), ContainerEventType.REINITIALIZE_CONTAINER, new ReInitializeContainerTransition()) - .addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING, + .addTransition(ContainerState.RUNNING, + EnumSet.of(ContainerState.RUNNING, + ContainerState.REINITIALIZING, + ContainerState.REINITIALIZING_AWAITING_KILL), ContainerEventType.ROLLBACK_REINIT, new RollbackContainerTransition()) .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, @@ -398,9 +413,16 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledExternallyTransition()) .addTransition(ContainerState.RUNNING, ContainerState.PAUSING, - ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition()) + ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition()) + .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, + ContainerEventType.UPDATE_CONTAINER_TOKEN, + new 
NotifyContainerSchedulerOfUpdateTransition()) + // From PAUSING State + .addTransition(ContainerState.PAUSING, ContainerState.PAUSING, + ContainerEventType.RESOURCE_LOCALIZED, + new ResourceLocalizedWhileRunningTransition()) .addTransition(ContainerState.PAUSING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) .addTransition(ContainerState.PAUSING, ContainerState.PAUSING, @@ -420,6 +442,12 @@ ContainerEventType.CONTAINER_PAUSED, new PausedContainerTransition()) .addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledExternallyTransition()) + .addTransition(ContainerState.PAUSING, ContainerState.PAUSING, + ContainerEventType.RESOURCE_LOCALIZED, + new ResourceLocalizedWhileRunningTransition()) + .addTransition(ContainerState.PAUSING, ContainerState.PAUSING, + ContainerEventType.UPDATE_CONTAINER_TOKEN, + new NotifyContainerSchedulerOfUpdateTransition()) // From PAUSED State .addTransition(ContainerState.PAUSED, ContainerState.KILLING, @@ -429,6 +457,10 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.PAUSED, ContainerState.PAUSED, ContainerEventType.PAUSE_CONTAINER) + // This can happen during re-initialization. 
+ .addTransition(ContainerState.PAUSED, ContainerState.PAUSED, + ContainerEventType.RESOURCE_LOCALIZED, + new ResourceLocalizedWhileRunningTransition()) .addTransition(ContainerState.PAUSED, ContainerState.RESUMING, ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition()) // In case something goes wrong then container will exit from the @@ -444,6 +476,9 @@ ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition()) ContainerState.EXITED_WITH_SUCCESS, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, new ExitedWithSuccessTransition(true)) + .addTransition(ContainerState.PAUSED, ContainerState.PAUSED, + ContainerEventType.UPDATE_CONTAINER_TOKEN, + new NotifyContainerSchedulerOfUpdateTransition()) // From RESUMING State .addTransition(ContainerState.RESUMING, ContainerState.KILLING, @@ -453,6 +488,10 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) .addTransition(ContainerState.RESUMING, ContainerState.RESUMING, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) + // This can happen during re-initialization + .addTransition(ContainerState.RESUMING, ContainerState.RESUMING, + ContainerEventType.RESOURCE_LOCALIZED, + new ResourceLocalizedWhileRunningTransition()) // In case something goes wrong then container will exit from the // RESUMING state .addTransition(ContainerState.RESUMING, @@ -467,6 +506,10 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) ContainerState.EXITED_WITH_SUCCESS, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, new ExitedWithSuccessTransition(true)) + .addTransition(ContainerState.RESUMING, ContainerState.RESUMING, + ContainerEventType.UPDATE_CONTAINER_TOKEN, + new NotifyContainerSchedulerOfUpdateTransition()) + // NOTE - We cannot get a PAUSE_CONTAINER while in RESUMING state. 
// From REINITIALIZING State .addTransition(ContainerState.REINITIALIZING, @@ -478,7 +521,8 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, new ExitedWithFailureTransition(true)) .addTransition(ContainerState.REINITIALIZING, - ContainerState.REINITIALIZING, + EnumSet.of(ContainerState.REINITIALIZING, + ContainerState.REINITIALIZING_AWAITING_KILL), ContainerEventType.RESOURCE_LOCALIZED, new ResourceLocalizedWhileReInitTransition()) .addTransition(ContainerState.REINITIALIZING, ContainerState.RUNNING, @@ -490,12 +534,39 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) - .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING, - ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) + .addTransition(ContainerState.REINITIALIZING, ContainerState.PAUSING, + ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition()) .addTransition(ContainerState.REINITIALIZING, + ContainerState.REINITIALIZING, + ContainerEventType.UPDATE_CONTAINER_TOKEN, + new NotifyContainerSchedulerOfUpdateTransition()) + + // from REINITIALIZING_AWAITING_KILL + .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL, + ContainerState.EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + new ExitedWithSuccessTransition(true)) + .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) + .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL, + ContainerState.REINITIALIZING_AWAITING_KILL, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL, + ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, 
new KillTransition()) + .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL, + ContainerState.SCHEDULED, ContainerEventType.PAUSE_CONTAINER) + .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL, ContainerState.SCHEDULED, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledForReInitializationTransition()) + .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL, + ContainerState.REINITIALIZING_AWAITING_KILL, + ContainerEventType.UPDATE_CONTAINER_TOKEN, + new NotifyContainerSchedulerOfUpdateTransition()) // From RELAUNCHING State .addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING, @@ -511,6 +582,10 @@ ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) ContainerEventType.KILL_CONTAINER, new KillTransition()) .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING, ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) + .addTransition(ContainerState.RELAUNCHING, ContainerState.RELAUNCHING, + ContainerEventType.UPDATE_CONTAINER_TOKEN, + new NotifyContainerSchedulerOfUpdateTransition()) + // From CONTAINER_EXITED_WITH_SUCCESS State .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE, @@ -524,6 +599,10 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) ContainerState.EXITED_WITH_SUCCESS, EnumSet.of(ContainerEventType.KILL_CONTAINER, ContainerEventType.PAUSE_CONTAINER)) + // No transition - assuming container is on its way to completion + .addTransition(ContainerState.EXITED_WITH_SUCCESS, + ContainerState.EXITED_WITH_SUCCESS, + ContainerEventType.UPDATE_CONTAINER_TOKEN) // From EXITED_WITH_FAILURE State .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE, @@ -537,6 +616,10 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) ContainerState.EXITED_WITH_FAILURE, EnumSet.of(ContainerEventType.KILL_CONTAINER, ContainerEventType.PAUSE_CONTAINER)) + // No transition - assuming container is on its way to completion + 
.addTransition(ContainerState.EXITED_WITH_FAILURE, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.UPDATE_CONTAINER_TOKEN) // From KILLING State. .addTransition(ContainerState.KILLING, @@ -572,6 +655,9 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) ContainerState.KILLING, EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED, ContainerEventType.PAUSE_CONTAINER)) + // No transition - assuming container is on its way to completion + .addTransition(ContainerState.KILLING, ContainerState.KILLING, + ContainerEventType.UPDATE_CONTAINER_TOKEN) // From CONTAINER_CLEANEDUP_AFTER_KILL State. .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, @@ -589,6 +675,10 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ContainerEventType.PAUSE_CONTAINER)) + // No transition - assuming container is on its way to completion + .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, + ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, + ContainerEventType.UPDATE_CONTAINER_TOKEN) // From DONE .addTransition(ContainerState.DONE, ContainerState.DONE, @@ -606,6 +696,9 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) EnumSet.of(ContainerEventType.RESOURCE_FAILED, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE)) + // No transition - assuming container is on its way to completion + .addTransition(ContainerState.DONE, ContainerState.DONE, + ContainerEventType.UPDATE_CONTAINER_TOKEN) // create the topology tables .installTopology(); @@ -626,6 +719,7 @@ public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() { case RUNNING: case RELAUNCHING: case REINITIALIZING: + case REINITIALIZING_AWAITING_KILL: case EXITED_WITH_SUCCESS: case EXITED_WITH_FAILURE: case KILLING: @@ -929,6 +1023,45 @@ public void transition(ContainerImpl container, ContainerEvent event) { 
} + static class UpdateTransition extends ContainerTransition { + @Override + public void transition( + ContainerImpl container, ContainerEvent event) { + UpdateContainerTokenEvent updateEvent = (UpdateContainerTokenEvent)event; + // Update the container token + container.setContainerTokenIdentifier(updateEvent.getUpdatedToken()); + if (updateEvent.isResourceChange()) { + try { + // Persist change in the state store. + container.context.getNMStateStore().storeContainerResourceChanged( + container.containerId, + container.getContainerTokenIdentifier().getVersion(), + container.getResource()); + } catch (IOException e) { + LOG.warn("Could not store container [" + container.containerId + + "] resource change..", e); + } + } + } + } + + static class NotifyContainerSchedulerOfUpdateTransition extends + UpdateTransition { + @Override + public void transition( + ContainerImpl container, ContainerEvent event) { + + UpdateContainerTokenEvent updateEvent = (UpdateContainerTokenEvent)event; + // Save original token + ContainerTokenIdentifier originalToken = + container.containerTokenIdentifier; + super.transition(container, updateEvent); + container.dispatcher.getEventHandler().handle( + new UpdateContainerSchedulerEvent(container, + originalToken, updateEvent)); + } + } + /** * State transition when a NEW container receives the INIT_CONTAINER * message. @@ -1074,12 +1207,15 @@ public ContainerState transition(ContainerImpl container, /** * Transition to start the Re-Initialization process. 
*/ - static class ReInitializeContainerTransition extends ContainerTransition { + static class ReInitializeContainerTransition implements + MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> { @SuppressWarnings("unchecked") @Override - public void transition(ContainerImpl container, ContainerEvent event) { + public ContainerState transition( + ContainerImpl container, ContainerEvent event) { container.reInitContext = createReInitContext(container, event); + boolean resourcesPresent = false; try { // 'reInitContext.newResourceSet' can be // a) current container resourceSet (In case of Restart) @@ -1101,6 +1237,7 @@ public void transition(ContainerImpl container, ContainerEvent event) { container.dispatcher.getEventHandler().handle( new ContainersLauncherEvent(container, ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT)); + resourcesPresent = true; } container.metrics.reInitingContainer(); NMAuditLogger.logSuccess(container.user, @@ -1112,7 +1249,11 @@ public void transition(ContainerImpl container, ContainerEvent event) { " re-initialization failure..", e); container.addDiagnostics("Error re-initializing due to" + "[" + e.getMessage() + "]"); + return ContainerState.RUNNING; } + return resourcesPresent ? + ContainerState.REINITIALIZING_AWAITING_KILL : + ContainerState.REINITIALIZING; } protected ReInitializationContext createReInitContext( @@ -1164,11 +1305,14 @@ protected ReInitializationContext createReInitContext(ContainerImpl * If all dependencies are met, then restart Container with new bits.
*/ static class ResourceLocalizedWhileReInitTransition - extends ContainerTransition { + implements MultipleArcTransition + <ContainerImpl, ContainerEvent, ContainerState> { + @SuppressWarnings("unchecked") @Override - public void transition(ContainerImpl container, ContainerEvent event) { + public ContainerState transition( + ContainerImpl container, ContainerEvent event) { ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event; container.reInitContext.newResourceSet.resourceLocalized( @@ -1180,7 +1324,9 @@ public void transition(ContainerImpl container, ContainerEvent event) { container.dispatcher.getEventHandler().handle( new ContainersLauncherEvent(container, ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT)); + return ContainerState.REINITIALIZING_AWAITING_KILL; } + return ContainerState.REINITIALIZING; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java index 7c3fea805bb..5644d034500 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java @@ -20,7 +20,8 @@ public enum ContainerState { NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING, - REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, + REINITIALIZING, REINITIALIZING_AWAITING_KILL, + EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE, PAUSING, PAUSED,
RESUMING } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/UpdateContainerTokenEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/UpdateContainerTokenEvent.java new file mode 100644 index 00000000000..c9dc97e78dc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/UpdateContainerTokenEvent.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; + +/** + * Update Event consumed by the Container. + */ +public class UpdateContainerTokenEvent extends ContainerEvent { + private final ContainerTokenIdentifier updatedToken; + private final boolean isResourceChange; + private final boolean isExecTypeUpdate; + private final boolean isIncrease; + + /** + * Create Update event. + * + * @param cID Container Id. + * @param updatedToken Updated Container Token. + * @param isResourceChange Is Resource change. + * @param isExecTypeUpdate Is ExecutionType Update. + * @param isIncrease Is container increase. + */ + public UpdateContainerTokenEvent(ContainerId cID, + ContainerTokenIdentifier updatedToken, boolean isResourceChange, + boolean isExecTypeUpdate, boolean isIncrease) { + super(cID, ContainerEventType.UPDATE_CONTAINER_TOKEN); + this.updatedToken = updatedToken; + this.isResourceChange = isResourceChange; + this.isExecTypeUpdate = isExecTypeUpdate; + this.isIncrease = isIncrease; + } + + /** + * Get the updated Container Token. + * + * @return the updated Container Token. + */ + public ContainerTokenIdentifier getUpdatedToken() { + return updatedToken; + } + + /** + * Is this update a ResourceChange. + * + * @return isResourceChange. + */ + public boolean isResourceChange() { + return isResourceChange; + } + + /** + * Is this update an ExecType Update. + * + * @return isExecTypeUpdate. + */ + public boolean isExecTypeUpdate() { + return isExecTypeUpdate; + } + + /** + * Is this a container Increase.
+ * + * @return isIncrease. + */ + public boolean isIncrease() { + return isIncrease; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 830a06d5297..e43682214e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor .ChangeMonitoringContainerResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; @@ -151,7 +152,9 @@ public void handle(ContainerSchedulerEvent event) { case SCHEDULE_CONTAINER: scheduleContainer(event.getContainer()); break; + // NOTE: Is sent only after container state has changed to PAUSED... case CONTAINER_PAUSED: + // NOTE: Is sent only after container state has changed to DONE... 
case CONTAINER_COMPLETED: onResourcesReclaimed(event.getContainer()); break; @@ -180,58 +183,38 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) { if (updateEvent.isResourceChange()) { if (runningContainers.containsKey(containerId)) { this.utilizationTracker.subtractContainerResource( - updateEvent.getContainer()); - updateEvent.getContainer().setContainerTokenIdentifier( - updateEvent.getUpdatedToken()); + new ContainerImpl(getConfig(), null, null, null, null, + updateEvent.getOriginalToken(), context)); this.utilizationTracker.addContainerResources( updateEvent.getContainer()); getContainersMonitor().handle( new ChangeMonitoringContainerResourceEvent(containerId, updateEvent.getUpdatedToken().getResource())); - } else { - // Is Queued or localizing.. - updateEvent.getContainer().setContainerTokenIdentifier( - updateEvent.getUpdatedToken()); - } - try { - // Persist change in the state store. - this.context.getNMStateStore().storeContainerResourceChanged( - containerId, - updateEvent.getUpdatedToken().getVersion(), - updateEvent.getUpdatedToken().getResource()); - } catch (IOException e) { - LOG.warn("Could not store container [" + containerId + "] resource " + - "change..", e); } } if (updateEvent.isExecTypeUpdate()) { - updateEvent.getContainer().setContainerTokenIdentifier( - updateEvent.getUpdatedToken()); - // If this is a running container.. just change the execution type - // and be done with it. - if (!runningContainers.containsKey(containerId)) { - // Promotion or not (Increase signifies either a promotion - // or container size increase) - if (updateEvent.isIncrease()) { - // Promotion of queued container.. - if (queuedOpportunisticContainers.remove(containerId) != null) { - queuedGuaranteedContainers.put(containerId, - updateEvent.getContainer()); - } + // Promotion or not (Increase signifies either a promotion + // or container size increase) + if (updateEvent.isIncrease()) { + // Promotion of queued container.. 
+ if (queuedOpportunisticContainers.remove(containerId) != null) { + queuedGuaranteedContainers.put(containerId, + updateEvent.getContainer()); //Kill/pause opportunistic containers if any to make room for // promotion request reclaimOpportunisticContainerResources(updateEvent.getContainer()); - } else { - // Demotion of queued container.. Should not happen too often - // since you should not find too many queued guaranteed - // containers - if (queuedGuaranteedContainers.remove(containerId) != null) { - queuedOpportunisticContainers.put(containerId, - updateEvent.getContainer()); - } + } + } else { + // Demotion of queued container.. Should not happen too often + // since you should not find too many queued guaranteed + // containers + if (queuedGuaranteedContainers.remove(containerId) != null) { + queuedOpportunisticContainers.put(containerId, + updateEvent.getContainer()); } } + startPendingContainers(maxOppQueueLength <= 0); } } @@ -290,6 +273,16 @@ private void onResourcesReclaimed(Container container) { queuedGuaranteedContainers.remove(container.getContainerId()); } + // Requeue PAUSED containers + if (container.getContainerState() == ContainerState.PAUSED) { + if (container.getContainerTokenIdentifier().getExecutionType() == + ExecutionType.GUARANTEED) { + queuedGuaranteedContainers.put(container.getContainerId(), container); + } else { + queuedOpportunisticContainers.put( + container.getContainerId(), container); + } + } // decrement only if it was a running container Container completedContainer = runningContainers.remove(container .getContainerId()); @@ -301,7 +294,8 @@ private void onResourcesReclaimed(Container container) { ExecutionType.OPPORTUNISTIC) { this.metrics.completeOpportunisticContainer(container.getResource()); } - startPendingContainers(false); + boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0); + startPendingContainers(forceStartGuaranteedContainers); } } @@ -311,26 +305,9 @@ private void onResourcesReclaimed(Container 
container) { * container without looking at available resource */ private void startPendingContainers(boolean forceStartGuaranteedContaieners) { - // Start pending guaranteed containers, if resources available. + // Start guaranteed containers that are paused, if resources available. boolean resourcesAvailable = startContainers( - queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners); - // Resume opportunistic containers, if resource available. - if (resourcesAvailable) { - List pausedContainers = new ArrayList(); - Map containers = - context.getContainers(); - for (Map.Entryentry : containers.entrySet()) { - ContainerId contId = entry.getKey(); - // Find containers that were not already started and are in paused state - if(false == runningContainers.containsKey(contId)) { - if(containers.get(contId).getContainerState() - == ContainerState.PAUSED) { - pausedContainers.add(containers.get(contId)); - } - } - } - resourcesAvailable = startContainers(pausedContainers, false); - } + queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners); // Start opportunistic containers, if resources available. 
if (resourcesAvailable) { startContainers(queuedOpportunisticContainers.values(), false); @@ -590,16 +567,19 @@ private void shedQueuedOpportunisticContainers() { queuedOpportunisticContainers.values().iterator(); while (containerIter.hasNext()) { Container container = containerIter.next(); - if (numAllowed <= 0) { - container.sendKillEvent( - ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER, - "Container De-queued to meet NM queuing limits."); - containerIter.remove(); - LOG.info( - "Opportunistic container {} will be killed to meet NM queuing" + - " limits.", container.getContainerId()); + // Do not shed PAUSED containers + if (container.getContainerState() != ContainerState.PAUSED) { + if (numAllowed <= 0) { + container.sendKillEvent( + ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER, + "Container De-queued to meet NM queuing limits."); + containerIter.remove(); + LOG.info( + "Opportunistic container {} will be killed to meet NM queuing" + + " limits.", container.getContainerId()); + } + numAllowed--; } - numAllowed--; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java index 5384b7e8dbc..247398296bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java @@ -21,33 +21,37 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; 
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container .Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; + /** * Update Event consumed by the {@link ContainerScheduler}. */ public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent { - private ContainerTokenIdentifier updatedToken; - private boolean isResourceChange; - private boolean isExecTypeUpdate; - private boolean isIncrease; + private final UpdateContainerTokenEvent containerEvent; + private final ContainerTokenIdentifier originalToken; /** * Create instance of Event. * - * @param originalContainer Original Container. - * @param updatedToken Updated Container Token. - * @param isResourceChange is this a Resource Change. - * @param isExecTypeUpdate is this an ExecTypeUpdate. - * @param isIncrease is this a Container Increase. + * @param container Container. + * @param origToken The Original Container Token. + * @param event The Container Event. */ - public UpdateContainerSchedulerEvent(Container originalContainer, - ContainerTokenIdentifier updatedToken, boolean isResourceChange, - boolean isExecTypeUpdate, boolean isIncrease) { - super(originalContainer, ContainerSchedulerEventType.UPDATE_CONTAINER); - this.updatedToken = updatedToken; - this.isResourceChange = isResourceChange; - this.isExecTypeUpdate = isExecTypeUpdate; - this.isIncrease = isIncrease; + public UpdateContainerSchedulerEvent(Container container, + ContainerTokenIdentifier origToken, UpdateContainerTokenEvent event) { + super(container, ContainerSchedulerEventType.UPDATE_CONTAINER); + this.containerEvent = event; + this.originalToken = origToken; + } + + /** + * Original Token before update. + * + * @return Container Token. + */ + public ContainerTokenIdentifier getOriginalToken() { + return this.originalToken; } /** @@ -56,7 +60,7 @@ public UpdateContainerSchedulerEvent(Container originalContainer, * @return Container Token. 
*/ public ContainerTokenIdentifier getUpdatedToken() { - return updatedToken; + return containerEvent.getUpdatedToken(); } /** @@ -64,7 +68,7 @@ public ContainerTokenIdentifier getUpdatedToken() { * @return isResourceChange. */ public boolean isResourceChange() { - return isResourceChange; + return containerEvent.isResourceChange(); } /** @@ -72,7 +76,7 @@ public boolean isResourceChange() { * @return isExecTypeUpdate. */ public boolean isExecTypeUpdate() { - return isExecTypeUpdate; + return containerEvent.isExecTypeUpdate(); } /** @@ -80,6 +84,6 @@ public boolean isExecTypeUpdate() { * @return isIncrease. */ public boolean isIncrease() { - return isIncrease; + return containerEvent.isIncrease(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 3cafcbda0a5..fc9e6c417ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -208,6 +208,8 @@ public void setup() throws IOException { containerManager.init(conf); nodeStatusUpdater.start(); ((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater); + ((NMContext)context).setContainerStateTransitionListener( + new NodeManager.DefaultContainerStateListener()); } protected ContainerManagerImpl diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 38df208753c..6e8c005d62f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -90,12 +90,16 @@ import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; +import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; @@ -119,6 +123,41 @@ public TestContainerManager() throws UnsupportedFileSystemException { LOG = LoggerFactory.getLogger(TestContainerManager.class); } + private static class Listener implements ContainerStateTransitionListener { + + private final Map> states = new HashMap<>(); + private final Map> events = + new HashMap<>(); + + @Override + public void init(Context context) {} + + @Override + public void preTransition(ContainerImpl op, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState beforeState, + ContainerEvent eventToBeProcessed) { + if (!states.containsKey(op.getContainerId())) { + states.put(op.getContainerId(), new ArrayList<>()); + states.get(op.getContainerId()).add(beforeState); + events.put(op.getContainerId(), new ArrayList<>()); + } + } + + @Override + public void postTransition(ContainerImpl op, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState beforeState, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. 
+ ContainerState afterState, + ContainerEvent processedEvent) { + states.get(op.getContainerId()).add(afterState); + events.get(op.getContainerId()).add(processedEvent.getType()); + } + } + private boolean delayContainers = false; @Override @@ -144,7 +183,7 @@ public int launchContainer(ContainerStartContext ctx) @Override protected ContainerManagerImpl createContainerManager(DeletionService delSrvc) { - return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, + return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, metrics, dirsHandler) { @Override @@ -496,6 +535,9 @@ private String[] testContainerReInitSuccess(boolean autoCommit) @Test public void testContainerUpgradeSuccessAutoCommit() throws IOException, InterruptedException, YarnException { + Listener listener = new Listener(); + ((NodeManager.DefaultContainerStateListener)containerManager.context. + getContainerStateTransitionListener()).addListener(listener); testContainerReInitSuccess(true); // Should not be able to Commit (since already auto committed) try { @@ -504,6 +546,41 @@ public void testContainerUpgradeSuccessAutoCommit() throws IOException, } catch (Exception e) { Assert.assertTrue(e.getMessage().contains("Nothing to Commit")); } + + List containerStates = + listener.states.get(createContainerId(0)); + Assert.assertEquals(Arrays.asList( + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.NEW, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.LOCALIZING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RUNNING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.REINITIALIZING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. 
+ ContainerState.REINITIALIZING_AWAITING_KILL, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.REINITIALIZING_AWAITING_KILL, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RUNNING), containerStates); + + List containerEventTypes = + listener.events.get(createContainerId(0)); + Assert.assertEquals(Arrays.asList( + ContainerEventType.INIT_CONTAINER, + ContainerEventType.RESOURCE_LOCALIZED, + ContainerEventType.CONTAINER_LAUNCHED, + ContainerEventType.REINITIALIZE_CONTAINER, + ContainerEventType.RESOURCE_LOCALIZED, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes); } @Test @@ -524,6 +601,9 @@ public void testContainerUpgradeSuccessExplicitCommit() throws IOException, @Test public void testContainerUpgradeSuccessExplicitRollback() throws IOException, InterruptedException, YarnException { + Listener listener = new Listener(); + ((NodeManager.DefaultContainerStateListener)containerManager.context. + getContainerStateTransitionListener()).addListener(listener); String[] pids = testContainerReInitSuccess(false); // Test that the container can be Restarted after the successful upgrrade. @@ -575,6 +655,67 @@ public void testContainerUpgradeSuccessExplicitRollback() throws IOException, Assert.assertNotEquals("The Rolled-back process should be a different pid", pids[0], rolledBackPid); + + List containerStates = + listener.states.get(createContainerId(0)); + Assert.assertEquals(Arrays.asList( + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.NEW, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.LOCALIZING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. 
+ ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RUNNING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.REINITIALIZING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.REINITIALIZING_AWAITING_KILL, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.REINITIALIZING_AWAITING_KILL, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RUNNING, + // This is the successful restart + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.REINITIALIZING_AWAITING_KILL, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.REINITIALIZING_AWAITING_KILL, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RUNNING, + // This is the rollback + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.REINITIALIZING_AWAITING_KILL, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.REINITIALIZING_AWAITING_KILL, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. 
+ ContainerState.RUNNING), containerStates); + + List containerEventTypes = + listener.events.get(createContainerId(0)); + Assert.assertEquals(Arrays.asList( + ContainerEventType.INIT_CONTAINER, + ContainerEventType.RESOURCE_LOCALIZED, + ContainerEventType.CONTAINER_LAUNCHED, + ContainerEventType.REINITIALIZE_CONTAINER, + ContainerEventType.RESOURCE_LOCALIZED, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + ContainerEventType.CONTAINER_LAUNCHED, + ContainerEventType.REINITIALIZE_CONTAINER, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + ContainerEventType.CONTAINER_LAUNCHED, + ContainerEventType.ROLLBACK_REINIT, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes); } @Test @@ -584,6 +725,9 @@ public void testContainerUpgradeLocalizationFailure() throws IOException, return; } containerManager.start(); + Listener listener = new Listener(); + ((NodeManager.DefaultContainerStateListener)containerManager.context. + getContainerStateTransitionListener()).addListener(listener); // ////// Construct the Container-id ContainerId cId = createContainerId(0); File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile(); @@ -598,6 +742,32 @@ public void testContainerUpgradeLocalizationFailure() throws IOException, // since upgrade was terminated.. Assert.assertTrue("Process is NOT alive!", DefaultContainerExecutor.containerIsAlive(pid)); + + List containerStates = + listener.states.get(createContainerId(0)); + Assert.assertEquals(Arrays.asList( + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.NEW, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.LOCALIZING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. 
+ ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RUNNING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.REINITIALIZING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RUNNING), containerStates); + + List containerEventTypes = + listener.events.get(createContainerId(0)); + Assert.assertEquals(Arrays.asList( + ContainerEventType.INIT_CONTAINER, + ContainerEventType.RESOURCE_LOCALIZED, + ContainerEventType.CONTAINER_LAUNCHED, + ContainerEventType.REINITIALIZE_CONTAINER, + ContainerEventType.RESOURCE_FAILED), containerEventTypes); } @Test @@ -632,6 +802,9 @@ public void testContainerUpgradeRollbackDueToFailure() throws IOException, return; } containerManager.start(); + Listener listener = new Listener(); + ((NodeManager.DefaultContainerStateListener)containerManager.context. + getContainerStateTransitionListener()).addListener(listener); // ////// Construct the Container-id ContainerId cId = createContainerId(0); File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile(); @@ -666,6 +839,50 @@ public void testContainerUpgradeRollbackDueToFailure() throws IOException, Assert.assertNotEquals("The Rolled-back process should be a different pid", pid, rolledBackPid); + + List containerStates = + listener.states.get(createContainerId(0)); + Assert.assertEquals(Arrays.asList( + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.NEW, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.LOCALIZING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RUNNING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. 
+ ContainerState.REINITIALIZING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.REINITIALIZING_AWAITING_KILL, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.REINITIALIZING_AWAITING_KILL, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RUNNING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RUNNING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RUNNING), containerStates); + + List containerEventTypes = + listener.events.get(createContainerId(0)); + Assert.assertEquals(Arrays.asList( + ContainerEventType.INIT_CONTAINER, + ContainerEventType.RESOURCE_LOCALIZED, + ContainerEventType.CONTAINER_LAUNCHED, + ContainerEventType.REINITIALIZE_CONTAINER, + ContainerEventType.RESOURCE_LOCALIZED, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + ContainerEventType.CONTAINER_LAUNCHED, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes); } /** @@ -1582,16 +1799,12 @@ public void testIncreaseContainerResourceWithInvalidRequests() throws Exception containerManager.updateContainer(updateRequest); // Check response Assert.assertEquals( - 0, updateResponse.getSuccessfullyUpdatedContainers().size()); - Assert.assertEquals(2, updateResponse.getFailedRequests().size()); + 1, updateResponse.getSuccessfullyUpdatedContainers().size()); + Assert.assertEquals(1, updateResponse.getFailedRequests().size()); for (Map.Entry entry : updateResponse .getFailedRequests().entrySet()) { Assert.assertNotNull("Failed message", 
entry.getValue().getMessage()); - if (cId0.equals(entry.getKey())) { - Assert.assertTrue(entry.getValue().getMessage() - .contains("Resource can only be changed when a " - + "container is in RUNNING or SCHEDULED state")); - } else if (cId7.equals(entry.getKey())) { + if (cId7.equals(entry.getKey())) { Assert.assertTrue(entry.getValue().getMessage() .contains("Container " + cId7.toString() + " is not handled by this NodeManager")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java index 7c74049f654..4b380ff04a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -47,11 +48,17 @@ import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import 
org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; @@ -76,6 +83,40 @@ public TestContainerSchedulerQueuing() throws UnsupportedFileSystemException { LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class); } + private static class Listener implements ContainerStateTransitionListener { + + private final Map> states = new HashMap<>(); + private final Map> events = + new HashMap<>(); + + @Override + public void init(Context context) {} + + @Override + public void preTransition(ContainerImpl op, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState beforeState, + ContainerEvent eventToBeProcessed) { + if (!states.containsKey(op.getContainerId())) { + states.put(op.getContainerId(), new ArrayList<>()); + states.get(op.getContainerId()).add(beforeState); + events.put(op.getContainerId(), new ArrayList<>()); + } + } + + @Override + public void postTransition(ContainerImpl op, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. 
+ ContainerState beforeState, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState afterState, + ContainerEvent processedEvent) { + states.get(op.getContainerId()).add(afterState); + events.get(op.getContainerId()).add(processedEvent.getType()); + } + } + private boolean delayContainers = true; @Override @@ -542,6 +583,10 @@ public void testPauseOpportunisticForGuaranteedContainer() throws Exception { containerManager.start(); containerManager.getContainerScheduler(). setUsePauseEventForPreemption(true); + + Listener listener = new Listener(); + ((NodeManager.DefaultContainerStateListener)containerManager.getContext(). + getContainerStateTransitionListener()).addListener(listener); ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); @@ -606,6 +651,39 @@ public void testPauseOpportunisticForGuaranteedContainer() throws Exception { // starts running BaseContainerManagerTest.waitForNMContainerState(containerManager, createContainerId(0), ContainerState.DONE, 40); + + List containerStates = + listener.states.get(createContainerId(0)); + Assert.assertEquals(Arrays.asList( + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.NEW, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RUNNING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.PAUSING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.PAUSED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RESUMING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RUNNING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. 
+ ContainerState.EXITED_WITH_SUCCESS, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.DONE), containerStates); + List containerEventTypes = + listener.events.get(createContainerId(0)); + Assert.assertEquals(Arrays.asList(ContainerEventType.INIT_CONTAINER, + ContainerEventType.CONTAINER_LAUNCHED, + ContainerEventType.PAUSE_CONTAINER, + ContainerEventType.CONTAINER_PAUSED, + ContainerEventType.RESUME_CONTAINER, + ContainerEventType.CONTAINER_RESUMED, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP), containerEventTypes); } /** @@ -1068,6 +1146,9 @@ public void testStopQueuedContainer() throws Exception { @Test public void testPromotionOfOpportunisticContainers() throws Exception { containerManager.start(); + Listener listener = new Listener(); + ((NodeManager.DefaultContainerStateListener)containerManager.getContext(). + getContainerStateTransitionListener()).addListener(listener); ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); @@ -1150,6 +1231,7 @@ public void testPromotionOfOpportunisticContainers() throws Exception { containerStatuses = containerManager .getContainerStatuses(statRequest).getContainerStatuses(); Assert.assertEquals(1, containerStatuses.size()); + for (ContainerStatus status : containerStatuses) { if (org.apache.hadoop.yarn.api.records.ContainerState.RUNNING == status.getState()) { @@ -1160,6 +1242,25 @@ public void testPromotionOfOpportunisticContainers() throws Exception { // Ensure no containers are queued. Assert.assertEquals(0, containerScheduler.getNumQueuedContainers()); + + List containerStates = + listener.states.get(createContainerId(1)); + Assert.assertEquals(Arrays.asList( + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.NEW, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. 
+ ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container. + ContainerState.RUNNING), containerStates); + List containerEventTypes = + listener.events.get(createContainerId(1)); + Assert.assertEquals(Arrays.asList( + ContainerEventType.INIT_CONTAINER, + ContainerEventType.UPDATE_CONTAINER_TOKEN, + ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes); } @Test