diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 818b0ea4557..918c30a7a34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -181,6 +181,15 @@ public abstract class ContainerExecutor implements Configurable { public abstract void deleteAsUser(DeletionAsUserContext ctx) throws IOException, InterruptedException; + /** + * Create a symlink file which points to the target. + * @param target The target for symlink + * @param symlink the symlink file + * @throws IOException Error when creating symlinks + */ + public abstract void symLink(String target, String symlink) + throws IOException; + /** * Check if a container is alive. * @param ctx Encapsulates information necessary for container liveness check. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 8ef58993df9..a54df0ed28e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -111,4 +111,6 @@ public interface Context { boolean isDistributedSchedulingEnabled(); OpportunisticContainerAllocator getContainerAllocator(); + + ContainerExecutor getContainerExecutor(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 13ad9ac6ce8..9a0549d692b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -38,6 +38,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.permission.FsPermission; @@ -511,6 +512,11 @@ public class DefaultContainerExecutor extends ContainerExecutor { } } + @Override + public void symLink(String target, String symlink) throws IOException { + FileUtil.symLink(target, symlink); + } + /** Permissions for user dir. * $local.dir/usercache/$user */ static final short USER_PERM = (short)0750; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java index ebf9566fb3d..13902145125 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java @@ -490,6 +490,12 @@ public class DockerContainerExecutor extends ContainerExecutor { } } + @Override + public void symLink(String target, String symlink) + throws IOException { + + } + /** * Converts a directory list to a docker mount string * @param dirs diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index 6890b256e52..cc12b2012c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -682,6 +682,11 @@ public class LinuxContainerExecutor extends ContainerExecutor { return files.toArray(new File[files.size()]); } + @Override + public void symLink(String target, String symlink) { + + } + @Override public boolean isContainerAlive(ContainerLivenessContext ctx) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 84c6eebc1db..69765b826aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -334,6 +334,9 @@ public class NodeManager extends CompositeService this.context = createNMContext(containerTokenSecretManager, nmTokenSecretManager, nmStore, isDistSchedulingEnabled); + + ((NMContext)context).setContainerExecutor(exec); + nodeLabelsProvider = createNodeLabelsProvider(conf); if (null == nodeLabelsProvider) { @@ -490,6 +493,7 @@ public class NodeManager extends CompositeService private OpportunisticContainerAllocator containerAllocator; private final QueuingContext queuingContext; + private ContainerExecutor executor; public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, @@ -646,6 +650,15 @@ public class NodeManager extends CompositeService public OpportunisticContainerAllocator getContainerAllocator() { return containerAllocator; } + + @Override + public ContainerExecutor getContainerExecutor() { + return this.executor; + } + + public void setContainerExecutor(ContainerExecutor executor) { + this.executor = executor; + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index fe92656a99c..386079c8d60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -18,25 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; -import static org.apache.hadoop.service.Service.STATE.STARTED; - -import java.io.DataInputStream; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; - +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -76,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -128,7 +112,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService; @@ -149,8 +136,25 @@ import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProv import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import static org.apache.hadoop.service.Service.STATE.STARTED; import org.apache.hadoop.yarn.util.resource.Resources; @@ -1444,6 +1448,31 @@ public class ContainerManagerImpl extends CompositeService implements public ResourceLocalizationResponse localize( ResourceLocalizationRequest request) throws YarnException, IOException { + ContainerId containerId = request.getContainerId(); + Container container = context.getContainers().get(containerId); + if (container == null) { + throw new YarnException("Specified " + containerId + " does not exist!"); + } + if (!container.getContainerState() + .equals(org.apache.hadoop.yarn.server.nodemanager. + containermanager.container.ContainerState.RUNNING)) { + throw new YarnException( + containerId + " is at " + container.getContainerState() + + " state. Not able to localize new resources."); + } + + try { + Map> req = + container.getResourceSet().addResources(request.getLocalResources()); + if (req != null && !req.isEmpty()) { + dispatcher.getEventHandler() + .handle(new ContainerLocalizationRequestEvent(container, req)); + } + } catch (URISyntaxException e) { + LOG.info("Error when parsing local resource URI for " + containerId, e); + throw new YarnException(e); + } + return ResourceLocalizationResponse.newInstance(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index ef0683ed78e..79eeaf150b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -18,9 +18,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; -import java.util.List; -import java.util.Map; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -30,6 +27,10 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet; + +import java.util.List; +import java.util.Map; public interface Container extends EventHandler { @@ -71,4 +72,5 @@ public interface Container extends EventHandler { String toString(); + ResourceSet getResourceSet(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 073dd59d6d4..7f6d5b6023f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -18,18 +18,15 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; +import java.io.File; import java.io.IOException; import java.net.URISyntaxException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; -import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -63,6 +60,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; @@ -123,20 +121,7 @@ public class ContainerImpl implements Container { private final Configuration daemonConf; private static final Log LOG = LogFactory.getLog(ContainerImpl.class); - private final Map> pendingResources = - new HashMap>(); - private final Map> localizedResources = - new HashMap>(); - private final List publicRsrcs = - new ArrayList(); - private final List privateRsrcs = - new ArrayList(); - private final List appRsrcs = - new ArrayList(); - private final Map resourcesToBeUploaded = - new ConcurrentHashMap(); - private final Map resourcesUploadPolicies = - new ConcurrentHashMap(); + // whether container has been recovered after a restart private RecoveredContainerStatus recoveredStatus = @@ -144,6 +129,7 @@ public class ContainerImpl implements Container { // whether container was marked as killed after recovery private boolean recoveredAsKilled = false; private Context context; + private ResourceSet resourceSet; public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerLaunchContext launchContext, Credentials creds, @@ -202,6 +188,7 @@ public class ContainerImpl implements Container { } stateMachine = stateMachineFactory.make(this); + this.resourceSet = new ResourceSet(); } // constructor for a recovered container @@ -310,6 +297,12 @@ public class ContainerImpl implements Container { ContainerState.EXITED_WITH_FAILURE), ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, new RetryFailureTransition()) + .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, + ContainerEventType.RESOURCE_LOCALIZED, + new ResourceLocalizedWhileRunningTransition()) + .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, + ContainerEventType.RESOURCE_FAILED, + new ResourceLocalizationFailedWhileRunningTransition()) .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) @@ -464,7 +457,7 @@ public class ContainerImpl implements Container { try { if (ContainerState.LOCALIZED == getContainerState() || ContainerState.RELAUNCHING == getContainerState()) { - return localizedResources; + return resourceSet.getLocalizedResources(); } else { return null; } @@ -585,6 +578,11 @@ public class ContainerImpl implements Container { this.logDir = logDir; } + @Override + public ResourceSet getResourceSet() { + return this.resourceSet; + } + @SuppressWarnings("unchecked") private void sendFinishedEvents() { // Inform the application @@ -644,7 +642,7 @@ public class ContainerImpl implements Container { for (String s : diags) { this.diagnostics.append(s); } - if (isRetryContextSet() && diagnostics.length() > diagnosticsMaxSize) { + if (diagnostics.length() > diagnosticsMaxSize) { diagnostics.delete(0, diagnostics.length() - diagnosticsMaxSize); } try { @@ -658,17 +656,7 @@ public class ContainerImpl implements Container { @SuppressWarnings("unchecked") // dispatcher not typed public void cleanup() { Map> rsrc = - new HashMap>(); - if (!publicRsrcs.isEmpty()) { - rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs); - } - if (!privateRsrcs.isEmpty()) { - rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs); - } - if (!appRsrcs.isEmpty()) { - rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs); - } + resourceSet.getAllResourcesByVisibility(); dispatcher.getEventHandler().handle( new ContainerLocalizationCleanupEvent(this, rsrc)); } @@ -688,7 +676,7 @@ public class ContainerImpl implements Container { * message. * * If there are resources to localize, sends a - * ContainerLocalizationRequest (INIT_CONTAINER_RESOURCES) + * ContainerLocalizationRequest (LOCALIZE_CONTAINER_RESOURCES) * to the ResourceLocalizationManager and enters LOCALIZING state. * * If there are no resources to localize, sends LAUNCH_CONTAINER event @@ -740,39 +728,15 @@ public class ContainerImpl implements Container { } container.containerLocalizationStartTime = clock.getTime(); + // Send requests for public, private resources Map cntrRsrc = ctxt.getLocalResources(); if (!cntrRsrc.isEmpty()) { try { - for (Map.Entry rsrc : cntrRsrc.entrySet()) { - try { - LocalResourceRequest req = - new LocalResourceRequest(rsrc.getValue()); - List links = container.pendingResources.get(req); - if (links == null) { - links = new ArrayList(); - container.pendingResources.put(req, links); - } - links.add(rsrc.getKey()); - storeSharedCacheUploadPolicy(container, req, rsrc.getValue() - .getShouldBeUploadedToSharedCache()); - switch (rsrc.getValue().getVisibility()) { - case PUBLIC: - container.publicRsrcs.add(req); - break; - case PRIVATE: - container.privateRsrcs.add(req); - break; - case APPLICATION: - container.appRsrcs.add(req); - break; - } - } catch (URISyntaxException e) { - LOG.info("Got exception parsing " + rsrc.getKey() - + " and value " + rsrc.getValue()); - throw e; - } - } + Map> req = + container.resourceSet.addResources(ctxt.getLocalResources()); + container.dispatcher.getEventHandler().handle( + new ContainerLocalizationRequestEvent(container, req)); } catch (URISyntaxException e) { // malformed resource; abort container launch LOG.warn("Failed to parse resource-request", e); @@ -780,21 +744,6 @@ public class ContainerImpl implements Container { container.metrics.endInitingContainer(); return ContainerState.LOCALIZATION_FAILED; } - Map> req = - new LinkedHashMap>(); - if (!container.publicRsrcs.isEmpty()) { - req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs); - } - if (!container.privateRsrcs.isEmpty()) { - req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs); - } - if (!container.appRsrcs.isEmpty()) { - req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs); - } - - container.dispatcher.getEventHandler().handle( - new ContainerLocalizationRequestEvent(container, req)); return ContainerState.LOCALIZING; } else { container.sendLaunchEvent(); @@ -804,27 +753,6 @@ public class ContainerImpl implements Container { } } - /** - * Store the resource's shared cache upload policies - * Given LocalResourceRequest can be shared across containers in - * LocalResourcesTrackerImpl, we preserve the upload policies here. - * In addition, it is possible for the application to create several - * "identical" LocalResources as part of - * ContainerLaunchContext.setLocalResources with different symlinks. - * There is a corner case where these "identical" local resources have - * different upload policies. For that scenario, upload policy will be set to - * true as long as there is at least one LocalResource entry with - * upload policy set to true. - */ - private static void storeSharedCacheUploadPolicy(ContainerImpl container, - LocalResourceRequest resourceRequest, Boolean uploadPolicy) { - Boolean storedUploadPolicy = - container.resourcesUploadPolicies.get(resourceRequest); - if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) { - container.resourcesUploadPolicies.put(resourceRequest, uploadPolicy); - } - } - /** * Transition when one of the requested resources for this container * has been successfully localized. @@ -838,22 +766,21 @@ public class ContainerImpl implements Container { ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event; LocalResourceRequest resourceRequest = rsrcEvent.getResource(); Path location = rsrcEvent.getLocation(); - List syms = container.pendingResources.remove(resourceRequest); + List syms = + container.resourceSet.resourceLocalized(resourceRequest, location); if (null == syms) { - LOG.warn("Localized unknown resource " + resourceRequest + - " for container " + container.containerId); - assert false; - // fail container? + LOG.info("Localized resource " + resourceRequest + + " for container " + container.containerId); return ContainerState.LOCALIZING; } - container.localizedResources.put(location, syms); // check to see if this resource should be uploaded to the shared cache // as well if (shouldBeUploadedToSharedCache(container, resourceRequest)) { - container.resourcesToBeUploaded.put(resourceRequest, location); + container.resourceSet.getResourcesToBeUploaded() + .put(resourceRequest, location); } - if (!container.pendingResources.isEmpty()) { + if (!container.resourceSet.getPendingResources().isEmpty()) { return ContainerState.LOCALIZING; } @@ -875,7 +802,8 @@ public class ContainerImpl implements Container { && container.recoveredStatus != RecoveredContainerStatus.COMPLETED) { // kick off uploads to the shared cache container.dispatcher.getEventHandler().handle( - new SharedCacheUploadEvent(container.resourcesToBeUploaded, container + new SharedCacheUploadEvent( + container.resourceSet.getResourcesToBeUploaded(), container .getLaunchContext(), container.getUser(), SharedCacheUploadEventType.UPLOAD)); } @@ -884,6 +812,56 @@ public class ContainerImpl implements Container { } } + /** + * Resource is localized while the container is running - create symlinks + */ + static class ResourceLocalizedWhileRunningTransition + extends ContainerTransition { + + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + ContainerResourceLocalizedEvent rsrcEvent = + (ContainerResourceLocalizedEvent) event; + List links = container.resourceSet + .resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation()); + // creating symlinks. + for (String link : links) { + try { + String linkFile = new Path(container.workDir, link).toString(); + if (new File(linkFile).exists()) { + LOG.info("Symlink file already exists: " + linkFile); + } else { + container.context.getContainerExecutor() + .symLink(rsrcEvent.getLocation().toString(), linkFile); + LOG.info("Created symlink: " + linkFile + " -> " + rsrcEvent + .getLocation()); + } + } catch (IOException e) { + String message = String + .format("Error when creating symlink %s -> %s", link, + rsrcEvent.getLocation()); + LOG.error(message, e); + } + } + } + } + + /** + * Resource localization failed while the container is running. + */ + static class ResourceLocalizationFailedWhileRunningTransition + extends ContainerTransition { + + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + ContainerResourceFailedEvent failedEvent = + (ContainerResourceFailedEvent) event; + container.resourceSet + .resourceLocalizationFailed(failedEvent.getResource()); + container.addDiagnostics(failedEvent.getDiagnosticMessage()); + } + } + /** * Transition from LOCALIZED state to RUNNING state upon receiving * a CONTAINER_LAUNCHED event @@ -1127,17 +1105,10 @@ public class ContainerImpl implements Container { SingleArcTransition { @Override public void transition(ContainerImpl container, ContainerEvent event) { - ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event; - List syms = - container.pendingResources.remove(rsrcEvent.getResource()); - if (null == syms) { - LOG.warn("Localized unknown resource " + rsrcEvent.getResource() + - " for container " + container.containerId); - assert false; - // fail container? - return; - } - container.localizedResources.put(rsrcEvent.getLocation(), syms); + ContainerResourceLocalizedEvent rsrcEvent = + (ContainerResourceLocalizedEvent) event; + container.resourceSet + .resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation()); } } @@ -1392,7 +1363,7 @@ public class ContainerImpl implements Container { */ private static boolean shouldBeUploadedToSharedCache(ContainerImpl container, LocalResourceRequest resource) { - return container.resourcesUploadPolicies.get(resource); + return container.resourceSet.getResourcesUploadPolicies().get(resource); } @VisibleForTesting diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 14190fc98dc..d8239ef25f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -1201,16 +1201,16 @@ public class ContainerLaunch implements Callable { private void recordContainerLogDir(ContainerId containerId, String logDir) throws IOException{ + container.setLogDir(logDir); if (container.isRetryContextSet()) { - container.setLogDir(logDir); context.getNMStateStore().storeContainerLogDir(containerId, logDir); } } private void recordContainerWorkDir(ContainerId containerId, String workDir) throws IOException{ + container.setWorkDir(workDir); if (container.isRetryContextSet()) { - container.setWorkDir(workDir); context.getNMStateStore().storeContainerWorkDir(containerId, workDir); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 0ec2f2938c2..b281ef53258 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -112,6 +112,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; @@ -135,7 +136,6 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.FSDownload; import com.google.common.annotations.VisibleForTesting; @@ -419,7 +419,7 @@ public class ResourceLocalizationService extends CompositeService handleInitApplicationResources( ((ApplicationLocalizationEvent)event).getApplication()); break; - case INIT_CONTAINER_RESOURCES: + case LOCALIZE_CONTAINER_RESOURCES: handleInitContainerResources((ContainerLocalizationRequestEvent) event); break; case CONTAINER_RESOURCES_LOCALIZED: @@ -469,6 +469,13 @@ public class ResourceLocalizationService extends CompositeService private void handleInitContainerResources( ContainerLocalizationRequestEvent rsrcReqs) { Container c = rsrcReqs.getContainer(); + EnumSet set = + EnumSet.of(ContainerState.LOCALIZING, ContainerState.RUNNING); + if (!set.contains(c.getContainerState())) { + LOG.warn(c.getContainerId() + " is at " + c.getContainerState() + + " state, do not localize resources."); + return; + } // create a loading cache for the file statuses LoadingCache> statCache = CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(getConfig())); @@ -538,7 +545,7 @@ public class ResourceLocalizationService extends CompositeService } String locId = c.getContainerId().toString(); localizerTracker.cleanupPrivLocalizers(locId); - + // Delete the container directories String userName = c.getUser(); String containerIDStr = c.toString(); @@ -747,6 +754,14 @@ public class ResourceLocalizationService extends CompositeService case APPLICATION: synchronized (privLocalizers) { LocalizerRunner localizer = privLocalizers.get(locId); + if (localizer != null && localizer.killContainerLocalizer.get()) { + // Old localizer thread has been stopped, remove it and creates + // a new localizer thread. + LOG.info("New " + event.getType() + " localize request for " + + locId + ", remove old private localizer."); + cleanupPrivLocalizers(locId); + localizer = null; + } if (null == localizer) { LOG.info("Created localizer for " + locId); localizer = new LocalizerRunner(req.getContext(), locId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java new file mode 100644 index 00000000000..557b5279f4a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; + +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * All Resources requested by the container. + */ +public class ResourceSet { + + private static final Log LOG = LogFactory.getLog(ResourceSet.class); + + // resources by localization state (localized, pending, failed) + private Map> localizedResources = + new ConcurrentHashMap<>(); + private Map> pendingResources = + new ConcurrentHashMap<>(); + private Set resourcesFailedToBeLocalized = + new HashSet<>(); + + // resources by visibility (public, private, app) + private final List publicRsrcs = + new ArrayList<>(); + private final List privateRsrcs = + new ArrayList<>(); + private final List appRsrcs = + new ArrayList<>(); + + private final Map resourcesToBeUploaded = + new ConcurrentHashMap<>(); + private final Map resourcesUploadPolicies = + new ConcurrentHashMap<>(); + + public Map> + addResources(Map localResourceMap) + throws URISyntaxException { + if (localResourceMap == null || localResourceMap.isEmpty()) { + return null; + } + Map> allResources = new HashMap<>(); + List publicList = new ArrayList<>(); + List privateList = new ArrayList<>(); + List appList = new ArrayList<>(); + + for (Map.Entry rsrc : localResourceMap.entrySet()) { + LocalResource resource = rsrc.getValue(); + LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue()); + if (!allResources.containsKey(req)) { + allResources.put(req, new ArrayList()); + } + allResources.get(req).add(rsrc.getKey()); + storeSharedCacheUploadPolicy(req, + resource.getShouldBeUploadedToSharedCache()); + switch (resource.getVisibility()) { + case PUBLIC: + publicList.add(req); + break; + case PRIVATE: + privateList.add(req); + break; + case APPLICATION: + appList.add(req); + break; + default: + break; + } + } + Map> req = + new LinkedHashMap<>(); + if (!publicList.isEmpty()) { + publicRsrcs.addAll(publicList); + req.put(LocalResourceVisibility.PUBLIC, publicList); + } + if (!privateList.isEmpty()) { + privateRsrcs.addAll(privateList); + req.put(LocalResourceVisibility.PRIVATE, privateList); + } + if (!appList.isEmpty()) { + appRsrcs.addAll(appList); + req.put(LocalResourceVisibility.APPLICATION, appList); + } + if (!allResources.isEmpty()) { + this.pendingResources.putAll(allResources); + } + return req; + } + + /** + * Called when resource localized. + * @param request The original request for the localized resource + * @param location The path where the resource is localized + * @return The list of symlinks for the localized resources. + */ + public List resourceLocalized(LocalResourceRequest request, + Path location) { + List symlinks = pendingResources.remove(request); + if (symlinks == null) { + return null; + } else { + localizedResources.put(location, symlinks); + return symlinks; + } + } + + public void resourceLocalizationFailed(LocalResourceRequest request) { + pendingResources.remove(request); + resourcesFailedToBeLocalized.add(request); + } + + public synchronized Map> getAllResourcesByVisibility() { + + Map> rsrc = + new HashMap<>(); + if (!publicRsrcs.isEmpty()) { + rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs); + } + if (!privateRsrcs.isEmpty()) { + rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs); + } + if (!appRsrcs.isEmpty()) { + rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs); + } + return rsrc; + } + + /** + * Store the resource's shared cache upload policies + * Given LocalResourceRequest can be shared across containers in + * LocalResourcesTrackerImpl, we preserve the upload policies here. + * In addition, it is possible for the application to create several + * "identical" LocalResources as part of + * ContainerLaunchContext.setLocalResources with different symlinks. + * There is a corner case where these "identical" local resources have + * different upload policies. For that scenario, upload policy will be set to + * true as long as there is at least one LocalResource entry with + * upload policy set to true. + */ + private void storeSharedCacheUploadPolicy( + LocalResourceRequest resourceRequest, Boolean uploadPolicy) { + Boolean storedUploadPolicy = resourcesUploadPolicies.get(resourceRequest); + if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) { + resourcesUploadPolicies.put(resourceRequest, uploadPolicy); + } + } + + public Map> getLocalizedResources() { + return localizedResources; + } + + public Map getResourcesToBeUploaded() { + return resourcesToBeUploaded; + } + + public Map getResourcesUploadPolicies() { + return resourcesUploadPolicies; + } + + public Map> getPendingResources() { + return pendingResources; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java index 11bb25e943d..43a2f33fec3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java @@ -44,7 +44,7 @@ public class ContainerLocalizationRequestEvent extends */ public ContainerLocalizationRequestEvent(Container c, Map> rsrc) { - super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c); + super(LocalizationEventType.LOCALIZE_CONTAINER_RESOURCES, c); this.rsrc = rsrc; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java index 4785fba4229..cb9842d2b80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java @@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.eve public enum LocalizationEventType { INIT_APPLICATION_RESOURCES, - INIT_CONTAINER_RESOURCES, + LOCALIZE_CONTAINER_RESOURCES, CACHE_CLEANUP, CLEANUP_CONTAINER_RESOURCES, DESTROY_APPLICATION_RESOURCES, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index 4aeb2e2802d..740ed19dc3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -86,7 +86,7 @@ public class DummyContainerManager extends ContainerManagerImpl { dispatcher.getEventHandler().handle(new ApplicationInitedEvent( app.getAppId())); break; - case INIT_CONTAINER_RESOURCES: + case LOCALIZE_CONTAINER_RESOURCES: ContainerLocalizationRequestEvent rsrcReqs = (ContainerLocalizationRequestEvent) event; // simulate localization of all requested resources diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index 3e008854d0b..aa0d975ea37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -131,7 +131,18 @@ public class TestContainerManagerWithLCE extends TestContainerManager { LOG.info("Running testContainerLaunchAndExitFailure"); super.testContainerLaunchAndExitFailure(); } - + + @Override + public void testLocalingResourceWhileContainerRunning() + throws Exception { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + super.testLocalingResourceWhileContainerRunning(); + } + @Override public void testLocalFilesCleanup() throws InterruptedException, IOException, YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index a41f86517ca..4259e82059b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor; @@ -151,7 +152,7 @@ public abstract class BaseAMRMProxyTest { * rest. So the responses returned can be less than the number of end points * specified * - * @param testContext + * @param testContexts * @param func * @return */ @@ -698,5 +699,10 @@ public abstract class BaseAMRMProxyTest { public OpportunisticContainerAllocator getContainerAllocator() { return null; } + + @Override + public ContainerExecutor getContainerExecutor() { + return null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 1f803b4e672..d116d30e47c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -36,17 +36,20 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import com.google.common.base.Supplier; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; @@ -83,6 +86,7 @@ import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -459,7 +463,138 @@ public class TestContainerManager extends BaseContainerManagerTest { // and verify exit code returned testContainerLaunchAndExit(exitCode); } - + + private Map setupLocalResources(String fileName, + String symLink) throws Exception { + // ////// Create the resources for the container + File dir = new File(tmpDir, "dir"); + dir.mkdirs(); + File file = new File(dir, fileName); + PrintWriter fileWriter = new PrintWriter(file); + fileWriter.write("Hello World!"); + fileWriter.close(); + + URL resourceURL = URL.fromPath(FileContext.getLocalFSFileContext() + .makeQualified(new Path(file.getAbsolutePath()))); + LocalResource resource = + recordFactory.newRecordInstance(LocalResource.class); + resource.setResource(resourceURL); + resource.setSize(-1); + resource.setVisibility(LocalResourceVisibility.APPLICATION); + resource.setType(LocalResourceType.FILE); + resource.setTimestamp(file.lastModified()); + Map localResources = + new HashMap(); + localResources.put(symLink, resource); + return localResources; + } + + // Start the container + // While the container is running, localize new resources. + // Verify the symlink is created properly + @Test + public void testLocalingResourceWhileContainerRunning() throws Exception { + // Real del service + delSrvc = new DeletionService(exec); + delSrvc.init(conf); + + ((NodeManager.NMContext)context).setContainerExecutor(exec); + containerManager = createContainerManager(delSrvc); + containerManager.init(conf); + containerManager.start(); + // set up local resources + Map localResource = + setupLocalResources("file", "symLink1"); + ContainerLaunchContext context = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + context.setLocalResources(localResource); + + // a long running container - sleep + context.setCommands(Arrays.asList("sleep 6")); + final ContainerId cId = createContainerId(0); + + // start the container + StartContainerRequest scRequest = StartContainerRequest.newInstance(context, + createContainerToken(cId, DUMMY_RM_IDENTIFIER, this.context.getNodeId(), + user, this.context.getContainerTokenSecretManager())); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(Arrays.asList(scRequest)); + containerManager.startContainers(allRequests); + BaseContainerManagerTest + .waitForContainerState(containerManager, cId, ContainerState.RUNNING); + + BaseContainerManagerTest.waitForApplicationState(containerManager, + cId.getApplicationAttemptId().getApplicationId(), + ApplicationState.RUNNING); + checkResourceLocalized(cId, "symLink1"); + + // Localize new local resources while container is running + Map localResource2 = + setupLocalResources("file2", "symLink2"); + + ResourceLocalizationRequest request = + ResourceLocalizationRequest.newInstance(cId, localResource2); + containerManager.localize(request); + + // Verify resource is localized and symlink is created. + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + try { + checkResourceLocalized(cId, "symLink2"); + return true; + } catch (Throwable e) { + return false; + } + } + }, 500, 20000); + + BaseContainerManagerTest + .waitForContainerState(containerManager, cId, ContainerState.COMPLETE); + // Verify container cannot localize resources while at non-running state. + try{ + containerManager.localize(request); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().contains("Not able to localize new resources")); + } + } + + private void checkResourceLocalized(ContainerId containerId, String symLink) { + String appId = + containerId.getApplicationAttemptId().getApplicationId().toString(); + File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE); + File userDir = new File(userCacheDir, user); + File appCache = new File(userDir, ContainerLocalizer.APPCACHE); + // localDir/usercache/nobody/appcache/application_0_0000 + File appDir = new File(appCache, appId); + // localDir/usercache/nobody/appcache/application_0_0000/container_0_0000_01_000000 + File containerDir = new File(appDir, containerId.toString()); + // localDir/usercache/nobody/appcache/application_0_0000/container_0_0000_01_000000/symLink1 + File targetFile = new File(containerDir, symLink); + + File sysDir = + new File(localDir, ResourceLocalizationService.NM_PRIVATE_DIR); + // localDir/nmPrivate/application_0_0000 + File appSysDir = new File(sysDir, appId); + // localDir/nmPrivate/application_0_0000/container_0_0000_01_000000 + File containerSysDir = new File(appSysDir, containerId.toString()); + + Assert.assertTrue("AppDir " + appDir.getAbsolutePath() + " doesn't exist!!", + appDir.exists()); + Assert.assertTrue( + "AppSysDir " + appSysDir.getAbsolutePath() + " doesn't exist!!", + appSysDir.exists()); + Assert.assertTrue( + "containerDir " + containerDir.getAbsolutePath() + " doesn't exist !", + containerDir.exists()); + Assert.assertTrue("containerSysDir " + containerSysDir.getAbsolutePath() + + " doesn't exist !", containerDir.exists()); + Assert.assertTrue( + "targetFile " + targetFile.getAbsolutePath() + " doesn't exist !!", + targetFile.exists()); + } + @Test public void testLocalFilesCleanup() throws InterruptedException, IOException, YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index f594d8cf363..5e2f9fb5a69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -66,6 +66,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.fs.Options; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; import org.junit.Assert; @@ -1766,7 +1767,7 @@ public class TestResourceLocalizationService { // creating new containers and populating corresponding localizer runners // Container - 1 - ContainerImpl container1 = createMockContainer(user, 1); + Container container1 = createMockContainer(user, 1); String localizerId1 = container1.getContainerId().toString(); rls.getPrivateLocalizers().put( localizerId1, @@ -2291,7 +2292,7 @@ public class TestResourceLocalizationService { } private ContainerLocalizationRequestEvent createContainerLocalizationEvent( - ContainerImpl container, LocalResourceVisibility vis, + Container container, LocalResourceVisibility vis, LocalResourceRequest req) { Map> reqs = new HashMap>(); @@ -2309,6 +2310,7 @@ public class TestResourceLocalizationService { when(container.getUser()).thenReturn(user); Credentials mockCredentials = mock(Credentials.class); when(container.getCredentials()).thenReturn(mockCredentials); + when(container.getContainerState()).thenReturn(ContainerState.LOCALIZING); return container; } @@ -2357,6 +2359,7 @@ public class TestResourceLocalizationService { creds.addToken(new Text("tok" + id), tk); when(c.getCredentials()).thenReturn(creds); when(c.toString()).thenReturn(cId.toString()); + when(c.getContainerState()).thenReturn(ContainerState.LOCALIZING); return c; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java index 2df0c98482b..d24f89db0d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java @@ -89,6 +89,13 @@ public class TestContainersMonitorResourceChange { public void deleteAsUser(DeletionAsUserContext ctx) throws IOException, InterruptedException { } + + @Override + public void symLink(String target, String symlink) + throws IOException { + + } + @Override public String getProcessId(ContainerId containerId) { return String.valueOf(containerId.getContainerId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index cada9da46fa..7a54263832c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,11 +19,6 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.Credentials; @@ -40,8 +36,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class MockContainer implements Container { private ContainerId id; @@ -117,6 +119,11 @@ public class MockContainer implements Container { return ""; } + @Override + public ResourceSet getResourceSet() { + return null; + } + @Override public void handle(ContainerEvent event) { }