diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 9a0549d692b..59b69acdfb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -89,7 +89,7 @@ public class DefaultContainerExecutor extends ContainerExecutor { } protected void copyFile(Path src, Path dst, String owner) throws IOException { - lfs.util().copy(src, dst); + lfs.util().copy(src, dst, false, true); } protected void setScriptExecutable(Path script, String owner) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 52d8566d90c..f909ca59e72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -110,11 +110,13 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerReInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; @@ -163,6 +165,9 @@ import static org.apache.hadoop.service.Service.STATE.STARTED; public class ContainerManagerImpl extends CompositeService implements ContainerManager { + private enum ReinitOp { + UPGRADE, COMMIT, ROLLBACK, LOCALIZE; + } /** * Extra duration to wait for applications to be killed on shutdown. */ @@ -1529,18 +1534,8 @@ public class ContainerManagerImpl extends CompositeService implements ResourceLocalizationRequest request) throws YarnException, IOException { ContainerId containerId = request.getContainerId(); - Container container = context.getContainers().get(containerId); - if (container == null) { - throw new YarnException("Specified " + containerId + " does not exist!"); - } - if (!container.getContainerState() - .equals(org.apache.hadoop.yarn.server.nodemanager. - containermanager.container.ContainerState.RUNNING)) { - throw new YarnException( - containerId + " is at " + container.getContainerState() - + " state. Not able to localize new resources."); - } - + Container container = preUpgradeOrLocalizeCheck(containerId, + ReinitOp.LOCALIZE); try { Map> req = container.getResourceSet().addResources(request.getLocalResources()); @@ -1556,6 +1551,38 @@ public class ContainerManagerImpl extends CompositeService implements return ResourceLocalizationResponse.newInstance(); } + public void upgradeContainer(ContainerId containerId, + ContainerLaunchContext upgradeLaunchContext) throws YarnException { + Container container = preUpgradeOrLocalizeCheck(containerId, + ReinitOp.UPGRADE); + ResourceSet resourceSet = new ResourceSet(); + try { + resourceSet.addResources(upgradeLaunchContext.getLocalResources()); + dispatcher.getEventHandler().handle( + new ContainerReInitEvent(containerId, upgradeLaunchContext, + resourceSet)); + container.setIsReInitializing(true); + } catch (URISyntaxException e) { + LOG.info("Error when parsing local resource URI for upgrade of" + + "Container [" + containerId + "]", e); + throw new YarnException(e); + } + } + + private Container preUpgradeOrLocalizeCheck(ContainerId containerId, + ReinitOp op) throws YarnException { + Container container = context.getContainers().get(containerId); + if (container == null) { + throw new YarnException("Specified " + containerId + " does not exist!"); + } + if (!container.isRunning() || container.isReInitializing()) { + throw new YarnException("Cannot perform " + op + " on [" + containerId + + "]. Current state is [" + container.getContainerState() + ", " + + "isReInitializing=" + container.isReInitializing() + "]."); + } + return container; + } + @SuppressWarnings("unchecked") private void internalSignalToContainer(SignalContainerRequest request, String sentBy) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index c4cea180c35..f6c27ab5761 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -76,4 +76,10 @@ public interface Container extends EventHandler { Priority getPriority(); ResourceSet getResourceSet(); + + boolean isRunning(); + + void setIsReInitializing(boolean isReInitializing); + + boolean isReInitializing(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java index 5622f8c6e12..0b57505d10d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java @@ -25,6 +25,7 @@ public enum ContainerEventType { KILL_CONTAINER, UPDATE_DIAGNOSTICS_MSG, CONTAINER_DONE, + REINITIALIZE_CONTAINER, // DownloadManager CONTAINER_INITED, @@ -36,5 +37,5 @@ public enum ContainerEventType { CONTAINER_LAUNCHED, CONTAINER_EXITED_WITH_SUCCESS, CONTAINER_EXITED_WITH_FAILURE, - CONTAINER_KILLED_ON_REQUEST, + CONTAINER_KILLED_ON_REQUEST } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index ce9e5814797..12bbea99870 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -90,13 +91,24 @@ import org.apache.hadoop.yarn.util.resource.Resources; public class ContainerImpl implements Container { + private final static class ReInitializationContext { + private final ResourceSet resourceSet; + private final ContainerLaunchContext newLaunchContext; + + private ReInitializationContext(ContainerLaunchContext newLaunchContext, + ResourceSet resourceSet) { + this.newLaunchContext = newLaunchContext; + this.resourceSet = resourceSet; + } + } + private final Lock readLock; private final Lock writeLock; private final Dispatcher dispatcher; private final NMStateStoreService stateStore; private final Credentials credentials; private final NodeManagerMetrics metrics; - private final ContainerLaunchContext launchContext; + private volatile ContainerLaunchContext launchContext; private final ContainerTokenIdentifier containerTokenIdentifier; private final ContainerId containerId; private volatile Resource resource; @@ -110,13 +122,15 @@ public class ContainerImpl implements Container { private long containerLaunchStartTime; private ContainerMetrics containerMetrics; private static Clock clock = SystemClock.getInstance(); - private final ContainerRetryContext containerRetryContext; + private ContainerRetryContext containerRetryContext; // remaining retries to relaunch container if needed private int remainingRetryAttempts; private String workDir; private String logDir; private String host; private String ips; + private ReInitializationContext reInitContext; + private volatile boolean isReInitializing = false; /** The NM-wide configuration - not specific to this container */ private final Configuration daemonConf; @@ -141,23 +155,7 @@ public class ContainerImpl implements Container { this.stateStore = context.getNMStateStore(); this.version = containerTokenIdentifier.getVersion(); this.launchContext = launchContext; - if (launchContext != null - && launchContext.getContainerRetryContext() != null) { - this.containerRetryContext = launchContext.getContainerRetryContext(); - } else { - this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT; - } - this.remainingRetryAttempts = containerRetryContext.getMaxRetries(); - int minimumRestartInterval = conf.getInt( - YarnConfiguration.NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS, - YarnConfiguration.DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS); - if (containerRetryContext.getRetryPolicy() - != ContainerRetryPolicy.NEVER_RETRY - && containerRetryContext.getRetryInterval() < minimumRestartInterval) { - LOG.info("Set restart interval to minimum value " + minimumRestartInterval - + "ms for container " + containerTokenIdentifier.getContainerID()); - this.containerRetryContext.setRetryInterval(minimumRestartInterval); - } + this.diagnosticsMaxSize = conf.getInt( YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE, YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE); @@ -188,11 +186,37 @@ public class ContainerImpl implements Container { containerMetrics.recordStartTime(clock.getTime()); } + // Configure the Retry Context + this.containerRetryContext = + configureRetryContext(conf, launchContext, this.containerId); + this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries(); stateMachine = stateMachineFactory.make(this); this.context = context; this.resourceSet = new ResourceSet(); } + private static ContainerRetryContext configureRetryContext( + Configuration conf, ContainerLaunchContext launchContext, + ContainerId containerId) { + ContainerRetryContext context; + if (launchContext != null + && launchContext.getContainerRetryContext() != null) { + context = launchContext.getContainerRetryContext(); + } else { + context = ContainerRetryContext.NEVER_RETRY_CONTEXT; + } + int minimumRestartInterval = conf.getInt( + YarnConfiguration.NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS, + YarnConfiguration.DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS); + if (context.getRetryPolicy() != ContainerRetryPolicy.NEVER_RETRY + && context.getRetryInterval() < minimumRestartInterval) { + LOG.info("Set restart interval to minimum value " + minimumRestartInterval + + "ms for container " + containerId); + context.setRetryInterval(minimumRestartInterval); + } + return context; + } + // constructor for a recovered container public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerLaunchContext launchContext, Credentials creds, @@ -299,6 +323,9 @@ public class ContainerImpl implements Container { ContainerState.EXITED_WITH_FAILURE), ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, new RetryFailureTransition()) + .addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING, + ContainerEventType.REINITIALIZE_CONTAINER, + new ReInitializeContainerTransition()) .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, ContainerEventType.RESOURCE_LOCALIZED, new ResourceLocalizedWhileRunningTransition()) @@ -310,10 +337,38 @@ public class ContainerImpl implements Container { UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.RUNNING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) - .addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE, + .addTransition(ContainerState.RUNNING, + ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledExternallyTransition()) + // From REINITIALIZING State + .addTransition(ContainerState.REINITIALIZING, + ContainerState.EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + new ExitedWithSuccessTransition(true)) + .addTransition(ContainerState.REINITIALIZING, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) + .addTransition(ContainerState.REINITIALIZING, + ContainerState.REINITIALIZING, + ContainerEventType.RESOURCE_LOCALIZED, + new ResourceLocalizedWhileReInitTransition()) + .addTransition(ContainerState.REINITIALIZING, ContainerState.RUNNING, + ContainerEventType.RESOURCE_FAILED, + new ResourceLocalizationFailedWhileReInitTransition()) + .addTransition(ContainerState.REINITIALIZING, + ContainerState.REINITIALIZING, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.REINITIALIZING, + ContainerState.LOCALIZED, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + new KilledForReInitializationTransition()) + // From RELAUNCHING State .addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING, ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) @@ -458,7 +513,7 @@ public class ContainerImpl implements Container { } @Override - public Map> getLocalizedResources() { + public Map> getLocalizedResources() { this.readLock.lock(); try { if (ContainerState.LOCALIZED == getContainerState() @@ -775,7 +830,7 @@ public class ContainerImpl implements Container { ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event; LocalResourceRequest resourceRequest = rsrcEvent.getResource(); Path location = rsrcEvent.getLocation(); - List syms = + Set syms = container.resourceSet.resourceLocalized(resourceRequest, location); if (null == syms) { LOG.info("Localized resource " + resourceRequest + @@ -822,17 +877,86 @@ public class ContainerImpl implements Container { } /** - * Resource is localized while the container is running - create symlinks + * Transition to start the Re-Initialization process. */ - static class ResourceLocalizedWhileRunningTransition + static class ReInitializeContainerTransition extends ContainerTransition { + + @SuppressWarnings("unchecked") + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + container.reInitContext = createReInitContext(event); + try { + Map> + pendingResources = + container.reInitContext.resourceSet.getAllResourcesByVisibility(); + if (!pendingResources.isEmpty()) { + container.dispatcher.getEventHandler().handle( + new ContainerLocalizationRequestEvent( + container, pendingResources)); + } else { + // We are not waiting on any resources, so... + // Kill the current container. + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT)); + } + } catch (Exception e) { + LOG.error("Container [" + container.getContainerId() + "]" + + " re-initialization failure..", e); + container.addDiagnostics("Error re-initializing due to" + + "[" + e.getMessage() + "]"); + } + } + + protected ReInitializationContext createReInitContext( + ContainerEvent event) { + ContainerReInitEvent rEvent = (ContainerReInitEvent)event; + return new ReInitializationContext(rEvent.getReInitLaunchContext(), + rEvent.getResourceSet()); + } + } + + /** + * Resource requested for Container Re-initialization has been localized. + * If all dependencies are met, then restart Container with new bits. + */ + static class ResourceLocalizedWhileReInitTransition extends ContainerTransition { + @SuppressWarnings("unchecked") @Override public void transition(ContainerImpl container, ContainerEvent event) { ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event; - List links = container.resourceSet - .resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation()); + container.reInitContext.resourceSet.resourceLocalized( + rsrcEvent.getResource(), rsrcEvent.getLocation()); + // Check if all ResourceLocalization has completed + if (container.reInitContext.resourceSet.getPendingResources() + .isEmpty()) { + // Kill the current container. + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT)); + } + } + } + + /** + * Resource is localized while the container is running - create symlinks. + */ + static class ResourceLocalizedWhileRunningTransition + extends ContainerTransition { + + @SuppressWarnings("unchecked") + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + ContainerResourceLocalizedEvent rsrcEvent = + (ContainerResourceLocalizedEvent) event; + Set links = container.resourceSet.resourceLocalized( + rsrcEvent.getResource(), rsrcEvent.getLocation()); + if (links == null) { + return; + } // creating symlinks. for (String link : links) { try { @@ -871,9 +995,30 @@ public class ContainerImpl implements Container { } } + /** + * Resource localization failed while the container is reinitializing. + */ + static class ResourceLocalizationFailedWhileReInitTransition + extends ContainerTransition { + + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + ContainerResourceFailedEvent failedEvent = + (ContainerResourceFailedEvent) event; + container.resourceSet.resourceLocalizationFailed( + failedEvent.getResource()); + container.addDiagnostics("Container aborting re-initialization.. " + + failedEvent.getDiagnosticMessage()); + LOG.error("Container [" + container.getContainerId() + "] Re-init" + + " failed !! Resource [" + failedEvent.getResource() + "] could" + + " not be localized !!"); + container.reInitContext = null; + } + } + /** * Transition from LOCALIZED state to RUNNING state upon receiving - * a CONTAINER_LAUNCHED event + * a CONTAINER_LAUNCHED event. */ static class LaunchTransition extends ContainerTransition { @SuppressWarnings("unchecked") @@ -883,6 +1028,12 @@ public class ContainerImpl implements Container { container.metrics.runningContainer(); container.wasLaunched = true; + if (container.reInitContext != null) { + container.reInitContext = null; + // Set rollback context here.. + container.setIsReInitializing(false); + } + if (container.recoveredAsKilled) { LOG.info("Killing " + container.containerId + " due to recovered as killed"); @@ -895,8 +1046,8 @@ public class ContainerImpl implements Container { } /** - * Transition from RUNNING or KILLING state to EXITED_WITH_SUCCESS state - * upon EXITED_WITH_SUCCESS message. + * Transition from RUNNING or KILLING state to + * EXITED_WITH_SUCCESS state upon EXITED_WITH_SUCCESS message. */ @SuppressWarnings("unchecked") // dispatcher not typed static class ExitedWithSuccessTransition extends ContainerTransition { @@ -909,6 +1060,8 @@ public class ContainerImpl implements Container { @Override public void transition(ContainerImpl container, ContainerEvent event) { + + container.setIsReInitializing(false); // Set exit code to 0 on success container.exitCode = 0; @@ -939,6 +1092,7 @@ public class ContainerImpl implements Container { @Override public void transition(ContainerImpl container, ContainerEvent event) { + container.setIsReInitializing(false); ContainerExitEvent exitEvent = (ContainerExitEvent) event; container.exitCode = exitEvent.getExitCode(); if (exitEvent.getDiagnosticInfo() != null) { @@ -959,7 +1113,7 @@ public class ContainerImpl implements Container { } /** - * Transition to EXITED_WITH_FAILURE or LOCALIZED state upon + * Transition to EXITED_WITH_FAILURE or RELAUNCHING state upon * CONTAINER_EXITED_WITH_FAILURE state. **/ @SuppressWarnings("unchecked") // dispatcher not typed @@ -991,7 +1145,7 @@ public class ContainerImpl implements Container { } catch (IOException e) { LOG.warn( "Unable to update remainingRetryAttempts in state store for " - + container.getContainerId(), e); + + container.getContainerId(), e); } } LOG.info("Relaunching Container " + container.getContainerId() @@ -1053,7 +1207,7 @@ public class ContainerImpl implements Container { } /** - * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST + * Transition to EXITED_WITH_FAILURE */ static class KilledExternallyTransition extends ExitedWithFailureTransition { KilledExternallyTransition() { @@ -1061,12 +1215,43 @@ public class ContainerImpl implements Container { } @Override - public void transition(ContainerImpl container, ContainerEvent event) { + public void transition(ContainerImpl container, + ContainerEvent event) { super.transition(container, event); container.addDiagnostics("Killed by external signal\n"); } } + /** + * Transition to LOCALIZED and wait for RE-LAUNCH + */ + static class KilledForReInitializationTransition extends ContainerTransition { + + @Override + public void transition(ContainerImpl container, + ContainerEvent event) { + LOG.info("Relaunching Container [" + container.getContainerId() + + "] for upgrade !!"); + container.wasLaunched = false; + container.metrics.endRunningContainer(); + + container.launchContext = container.reInitContext.newLaunchContext; + + // Re configure the Retry Context + container.containerRetryContext = + configureRetryContext(container.context.getConf(), + container.launchContext, container.containerId); + // Reset the retry attempts since its a fresh start + container.remainingRetryAttempts = + container.containerRetryContext.getMaxRetries(); + + container.resourceSet = ResourceSet.merge( + container.resourceSet, container.reInitContext.resourceSet); + + container.sendLaunchEvent(); + } + } + /** * Transition from LOCALIZING to LOCALIZATION_FAILED upon receiving * RESOURCE_FAILED event. @@ -1122,16 +1307,20 @@ public class ContainerImpl implements Container { } /** - * Transitions upon receiving KILL_CONTAINER: - * - LOCALIZED -> KILLING - * - RUNNING -> KILLING + * Transitions upon receiving KILL_CONTAINER. + * - LOCALIZED -> KILLING. + * - RUNNING -> KILLING. + * - REINITIALIZING -> KILLING. */ @SuppressWarnings("unchecked") // dispatcher not typed static class KillTransition implements SingleArcTransition { + + @SuppressWarnings("unchecked") @Override public void transition(ContainerImpl container, ContainerEvent event) { // Kill the process/process-grp + container.setIsReInitializing(false); container.dispatcher.getEventHandler().handle( new ContainersLauncherEvent(container, ContainersLauncherEventType.CLEANUP_CONTAINER)); @@ -1385,4 +1574,19 @@ public class ContainerImpl implements Container { public Priority getPriority() { return containerTokenIdentifier.getPriority(); } + + @Override + public boolean isRunning() { + return getContainerState() == ContainerState.RUNNING; + } + + @Override + public void setIsReInitializing(boolean isReInitializing) { + this.isReInitializing = isReInitializing; + } + + @Override + public boolean isReInitializing() { + return this.isReInitializing; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java new file mode 100644 index 00000000000..2ccdbd7f65e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet; + +/** + * ContainerEvent sent by ContainerManager to ContainerImpl to + * re-initiate Container. + */ +public class ContainerReInitEvent extends ContainerEvent { + + private final ContainerLaunchContext reInitLaunchContext; + private final ResourceSet resourceSet; + + /** + * Container Re-Init Event. + * @param cID Container Id + * @param upgradeContext Upgrade context + * @param resourceSet Resource Set + */ + public ContainerReInitEvent(ContainerId cID, + ContainerLaunchContext upgradeContext, ResourceSet resourceSet){ + super(cID, ContainerEventType.REINITIALIZE_CONTAINER); + this.reInitLaunchContext = upgradeContext; + this.resourceSet = resourceSet; + } + + /** + * Get the Launch Context to be used for upgrade. + * @return ContainerLaunchContext + */ + public ContainerLaunchContext getReInitLaunchContext() { + return reInitLaunchContext; + } + + /** + * Get the ResourceSet. + * @return ResourceSet. + */ + public ResourceSet getResourceSet() { + return resourceSet; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java index 6b96204a9e6..70de90c34eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java @@ -20,6 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; public enum ContainerState { NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, RELAUNCHING, - EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, + REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index e5fff00fbe7..d4a7bfdacf0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -137,6 +137,7 @@ public class ContainersLauncher extends AbstractService running.put(containerId, launch); break; case CLEANUP_CONTAINER: + case CLEANUP_CONTAINER_FOR_REINIT: ContainerLaunch launcher = running.remove(containerId); if (launcher == null) { // Container not launched. So nothing needs to be done. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java index 2d7bc743021..380a032ca78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java @@ -23,5 +23,6 @@ public enum ContainersLauncherEventType { RELAUNCH_CONTAINER, RECOVER_CONTAINER, CLEANUP_CONTAINER, // The process(grp) itself. + CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself. SIGNAL_CONTAINER, } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index b281ef53258..2cf6ee90941 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -470,7 +470,8 @@ public class ResourceLocalizationService extends CompositeService ContainerLocalizationRequestEvent rsrcReqs) { Container c = rsrcReqs.getContainer(); EnumSet set = - EnumSet.of(ContainerState.LOCALIZING, ContainerState.RUNNING); + EnumSet.of(ContainerState.LOCALIZING, + ContainerState.RUNNING, ContainerState.REINITIALIZING); if (!set.contains(c.getContainerState())) { LOG.warn(c.getContainerId() + " is at " + c.getContainerState() + " state, do not localize resources."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java index a41ee20c4a2..5da3abc033f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java @@ -43,9 +43,9 @@ public class ResourceSet { private static final Log LOG = LogFactory.getLog(ResourceSet.class); // resources by localization state (localized, pending, failed) - private Map> localizedResources = + private Map localizedResources = new ConcurrentHashMap<>(); - private Map> pendingResources = + private Map> pendingResources = new ConcurrentHashMap<>(); private Set resourcesFailedToBeLocalized = new HashSet<>(); @@ -69,7 +69,7 @@ public class ResourceSet { if (localResourceMap == null || localResourceMap.isEmpty()) { return null; } - Map> allResources = new HashMap<>(); + Map> allResources = new HashMap<>(); List publicList = new ArrayList<>(); List privateList = new ArrayList<>(); List appList = new ArrayList<>(); @@ -77,7 +77,7 @@ public class ResourceSet { for (Map.Entry rsrc : localResourceMap.entrySet()) { LocalResource resource = rsrc.getValue(); LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue()); - allResources.putIfAbsent(req, new ArrayList<>()); + allResources.putIfAbsent(req, new HashSet<>()); allResources.get(req).add(rsrc.getKey()); storeSharedCacheUploadPolicy(req, resource.getShouldBeUploadedToSharedCache()); @@ -121,13 +121,15 @@ public class ResourceSet { * @param location The path where the resource is localized * @return The list of symlinks for the localized resources. */ - public List resourceLocalized(LocalResourceRequest request, + public Set resourceLocalized(LocalResourceRequest request, Path location) { - List symlinks = pendingResources.remove(request); + Set symlinks = pendingResources.remove(request); if (symlinks == null) { return null; } else { - localizedResources.put(location, symlinks); + for (String symlink : symlinks) { + localizedResources.put(symlink, location); + } return symlinks; } } @@ -175,7 +177,12 @@ public class ResourceSet { } public Map> getLocalizedResources() { - return localizedResources; + Map> map = new HashMap<>(); + for (Map.Entry entry : localizedResources.entrySet()) { + map.putIfAbsent(entry.getValue(), new ArrayList<>()); + map.get(entry.getValue()).add(entry.getKey()); + } + return map; } public Map getResourcesToBeUploaded() { @@ -186,7 +193,25 @@ public class ResourceSet { return resourcesUploadPolicies; } - public Map> getPendingResources() { + public Map> getPendingResources() { return pendingResources; } + + public static ResourceSet merge(ResourceSet... resourceSets) { + ResourceSet merged = new ResourceSet(); + for (ResourceSet rs : resourceSets) { + // This should overwrite existing symlinks + merged.localizedResources.putAll(rs.localizedResources); + + merged.resourcesToBeUploaded.putAll(rs.resourcesToBeUploaded); + merged.resourcesUploadPolicies.putAll(rs.resourcesUploadPolicies); + + // TODO : START : Should we de-dup here ? + merged.publicRsrcs.addAll(rs.publicRsrcs); + merged.privateRsrcs.addAll(rs.privateRsrcs); + merged.appRsrcs.addAll(rs.appRsrcs); + // TODO : END + } + return merged; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java index 43a2f33fec3..03442756582 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java @@ -39,8 +39,8 @@ public class ContainerLocalizationRequestEvent extends /** * Event requesting the localization of the rsrc. - * @param c - * @param rsrc + * @param c Container + * @param rsrc LocalResourceRequests map */ public ContainerLocalizationRequestEvent(Container c, Map> rsrc) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index aa0d975ea37..8a278494a43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -269,6 +269,42 @@ public class TestContainerManagerWithLCE extends TestContainerManager { super.testForcefulShutdownSignal(); } + @Override + public void testContainerUpgradeSuccess() throws IOException, + InterruptedException, YarnException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testContainerUpgradeSuccess"); + super.testContainerUpgradeSuccess(); + } + + @Override + public void testContainerUpgradeLocalizationFailure() throws IOException, + InterruptedException, YarnException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testContainerUpgradeLocalizationFailure"); + super.testContainerUpgradeLocalizationFailure(); + } + + @Override + public void testContainerUpgradeProcessFailure() throws IOException, + InterruptedException, YarnException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testContainerUpgradeProcessFailure"); + super.testContainerUpgradeProcessFailure(); + } + private boolean shouldRunTest() { return System .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index ec3850181b6..d359c3d8662 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -123,7 +123,11 @@ public abstract class BaseContainerManagerTest { conf) { public int getHttpPort() { return HTTP_PORT; - }; + } + @Override + public ContainerExecutor getContainerExecutor() { + return exec; + } }; protected ContainerExecutor exec; protected DeletionService delSrvc; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 5785e1f96d2..843dc2a1518 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.verify; import java.io.BufferedReader; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.PrintWriter; @@ -33,6 +34,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -64,6 +66,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -94,7 +98,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -366,6 +369,237 @@ public class TestContainerManager extends BaseContainerManagerTest { DefaultContainerExecutor.containerIsAlive(pid)); } + @Test + public void testContainerUpgradeSuccess() throws IOException, + InterruptedException, YarnException { + containerManager.start(); + // ////// Construct the Container-id + ContainerId cId = createContainerId(0); + File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile(); + + String pid = prepareInitialContainer(cId, oldStartFile); + + File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile(); + + prepareContainerUpgrade(false, false, cId, newStartFile); + + // Assert that the First process is not alive anymore + Assert.assertFalse("Process is still alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + + BufferedReader reader = + new BufferedReader(new FileReader(newStartFile)); + Assert.assertEquals("Upgrade World!", reader.readLine()); + + // Get the pid of the process + String newPid = reader.readLine().trim(); + Assert.assertNotEquals("Old and New Pids must be different !", pid, newPid); + // No more lines + Assert.assertEquals(null, reader.readLine()); + + reader.close(); + + // Verify old file still exists and is accessible by + // the new process... + reader = new BufferedReader(new FileReader(oldStartFile)); + Assert.assertEquals("Hello World!", reader.readLine()); + + // Assert that the New process is alive + Assert.assertTrue("New Process is not alive!", + DefaultContainerExecutor.containerIsAlive(newPid)); + } + + @Test + public void testContainerUpgradeLocalizationFailure() throws IOException, + InterruptedException, YarnException { + if (Shell.WINDOWS) { + return; + } + containerManager.start(); + // ////// Construct the Container-id + ContainerId cId = createContainerId(0); + File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile(); + + String pid = prepareInitialContainer(cId, oldStartFile); + + File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile(); + + prepareContainerUpgrade(true, true, cId, newStartFile); + + // Assert that the First process is STILL alive + // since upgrade was terminated.. + Assert.assertTrue("Process is NOT alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + } + + @Test + public void testContainerUpgradeProcessFailure() throws IOException, + InterruptedException, YarnException { + if (Shell.WINDOWS) { + return; + } + containerManager.start(); + // ////// Construct the Container-id + ContainerId cId = createContainerId(0); + File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile(); + + String pid = prepareInitialContainer(cId, oldStartFile); + + File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile(); + + prepareContainerUpgrade(true, false, cId, newStartFile); + + // Assert that the First process is not alive anymore + Assert.assertFalse("Process is still alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + } + + /** + * Prepare a launch Context for container upgrade and request the + * Container Manager to re-initialize a running container using the + * new launch context. + * @param failCmd injects a start script that intentionally fails. + * @param failLoc injects a bad file Location that will fail localization. + */ + private void prepareContainerUpgrade(boolean failCmd, boolean failLoc, + ContainerId cId, File startFile) + throws FileNotFoundException, YarnException, InterruptedException { + // Re-write scriptfile and processStartFile + File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile_new"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + + writeScriptFile(fileWriter, "Upgrade World!", startFile, cId, failCmd); + + ContainerLaunchContext containerLaunchContext = + prepareContainerLaunchContext(scriptFile, "dest_file_new", failLoc); + + containerManager.upgradeContainer(cId, containerLaunchContext); + try { + containerManager.upgradeContainer(cId, containerLaunchContext); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Cannot perform UPGRADE")); + } + int timeoutSecs = 0; + int maxTimeToWait = failLoc ? 10 : 20; + // Wait for new processStartfile to be created + while (!startFile.exists() && timeoutSecs++ < maxTimeToWait) { + Thread.sleep(1000); + LOG.info("Waiting for New process start-file to be created"); + } + } + + /** + * Prepare and start an initial container. This container will be subsequently + * re-initialized for upgrade. It also waits for the container to start and + * returns the Pid of the running container. + */ + private String prepareInitialContainer(ContainerId cId, File startFile) + throws IOException, YarnException, InterruptedException { + File scriptFileOld = Shell.appendScriptExtension(tmpDir, "scriptFile"); + PrintWriter fileWriterOld = new PrintWriter(scriptFileOld); + + writeScriptFile(fileWriterOld, "Hello World!", startFile, cId, false); + + ContainerLaunchContext containerLaunchContext = + prepareContainerLaunchContext(scriptFileOld, "dest_file", false); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance(containerLaunchContext, + createContainerToken(cId, + DUMMY_RM_IDENTIFIER, context.getNodeId(), user, + context.getContainerTokenSecretManager())); + List list = new ArrayList<>(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + int timeoutSecs = 0; + while (!startFile.exists() && timeoutSecs++ < 20) { + Thread.sleep(1000); + LOG.info("Waiting for process start-file to be created"); + } + Assert.assertTrue("ProcessStartFile doesn't exist!", + startFile.exists()); + + // Now verify the contents of the file + BufferedReader reader = + new BufferedReader(new FileReader(startFile)); + Assert.assertEquals("Hello World!", reader.readLine()); + // Get the pid of the process + String pid = reader.readLine().trim(); + // No more lines + Assert.assertEquals(null, reader.readLine()); + + // Assert that the process is alive + Assert.assertTrue("Process is not alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + // Once more + Assert.assertTrue("Process is not alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + return pid; + } + + private void writeScriptFile(PrintWriter fileWriter, String startLine, + File processStartFile, ContainerId cId, boolean isFailure) { + if (Shell.WINDOWS) { + fileWriter.println("@echo " + startLine + "> " + processStartFile); + fileWriter.println("@echo " + cId + ">> " + processStartFile); + fileWriter.println("@ping -n 100 127.0.0.1 >nul"); + } else { + fileWriter.write("\numask 0"); // So that start file is readable by test + if (isFailure) { + // Echo PID and throw some error code + fileWriter.write("\necho $$ >> " + processStartFile); + fileWriter.write("\nexit 111"); + } else { + fileWriter.write("\necho " + startLine + " > " + processStartFile); + fileWriter.write("\necho $$ >> " + processStartFile); + fileWriter.write("\nexec sleep 100"); + } + } + fileWriter.close(); + } + + private ContainerLaunchContext prepareContainerLaunchContext(File scriptFile, + String destFName, boolean putBadFile) { + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + URL resourceAlpha = null; + if (putBadFile) { + File fileToDelete = new File(tmpDir, "fileToDelete") + .getAbsoluteFile(); + resourceAlpha = + URL.fromPath(localFS + .makeQualified(new Path(fileToDelete.getAbsolutePath()))); + fileToDelete.delete(); + } else { + resourceAlpha = + URL.fromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + } + LocalResource rsrcAlpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrcAlpha.setResource(resourceAlpha); + rsrcAlpha.setSize(-1); + rsrcAlpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrcAlpha.setType(LocalResourceType.FILE); + rsrcAlpha.setTimestamp(scriptFile.lastModified()); + Map localResources = new HashMap<>(); + localResources.put(destFName, rsrcAlpha); + containerLaunchContext.setLocalResources(localResources); + + ContainerRetryContext containerRetryContext = ContainerRetryContext + .newInstance( + ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES, + new HashSet<>(Arrays.asList(Integer.valueOf(111))), 4, 0); + containerLaunchContext.setContainerRetryContext(containerRetryContext); + List commands = Arrays.asList( + Shell.getRunScriptCommand(scriptFile)); + containerLaunchContext.setCommands(commands); + return containerLaunchContext; + } + protected void testContainerLaunchAndExit(int exitCode) throws IOException, InterruptedException, YarnException { @@ -556,7 +790,7 @@ public class TestContainerManager extends BaseContainerManagerTest { Assert.fail(); } catch (YarnException e) { Assert.assertTrue( - e.getMessage().contains("Not able to localize new resources")); + e.getMessage().contains("Cannot perform LOCALIZE")); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index c1765569e5b..8c8bec7845f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -190,4 +190,19 @@ public class MockContainer implements Container { public void setIpAndHost(String[] ipAndHost) { } + + @Override + public boolean isRunning() { + return false; + } + + @Override + public void setIsReInitializing(boolean isReInitializing) { + + } + + @Override + public boolean isReInitializing() { + return false; + } }