YARN-5620. Core changes in NodeManager to support re-initialization of Containers with new launchContext. (asuresh)

This commit is contained in:
Arun Suresh 2016-09-15 07:15:11 -07:00
parent 2a8f55a0cf
commit 40b5a59b72
16 changed files with 683 additions and 66 deletions

View File

@ -89,7 +89,7 @@ public DefaultContainerExecutor() {
}
protected void copyFile(Path src, Path dst, String owner) throws IOException {
lfs.util().copy(src, dst);
lfs.util().copy(src, dst, false, true);
}
protected void setScriptExecutable(Path script, String owner) throws IOException {

View File

@ -110,11 +110,13 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerReInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@ -163,6 +165,9 @@
public class ContainerManagerImpl extends CompositeService implements
ContainerManager {
private enum ReinitOp {
UPGRADE, COMMIT, ROLLBACK, LOCALIZE;
}
/**
* Extra duration to wait for applications to be killed on shutdown.
*/
@ -1529,18 +1534,8 @@ public ResourceLocalizationResponse localize(
ResourceLocalizationRequest request) throws YarnException, IOException {
ContainerId containerId = request.getContainerId();
Container container = context.getContainers().get(containerId);
if (container == null) {
throw new YarnException("Specified " + containerId + " does not exist!");
}
if (!container.getContainerState()
.equals(org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState.RUNNING)) {
throw new YarnException(
containerId + " is at " + container.getContainerState()
+ " state. Not able to localize new resources.");
}
Container container = preUpgradeOrLocalizeCheck(containerId,
ReinitOp.LOCALIZE);
try {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
container.getResourceSet().addResources(request.getLocalResources());
@ -1556,6 +1551,38 @@ public ResourceLocalizationResponse localize(
return ResourceLocalizationResponse.newInstance();
}
public void upgradeContainer(ContainerId containerId,
ContainerLaunchContext upgradeLaunchContext) throws YarnException {
Container container = preUpgradeOrLocalizeCheck(containerId,
ReinitOp.UPGRADE);
ResourceSet resourceSet = new ResourceSet();
try {
resourceSet.addResources(upgradeLaunchContext.getLocalResources());
dispatcher.getEventHandler().handle(
new ContainerReInitEvent(containerId, upgradeLaunchContext,
resourceSet));
container.setIsReInitializing(true);
} catch (URISyntaxException e) {
LOG.info("Error when parsing local resource URI for upgrade of" +
"Container [" + containerId + "]", e);
throw new YarnException(e);
}
}
private Container preUpgradeOrLocalizeCheck(ContainerId containerId,
ReinitOp op) throws YarnException {
Container container = context.getContainers().get(containerId);
if (container == null) {
throw new YarnException("Specified " + containerId + " does not exist!");
}
if (!container.isRunning() || container.isReInitializing()) {
throw new YarnException("Cannot perform " + op + " on [" + containerId
+ "]. Current state is [" + container.getContainerState() + ", " +
"isReInitializing=" + container.isReInitializing() + "].");
}
return container;
}
@SuppressWarnings("unchecked")
private void internalSignalToContainer(SignalContainerRequest request,
String sentBy) {

View File

@ -76,4 +76,10 @@ public interface Container extends EventHandler<ContainerEvent> {
Priority getPriority();
ResourceSet getResourceSet();
boolean isRunning();
void setIsReInitializing(boolean isReInitializing);
boolean isReInitializing();
}

View File

@ -25,6 +25,7 @@ public enum ContainerEventType {
KILL_CONTAINER,
UPDATE_DIAGNOSTICS_MSG,
CONTAINER_DONE,
REINITIALIZE_CONTAINER,
// DownloadManager
CONTAINER_INITED,
@ -36,5 +37,5 @@ public enum ContainerEventType {
CONTAINER_LAUNCHED,
CONTAINER_EXITED_WITH_SUCCESS,
CONTAINER_EXITED_WITH_FAILURE,
CONTAINER_KILLED_ON_REQUEST,
CONTAINER_KILLED_ON_REQUEST
}

View File

@ -27,6 +27,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -90,13 +91,24 @@
public class ContainerImpl implements Container {
private final static class ReInitializationContext {
private final ResourceSet resourceSet;
private final ContainerLaunchContext newLaunchContext;
private ReInitializationContext(ContainerLaunchContext newLaunchContext,
ResourceSet resourceSet) {
this.newLaunchContext = newLaunchContext;
this.resourceSet = resourceSet;
}
}
private final Lock readLock;
private final Lock writeLock;
private final Dispatcher dispatcher;
private final NMStateStoreService stateStore;
private final Credentials credentials;
private final NodeManagerMetrics metrics;
private final ContainerLaunchContext launchContext;
private volatile ContainerLaunchContext launchContext;
private final ContainerTokenIdentifier containerTokenIdentifier;
private final ContainerId containerId;
private volatile Resource resource;
@ -110,13 +122,15 @@ public class ContainerImpl implements Container {
private long containerLaunchStartTime;
private ContainerMetrics containerMetrics;
private static Clock clock = SystemClock.getInstance();
private final ContainerRetryContext containerRetryContext;
private ContainerRetryContext containerRetryContext;
// remaining retries to relaunch container if needed
private int remainingRetryAttempts;
private String workDir;
private String logDir;
private String host;
private String ips;
private ReInitializationContext reInitContext;
private volatile boolean isReInitializing = false;
/** The NM-wide configuration - not specific to this container */
private final Configuration daemonConf;
@ -141,23 +155,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
this.stateStore = context.getNMStateStore();
this.version = containerTokenIdentifier.getVersion();
this.launchContext = launchContext;
if (launchContext != null
&& launchContext.getContainerRetryContext() != null) {
this.containerRetryContext = launchContext.getContainerRetryContext();
} else {
this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT;
}
this.remainingRetryAttempts = containerRetryContext.getMaxRetries();
int minimumRestartInterval = conf.getInt(
YarnConfiguration.NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS);
if (containerRetryContext.getRetryPolicy()
!= ContainerRetryPolicy.NEVER_RETRY
&& containerRetryContext.getRetryInterval() < minimumRestartInterval) {
LOG.info("Set restart interval to minimum value " + minimumRestartInterval
+ "ms for container " + containerTokenIdentifier.getContainerID());
this.containerRetryContext.setRetryInterval(minimumRestartInterval);
}
this.diagnosticsMaxSize = conf.getInt(
YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE,
YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
@ -188,11 +186,37 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
containerMetrics.recordStartTime(clock.getTime());
}
// Configure the Retry Context
this.containerRetryContext =
configureRetryContext(conf, launchContext, this.containerId);
this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries();
stateMachine = stateMachineFactory.make(this);
this.context = context;
this.resourceSet = new ResourceSet();
}
private static ContainerRetryContext configureRetryContext(
Configuration conf, ContainerLaunchContext launchContext,
ContainerId containerId) {
ContainerRetryContext context;
if (launchContext != null
&& launchContext.getContainerRetryContext() != null) {
context = launchContext.getContainerRetryContext();
} else {
context = ContainerRetryContext.NEVER_RETRY_CONTEXT;
}
int minimumRestartInterval = conf.getInt(
YarnConfiguration.NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS);
if (context.getRetryPolicy() != ContainerRetryPolicy.NEVER_RETRY
&& context.getRetryInterval() < minimumRestartInterval) {
LOG.info("Set restart interval to minimum value " + minimumRestartInterval
+ "ms for container " + containerId);
context.setRetryInterval(minimumRestartInterval);
}
return context;
}
// constructor for a recovered container
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
@ -299,6 +323,9 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
ContainerState.EXITED_WITH_FAILURE),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new RetryFailureTransition())
.addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
ContainerEventType.REINITIALIZE_CONTAINER,
new ReInitializeContainerTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.RESOURCE_LOCALIZED,
new ResourceLocalizedWhileRunningTransition())
@ -310,10 +337,38 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.RUNNING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE,
.addTransition(ContainerState.RUNNING,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
// From REINITIALIZING State
.addTransition(ContainerState.REINITIALIZING,
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
new ExitedWithSuccessTransition(true))
.addTransition(ContainerState.REINITIALIZING,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(true))
.addTransition(ContainerState.REINITIALIZING,
ContainerState.REINITIALIZING,
ContainerEventType.RESOURCE_LOCALIZED,
new ResourceLocalizedWhileReInitTransition())
.addTransition(ContainerState.REINITIALIZING, ContainerState.RUNNING,
ContainerEventType.RESOURCE_FAILED,
new ResourceLocalizationFailedWhileReInitTransition())
.addTransition(ContainerState.REINITIALIZING,
ContainerState.REINITIALIZING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.REINITIALIZING,
ContainerState.LOCALIZED,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledForReInitializationTransition())
// From RELAUNCHING State
.addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING,
ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
@ -458,7 +513,7 @@ public String getUser() {
}
@Override
public Map<Path,List<String>> getLocalizedResources() {
public Map<Path, List<String>> getLocalizedResources() {
this.readLock.lock();
try {
if (ContainerState.LOCALIZED == getContainerState()
@ -775,7 +830,7 @@ public ContainerState transition(ContainerImpl container,
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
LocalResourceRequest resourceRequest = rsrcEvent.getResource();
Path location = rsrcEvent.getLocation();
List<String> syms =
Set<String> syms =
container.resourceSet.resourceLocalized(resourceRequest, location);
if (null == syms) {
LOG.info("Localized resource " + resourceRequest +
@ -822,17 +877,86 @@ public ContainerState transition(ContainerImpl container,
}
/**
* Resource is localized while the container is running - create symlinks
* Transition to start the Re-Initialization process.
*/
static class ResourceLocalizedWhileRunningTransition
static class ReInitializeContainerTransition extends ContainerTransition {
@SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
container.reInitContext = createReInitContext(event);
try {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
pendingResources =
container.reInitContext.resourceSet.getAllResourcesByVisibility();
if (!pendingResources.isEmpty()) {
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationRequestEvent(
container, pendingResources));
} else {
// We are not waiting on any resources, so...
// Kill the current container.
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
}
} catch (Exception e) {
LOG.error("Container [" + container.getContainerId() + "]" +
" re-initialization failure..", e);
container.addDiagnostics("Error re-initializing due to" +
"[" + e.getMessage() + "]");
}
}
protected ReInitializationContext createReInitContext(
ContainerEvent event) {
ContainerReInitEvent rEvent = (ContainerReInitEvent)event;
return new ReInitializationContext(rEvent.getReInitLaunchContext(),
rEvent.getResourceSet());
}
}
/**
* Resource requested for Container Re-initialization has been localized.
* If all dependencies are met, then restart Container with new bits.
*/
static class ResourceLocalizedWhileReInitTransition
extends ContainerTransition {
@SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent =
(ContainerResourceLocalizedEvent) event;
List<String> links = container.resourceSet
.resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation());
container.reInitContext.resourceSet.resourceLocalized(
rsrcEvent.getResource(), rsrcEvent.getLocation());
// Check if all ResourceLocalization has completed
if (container.reInitContext.resourceSet.getPendingResources()
.isEmpty()) {
// Kill the current container.
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
}
}
}
/**
* Resource is localized while the container is running - create symlinks.
*/
static class ResourceLocalizedWhileRunningTransition
extends ContainerTransition {
@SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent =
(ContainerResourceLocalizedEvent) event;
Set<String> links = container.resourceSet.resourceLocalized(
rsrcEvent.getResource(), rsrcEvent.getLocation());
if (links == null) {
return;
}
// creating symlinks.
for (String link : links) {
try {
@ -871,9 +995,30 @@ public void transition(ContainerImpl container, ContainerEvent event) {
}
}
/**
* Resource localization failed while the container is reinitializing.
*/
static class ResourceLocalizationFailedWhileReInitTransition
extends ContainerTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerResourceFailedEvent failedEvent =
(ContainerResourceFailedEvent) event;
container.resourceSet.resourceLocalizationFailed(
failedEvent.getResource());
container.addDiagnostics("Container aborting re-initialization.. "
+ failedEvent.getDiagnosticMessage());
LOG.error("Container [" + container.getContainerId() + "] Re-init" +
" failed !! Resource [" + failedEvent.getResource() + "] could" +
" not be localized !!");
container.reInitContext = null;
}
}
/**
* Transition from LOCALIZED state to RUNNING state upon receiving
* a CONTAINER_LAUNCHED event
* a CONTAINER_LAUNCHED event.
*/
static class LaunchTransition extends ContainerTransition {
@SuppressWarnings("unchecked")
@ -883,6 +1028,12 @@ public void transition(ContainerImpl container, ContainerEvent event) {
container.metrics.runningContainer();
container.wasLaunched = true;
if (container.reInitContext != null) {
container.reInitContext = null;
// Set rollback context here..
container.setIsReInitializing(false);
}
if (container.recoveredAsKilled) {
LOG.info("Killing " + container.containerId
+ " due to recovered as killed");
@ -895,8 +1046,8 @@ public void transition(ContainerImpl container, ContainerEvent event) {
}
/**
* Transition from RUNNING or KILLING state to EXITED_WITH_SUCCESS state
* upon EXITED_WITH_SUCCESS message.
* Transition from RUNNING or KILLING state to
* EXITED_WITH_SUCCESS state upon EXITED_WITH_SUCCESS message.
*/
@SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithSuccessTransition extends ContainerTransition {
@ -909,6 +1060,8 @@ public ExitedWithSuccessTransition(boolean clCleanupRequired) {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
container.setIsReInitializing(false);
// Set exit code to 0 on success
container.exitCode = 0;
@ -939,6 +1092,7 @@ public ExitedWithFailureTransition(boolean clCleanupRequired) {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
container.setIsReInitializing(false);
ContainerExitEvent exitEvent = (ContainerExitEvent) event;
container.exitCode = exitEvent.getExitCode();
if (exitEvent.getDiagnosticInfo() != null) {
@ -959,7 +1113,7 @@ public void transition(ContainerImpl container, ContainerEvent event) {
}
/**
* Transition to EXITED_WITH_FAILURE or LOCALIZED state upon
* Transition to EXITED_WITH_FAILURE or RELAUNCHING state upon
* CONTAINER_EXITED_WITH_FAILURE state.
**/
@SuppressWarnings("unchecked") // dispatcher not typed
@ -991,7 +1145,7 @@ public ContainerState transition(final ContainerImpl container,
} catch (IOException e) {
LOG.warn(
"Unable to update remainingRetryAttempts in state store for "
+ container.getContainerId(), e);
+ container.getContainerId(), e);
}
}
LOG.info("Relaunching Container " + container.getContainerId()
@ -1053,7 +1207,7 @@ public boolean shouldRetry(int errorCode) {
}
/**
* Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST
* Transition to EXITED_WITH_FAILURE
*/
static class KilledExternallyTransition extends ExitedWithFailureTransition {
KilledExternallyTransition() {
@ -1061,12 +1215,43 @@ static class KilledExternallyTransition extends ExitedWithFailureTransition {
}
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
public void transition(ContainerImpl container,
ContainerEvent event) {
super.transition(container, event);
container.addDiagnostics("Killed by external signal\n");
}
}
/**
* Transition to LOCALIZED and wait for RE-LAUNCH
*/
static class KilledForReInitializationTransition extends ContainerTransition {
@Override
public void transition(ContainerImpl container,
ContainerEvent event) {
LOG.info("Relaunching Container [" + container.getContainerId()
+ "] for upgrade !!");
container.wasLaunched = false;
container.metrics.endRunningContainer();
container.launchContext = container.reInitContext.newLaunchContext;
// Re configure the Retry Context
container.containerRetryContext =
configureRetryContext(container.context.getConf(),
container.launchContext, container.containerId);
// Reset the retry attempts since its a fresh start
container.remainingRetryAttempts =
container.containerRetryContext.getMaxRetries();
container.resourceSet = ResourceSet.merge(
container.resourceSet, container.reInitContext.resourceSet);
container.sendLaunchEvent();
}
}
/**
* Transition from LOCALIZING to LOCALIZATION_FAILED upon receiving
* RESOURCE_FAILED event.
@ -1122,16 +1307,20 @@ public void transition(ContainerImpl container, ContainerEvent event) {
}
/**
* Transitions upon receiving KILL_CONTAINER:
* - LOCALIZED -> KILLING
* - RUNNING -> KILLING
* Transitions upon receiving KILL_CONTAINER.
* - LOCALIZED -> KILLING.
* - RUNNING -> KILLING.
* - REINITIALIZING -> KILLING.
*/
@SuppressWarnings("unchecked") // dispatcher not typed
static class KillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Kill the process/process-grp
container.setIsReInitializing(false);
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER));
@ -1385,4 +1574,19 @@ ContainerRetryContext getContainerRetryContext() {
public Priority getPriority() {
return containerTokenIdentifier.getPriority();
}
@Override
public boolean isRunning() {
return getContainerState() == ContainerState.RUNNING;
}
@Override
public void setIsReInitializing(boolean isReInitializing) {
this.isReInitializing = isReInitializing;
}
@Override
public boolean isReInitializing() {
return this.isReInitializing;
}
}

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
/**
* ContainerEvent sent by ContainerManager to ContainerImpl to
* re-initiate Container.
*/
public class ContainerReInitEvent extends ContainerEvent {
private final ContainerLaunchContext reInitLaunchContext;
private final ResourceSet resourceSet;
/**
* Container Re-Init Event.
* @param cID Container Id
* @param upgradeContext Upgrade context
* @param resourceSet Resource Set
*/
public ContainerReInitEvent(ContainerId cID,
ContainerLaunchContext upgradeContext, ResourceSet resourceSet){
super(cID, ContainerEventType.REINITIALIZE_CONTAINER);
this.reInitLaunchContext = upgradeContext;
this.resourceSet = resourceSet;
}
/**
* Get the Launch Context to be used for upgrade.
* @return ContainerLaunchContext
*/
public ContainerLaunchContext getReInitLaunchContext() {
return reInitLaunchContext;
}
/**
* Get the ResourceSet.
* @return ResourceSet.
*/
public ResourceSet getResourceSet() {
return resourceSet;
}
}

View File

@ -20,6 +20,6 @@
public enum ContainerState {
NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, RELAUNCHING,
EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE
}

View File

@ -137,6 +137,7 @@ public void handle(ContainersLauncherEvent event) {
running.put(containerId, launch);
break;
case CLEANUP_CONTAINER:
case CLEANUP_CONTAINER_FOR_REINIT:
ContainerLaunch launcher = running.remove(containerId);
if (launcher == null) {
// Container not launched. So nothing needs to be done.

View File

@ -23,5 +23,6 @@ public enum ContainersLauncherEventType {
RELAUNCH_CONTAINER,
RECOVER_CONTAINER,
CLEANUP_CONTAINER, // The process(grp) itself.
CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
SIGNAL_CONTAINER,
}

View File

@ -470,7 +470,8 @@ private void handleInitContainerResources(
ContainerLocalizationRequestEvent rsrcReqs) {
Container c = rsrcReqs.getContainer();
EnumSet<ContainerState> set =
EnumSet.of(ContainerState.LOCALIZING, ContainerState.RUNNING);
EnumSet.of(ContainerState.LOCALIZING,
ContainerState.RUNNING, ContainerState.REINITIALIZING);
if (!set.contains(c.getContainerState())) {
LOG.warn(c.getContainerId() + " is at " + c.getContainerState()
+ " state, do not localize resources.");

View File

@ -43,9 +43,9 @@ public class ResourceSet {
private static final Log LOG = LogFactory.getLog(ResourceSet.class);
// resources by localization state (localized, pending, failed)
private Map<Path, List<String>> localizedResources =
private Map<String, Path> localizedResources =
new ConcurrentHashMap<>();
private Map<LocalResourceRequest, List<String>> pendingResources =
private Map<LocalResourceRequest, Set<String>> pendingResources =
new ConcurrentHashMap<>();
private Set<LocalResourceRequest> resourcesFailedToBeLocalized =
new HashSet<>();
@ -69,7 +69,7 @@ public class ResourceSet {
if (localResourceMap == null || localResourceMap.isEmpty()) {
return null;
}
Map<LocalResourceRequest, List<String>> allResources = new HashMap<>();
Map<LocalResourceRequest, Set<String>> allResources = new HashMap<>();
List<LocalResourceRequest> publicList = new ArrayList<>();
List<LocalResourceRequest> privateList = new ArrayList<>();
List<LocalResourceRequest> appList = new ArrayList<>();
@ -77,7 +77,7 @@ public class ResourceSet {
for (Map.Entry<String, LocalResource> rsrc : localResourceMap.entrySet()) {
LocalResource resource = rsrc.getValue();
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
allResources.putIfAbsent(req, new ArrayList<>());
allResources.putIfAbsent(req, new HashSet<>());
allResources.get(req).add(rsrc.getKey());
storeSharedCacheUploadPolicy(req,
resource.getShouldBeUploadedToSharedCache());
@ -121,13 +121,15 @@ public class ResourceSet {
* @param location The path where the resource is localized
* @return The list of symlinks for the localized resources.
*/
public List<String> resourceLocalized(LocalResourceRequest request,
public Set<String> resourceLocalized(LocalResourceRequest request,
Path location) {
List<String> symlinks = pendingResources.remove(request);
Set<String> symlinks = pendingResources.remove(request);
if (symlinks == null) {
return null;
} else {
localizedResources.put(location, symlinks);
for (String symlink : symlinks) {
localizedResources.put(symlink, location);
}
return symlinks;
}
}
@ -175,7 +177,12 @@ private void storeSharedCacheUploadPolicy(
}
public Map<Path, List<String>> getLocalizedResources() {
return localizedResources;
Map<Path, List<String>> map = new HashMap<>();
for (Map.Entry<String, Path> entry : localizedResources.entrySet()) {
map.putIfAbsent(entry.getValue(), new ArrayList<>());
map.get(entry.getValue()).add(entry.getKey());
}
return map;
}
public Map<LocalResourceRequest, Path> getResourcesToBeUploaded() {
@ -186,7 +193,25 @@ public Map<LocalResourceRequest, Boolean> getResourcesUploadPolicies() {
return resourcesUploadPolicies;
}
public Map<LocalResourceRequest, List<String>> getPendingResources() {
public Map<LocalResourceRequest, Set<String>> getPendingResources() {
return pendingResources;
}
public static ResourceSet merge(ResourceSet... resourceSets) {
ResourceSet merged = new ResourceSet();
for (ResourceSet rs : resourceSets) {
// This should overwrite existing symlinks
merged.localizedResources.putAll(rs.localizedResources);
merged.resourcesToBeUploaded.putAll(rs.resourcesToBeUploaded);
merged.resourcesUploadPolicies.putAll(rs.resourcesUploadPolicies);
// TODO : START : Should we de-dup here ?
merged.publicRsrcs.addAll(rs.publicRsrcs);
merged.privateRsrcs.addAll(rs.privateRsrcs);
merged.appRsrcs.addAll(rs.appRsrcs);
// TODO : END
}
return merged;
}
}

View File

@ -39,8 +39,8 @@ public class ContainerLocalizationRequestEvent extends
/**
* Event requesting the localization of the rsrc.
* @param c
* @param rsrc
* @param c Container
* @param rsrc LocalResourceRequests map
*/
public ContainerLocalizationRequestEvent(Container c,
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {

View File

@ -269,6 +269,42 @@ public void testForcefulShutdownSignal() throws IOException,
super.testForcefulShutdownSignal();
}
@Override
public void testContainerUpgradeSuccess() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
LOG.info("Running testContainerUpgradeSuccess");
super.testContainerUpgradeSuccess();
}
@Override
public void testContainerUpgradeLocalizationFailure() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
LOG.info("Running testContainerUpgradeLocalizationFailure");
super.testContainerUpgradeLocalizationFailure();
}
@Override
public void testContainerUpgradeProcessFailure() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
LOG.info("Running testContainerUpgradeProcessFailure");
super.testContainerUpgradeProcessFailure();
}
private boolean shouldRunTest() {
return System
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;

View File

@ -123,7 +123,11 @@ public BaseContainerManagerTest() throws UnsupportedFileSystemException {
conf) {
public int getHttpPort() {
return HTTP_PORT;
};
}
@Override
public ContainerExecutor getContainerExecutor() {
return exec;
}
};
protected ContainerExecutor exec;
protected DeletionService delSrvc;

View File

@ -25,6 +25,7 @@
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
@ -33,6 +34,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -64,6 +66,8 @@
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
@ -94,7 +98,6 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -366,6 +369,237 @@ public void testContainerLaunchAndStop() throws IOException,
DefaultContainerExecutor.containerIsAlive(pid));
}
@Test
public void testContainerUpgradeSuccess() throws IOException,
InterruptedException, YarnException {
containerManager.start();
// ////// Construct the Container-id
ContainerId cId = createContainerId(0);
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
String pid = prepareInitialContainer(cId, oldStartFile);
File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
prepareContainerUpgrade(false, false, cId, newStartFile);
// Assert that the First process is not alive anymore
Assert.assertFalse("Process is still alive!",
DefaultContainerExecutor.containerIsAlive(pid));
BufferedReader reader =
new BufferedReader(new FileReader(newStartFile));
Assert.assertEquals("Upgrade World!", reader.readLine());
// Get the pid of the process
String newPid = reader.readLine().trim();
Assert.assertNotEquals("Old and New Pids must be different !", pid, newPid);
// No more lines
Assert.assertEquals(null, reader.readLine());
reader.close();
// Verify old file still exists and is accessible by
// the new process...
reader = new BufferedReader(new FileReader(oldStartFile));
Assert.assertEquals("Hello World!", reader.readLine());
// Assert that the New process is alive
Assert.assertTrue("New Process is not alive!",
DefaultContainerExecutor.containerIsAlive(newPid));
}
@Test
public void testContainerUpgradeLocalizationFailure() throws IOException,
InterruptedException, YarnException {
if (Shell.WINDOWS) {
return;
}
containerManager.start();
// ////// Construct the Container-id
ContainerId cId = createContainerId(0);
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
String pid = prepareInitialContainer(cId, oldStartFile);
File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
prepareContainerUpgrade(true, true, cId, newStartFile);
// Assert that the First process is STILL alive
// since upgrade was terminated..
Assert.assertTrue("Process is NOT alive!",
DefaultContainerExecutor.containerIsAlive(pid));
}
@Test
public void testContainerUpgradeProcessFailure() throws IOException,
InterruptedException, YarnException {
if (Shell.WINDOWS) {
return;
}
containerManager.start();
// ////// Construct the Container-id
ContainerId cId = createContainerId(0);
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
String pid = prepareInitialContainer(cId, oldStartFile);
File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
prepareContainerUpgrade(true, false, cId, newStartFile);
// Assert that the First process is not alive anymore
Assert.assertFalse("Process is still alive!",
DefaultContainerExecutor.containerIsAlive(pid));
}
/**
* Prepare a launch Context for container upgrade and request the
* Container Manager to re-initialize a running container using the
* new launch context.
* @param failCmd injects a start script that intentionally fails.
* @param failLoc injects a bad file Location that will fail localization.
*/
private void prepareContainerUpgrade(boolean failCmd, boolean failLoc,
ContainerId cId, File startFile)
throws FileNotFoundException, YarnException, InterruptedException {
// Re-write scriptfile and processStartFile
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile_new");
PrintWriter fileWriter = new PrintWriter(scriptFile);
writeScriptFile(fileWriter, "Upgrade World!", startFile, cId, failCmd);
ContainerLaunchContext containerLaunchContext =
prepareContainerLaunchContext(scriptFile, "dest_file_new", failLoc);
containerManager.upgradeContainer(cId, containerLaunchContext);
try {
containerManager.upgradeContainer(cId, containerLaunchContext);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Cannot perform UPGRADE"));
}
int timeoutSecs = 0;
int maxTimeToWait = failLoc ? 10 : 20;
// Wait for new processStartfile to be created
while (!startFile.exists() && timeoutSecs++ < maxTimeToWait) {
Thread.sleep(1000);
LOG.info("Waiting for New process start-file to be created");
}
}
/**
* Prepare and start an initial container. This container will be subsequently
* re-initialized for upgrade. It also waits for the container to start and
* returns the Pid of the running container.
*/
private String prepareInitialContainer(ContainerId cId, File startFile)
throws IOException, YarnException, InterruptedException {
File scriptFileOld = Shell.appendScriptExtension(tmpDir, "scriptFile");
PrintWriter fileWriterOld = new PrintWriter(scriptFileOld);
writeScriptFile(fileWriterOld, "Hello World!", startFile, cId, false);
ContainerLaunchContext containerLaunchContext =
prepareContainerLaunchContext(scriptFileOld, "dest_file", false);
StartContainerRequest scRequest =
StartContainerRequest.newInstance(containerLaunchContext,
createContainerToken(cId,
DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
context.getContainerTokenSecretManager()));
List<StartContainerRequest> list = new ArrayList<>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
int timeoutSecs = 0;
while (!startFile.exists() && timeoutSecs++ < 20) {
Thread.sleep(1000);
LOG.info("Waiting for process start-file to be created");
}
Assert.assertTrue("ProcessStartFile doesn't exist!",
startFile.exists());
// Now verify the contents of the file
BufferedReader reader =
new BufferedReader(new FileReader(startFile));
Assert.assertEquals("Hello World!", reader.readLine());
// Get the pid of the process
String pid = reader.readLine().trim();
// No more lines
Assert.assertEquals(null, reader.readLine());
// Assert that the process is alive
Assert.assertTrue("Process is not alive!",
DefaultContainerExecutor.containerIsAlive(pid));
// Once more
Assert.assertTrue("Process is not alive!",
DefaultContainerExecutor.containerIsAlive(pid));
return pid;
}
private void writeScriptFile(PrintWriter fileWriter, String startLine,
File processStartFile, ContainerId cId, boolean isFailure) {
if (Shell.WINDOWS) {
fileWriter.println("@echo " + startLine + "> " + processStartFile);
fileWriter.println("@echo " + cId + ">> " + processStartFile);
fileWriter.println("@ping -n 100 127.0.0.1 >nul");
} else {
fileWriter.write("\numask 0"); // So that start file is readable by test
if (isFailure) {
// Echo PID and throw some error code
fileWriter.write("\necho $$ >> " + processStartFile);
fileWriter.write("\nexit 111");
} else {
fileWriter.write("\necho " + startLine + " > " + processStartFile);
fileWriter.write("\necho $$ >> " + processStartFile);
fileWriter.write("\nexec sleep 100");
}
}
fileWriter.close();
}
private ContainerLaunchContext prepareContainerLaunchContext(File scriptFile,
String destFName, boolean putBadFile) {
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
URL resourceAlpha = null;
if (putBadFile) {
File fileToDelete = new File(tmpDir, "fileToDelete")
.getAbsoluteFile();
resourceAlpha =
URL.fromPath(localFS
.makeQualified(new Path(fileToDelete.getAbsolutePath())));
fileToDelete.delete();
} else {
resourceAlpha =
URL.fromPath(localFS
.makeQualified(new Path(scriptFile.getAbsolutePath())));
}
LocalResource rsrcAlpha =
recordFactory.newRecordInstance(LocalResource.class);
rsrcAlpha.setResource(resourceAlpha);
rsrcAlpha.setSize(-1);
rsrcAlpha.setVisibility(LocalResourceVisibility.APPLICATION);
rsrcAlpha.setType(LocalResourceType.FILE);
rsrcAlpha.setTimestamp(scriptFile.lastModified());
Map<String, LocalResource> localResources = new HashMap<>();
localResources.put(destFName, rsrcAlpha);
containerLaunchContext.setLocalResources(localResources);
ContainerRetryContext containerRetryContext = ContainerRetryContext
.newInstance(
ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES,
new HashSet<>(Arrays.asList(Integer.valueOf(111))), 4, 0);
containerLaunchContext.setContainerRetryContext(containerRetryContext);
List<String> commands = Arrays.asList(
Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
return containerLaunchContext;
}
protected void testContainerLaunchAndExit(int exitCode) throws IOException,
InterruptedException, YarnException {
@ -556,7 +790,7 @@ public Boolean get() {
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().contains("Not able to localize new resources"));
e.getMessage().contains("Cannot perform LOCALIZE"));
}
}

View File

@ -190,4 +190,19 @@ public Priority getPriority() {
public void setIpAndHost(String[] ipAndHost) {
}
@Override
public boolean isRunning() {
return false;
}
@Override
public void setIsReInitializing(boolean isReInitializing) {
}
@Override
public boolean isReInitializing() {
return false;
}
}