YARN-5576. Allow resource localization while container is running. Contributed by Jian He.
This commit is contained in:
parent
de9f046598
commit
ec3a651b17
|
@ -181,6 +181,15 @@ public abstract class ContainerExecutor implements Configurable {
|
|||
public abstract void deleteAsUser(DeletionAsUserContext ctx)
|
||||
throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Create a symlink file which points to the target.
|
||||
* @param target The target for symlink
|
||||
* @param symlink the symlink file
|
||||
* @throws IOException Error when creating symlinks
|
||||
*/
|
||||
public abstract void symLink(String target, String symlink)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Check if a container is alive.
|
||||
* @param ctx Encapsulates information necessary for container liveness check.
|
||||
|
|
|
@ -111,4 +111,6 @@ public interface Context {
|
|||
boolean isDistributedSchedulingEnabled();
|
||||
|
||||
OpportunisticContainerAllocator getContainerAllocator();
|
||||
|
||||
ContainerExecutor getContainerExecutor();
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
@ -511,6 +512,11 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void symLink(String target, String symlink) throws IOException {
|
||||
FileUtil.symLink(target, symlink);
|
||||
}
|
||||
|
||||
/** Permissions for user dir.
|
||||
* $local.dir/usercache/$user */
|
||||
static final short USER_PERM = (short)0750;
|
||||
|
|
|
@ -490,6 +490,12 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void symLink(String target, String symlink)
|
||||
throws IOException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a directory list to a docker mount string
|
||||
* @param dirs
|
||||
|
|
|
@ -682,6 +682,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|||
return files.toArray(new File[files.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void symLink(String target, String symlink) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isContainerAlive(ContainerLivenessContext ctx)
|
||||
throws IOException {
|
||||
|
|
|
@ -334,6 +334,9 @@ public class NodeManager extends CompositeService
|
|||
this.context = createNMContext(containerTokenSecretManager,
|
||||
nmTokenSecretManager, nmStore, isDistSchedulingEnabled);
|
||||
|
||||
|
||||
((NMContext)context).setContainerExecutor(exec);
|
||||
|
||||
nodeLabelsProvider = createNodeLabelsProvider(conf);
|
||||
|
||||
if (null == nodeLabelsProvider) {
|
||||
|
@ -490,6 +493,7 @@ public class NodeManager extends CompositeService
|
|||
private OpportunisticContainerAllocator containerAllocator;
|
||||
|
||||
private final QueuingContext queuingContext;
|
||||
private ContainerExecutor executor;
|
||||
|
||||
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInNM nmTokenSecretManager,
|
||||
|
@ -646,6 +650,15 @@ public class NodeManager extends CompositeService
|
|||
public OpportunisticContainerAllocator getContainerAllocator() {
|
||||
return containerAllocator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerExecutor getContainerExecutor() {
|
||||
return this.executor;
|
||||
}
|
||||
|
||||
public void setContainerExecutor(ContainerExecutor executor) {
|
||||
this.executor = executor;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,25 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
||||
|
||||
import static org.apache.hadoop.service.Service.STATE.STARTED;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.protobuf.ByteString;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
@ -76,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
@ -128,7 +112,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
|
||||
|
@ -149,8 +136,25 @@ import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProv
|
|||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.protobuf.ByteString;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import static org.apache.hadoop.service.Service.STATE.STARTED;
|
||||
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
|
@ -1444,6 +1448,31 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
public ResourceLocalizationResponse localize(
|
||||
ResourceLocalizationRequest request) throws YarnException, IOException {
|
||||
|
||||
ContainerId containerId = request.getContainerId();
|
||||
Container container = context.getContainers().get(containerId);
|
||||
if (container == null) {
|
||||
throw new YarnException("Specified " + containerId + " does not exist!");
|
||||
}
|
||||
if (!container.getContainerState()
|
||||
.equals(org.apache.hadoop.yarn.server.nodemanager.
|
||||
containermanager.container.ContainerState.RUNNING)) {
|
||||
throw new YarnException(
|
||||
containerId + " is at " + container.getContainerState()
|
||||
+ " state. Not able to localize new resources.");
|
||||
}
|
||||
|
||||
try {
|
||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
|
||||
container.getResourceSet().addResources(request.getLocalResources());
|
||||
if (req != null && !req.isEmpty()) {
|
||||
dispatcher.getEventHandler()
|
||||
.handle(new ContainerLocalizationRequestEvent(container, req));
|
||||
}
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.info("Error when parsing local resource URI for " + containerId, e);
|
||||
throw new YarnException(e);
|
||||
}
|
||||
|
||||
return ResourceLocalizationResponse.newInstance();
|
||||
}
|
||||
|
||||
|
|
|
@ -18,9 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -30,6 +27,10 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public interface Container extends EventHandler<ContainerEvent> {
|
||||
|
||||
|
@ -71,4 +72,5 @@ public interface Container extends EventHandler<ContainerEvent> {
|
|||
|
||||
String toString();
|
||||
|
||||
ResourceSet getResourceSet();
|
||||
}
|
||||
|
|
|
@ -18,18 +18,15 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
@ -63,6 +60,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
|
||||
|
@ -123,20 +121,7 @@ public class ContainerImpl implements Container {
|
|||
private final Configuration daemonConf;
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ContainerImpl.class);
|
||||
private final Map<LocalResourceRequest,List<String>> pendingResources =
|
||||
new HashMap<LocalResourceRequest,List<String>>();
|
||||
private final Map<Path,List<String>> localizedResources =
|
||||
new HashMap<Path,List<String>>();
|
||||
private final List<LocalResourceRequest> publicRsrcs =
|
||||
new ArrayList<LocalResourceRequest>();
|
||||
private final List<LocalResourceRequest> privateRsrcs =
|
||||
new ArrayList<LocalResourceRequest>();
|
||||
private final List<LocalResourceRequest> appRsrcs =
|
||||
new ArrayList<LocalResourceRequest>();
|
||||
private final Map<LocalResourceRequest, Path> resourcesToBeUploaded =
|
||||
new ConcurrentHashMap<LocalResourceRequest, Path>();
|
||||
private final Map<LocalResourceRequest, Boolean> resourcesUploadPolicies =
|
||||
new ConcurrentHashMap<LocalResourceRequest, Boolean>();
|
||||
|
||||
|
||||
// whether container has been recovered after a restart
|
||||
private RecoveredContainerStatus recoveredStatus =
|
||||
|
@ -144,6 +129,7 @@ public class ContainerImpl implements Container {
|
|||
// whether container was marked as killed after recovery
|
||||
private boolean recoveredAsKilled = false;
|
||||
private Context context;
|
||||
private ResourceSet resourceSet;
|
||||
|
||||
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
|
||||
ContainerLaunchContext launchContext, Credentials creds,
|
||||
|
@ -202,6 +188,7 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
|
||||
stateMachine = stateMachineFactory.make(this);
|
||||
this.resourceSet = new ResourceSet();
|
||||
}
|
||||
|
||||
// constructor for a recovered container
|
||||
|
@ -310,6 +297,12 @@ public class ContainerImpl implements Container {
|
|||
ContainerState.EXITED_WITH_FAILURE),
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
||||
new RetryFailureTransition())
|
||||
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
|
||||
ContainerEventType.RESOURCE_LOCALIZED,
|
||||
new ResourceLocalizedWhileRunningTransition())
|
||||
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
|
||||
ContainerEventType.RESOURCE_FAILED,
|
||||
new ResourceLocalizationFailedWhileRunningTransition())
|
||||
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
|
||||
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
||||
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||
|
@ -464,7 +457,7 @@ public class ContainerImpl implements Container {
|
|||
try {
|
||||
if (ContainerState.LOCALIZED == getContainerState()
|
||||
|| ContainerState.RELAUNCHING == getContainerState()) {
|
||||
return localizedResources;
|
||||
return resourceSet.getLocalizedResources();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
@ -585,6 +578,11 @@ public class ContainerImpl implements Container {
|
|||
this.logDir = logDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceSet getResourceSet() {
|
||||
return this.resourceSet;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void sendFinishedEvents() {
|
||||
// Inform the application
|
||||
|
@ -644,7 +642,7 @@ public class ContainerImpl implements Container {
|
|||
for (String s : diags) {
|
||||
this.diagnostics.append(s);
|
||||
}
|
||||
if (isRetryContextSet() && diagnostics.length() > diagnosticsMaxSize) {
|
||||
if (diagnostics.length() > diagnosticsMaxSize) {
|
||||
diagnostics.delete(0, diagnostics.length() - diagnosticsMaxSize);
|
||||
}
|
||||
try {
|
||||
|
@ -658,17 +656,7 @@ public class ContainerImpl implements Container {
|
|||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
public void cleanup() {
|
||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
|
||||
new HashMap<LocalResourceVisibility,
|
||||
Collection<LocalResourceRequest>>();
|
||||
if (!publicRsrcs.isEmpty()) {
|
||||
rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
|
||||
}
|
||||
if (!privateRsrcs.isEmpty()) {
|
||||
rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
|
||||
}
|
||||
if (!appRsrcs.isEmpty()) {
|
||||
rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
|
||||
}
|
||||
resourceSet.getAllResourcesByVisibility();
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerLocalizationCleanupEvent(this, rsrc));
|
||||
}
|
||||
|
@ -688,7 +676,7 @@ public class ContainerImpl implements Container {
|
|||
* message.
|
||||
*
|
||||
* If there are resources to localize, sends a
|
||||
* ContainerLocalizationRequest (INIT_CONTAINER_RESOURCES)
|
||||
* ContainerLocalizationRequest (LOCALIZE_CONTAINER_RESOURCES)
|
||||
* to the ResourceLocalizationManager and enters LOCALIZING state.
|
||||
*
|
||||
* If there are no resources to localize, sends LAUNCH_CONTAINER event
|
||||
|
@ -740,39 +728,15 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
|
||||
container.containerLocalizationStartTime = clock.getTime();
|
||||
|
||||
// Send requests for public, private resources
|
||||
Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
|
||||
if (!cntrRsrc.isEmpty()) {
|
||||
try {
|
||||
for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
|
||||
try {
|
||||
LocalResourceRequest req =
|
||||
new LocalResourceRequest(rsrc.getValue());
|
||||
List<String> links = container.pendingResources.get(req);
|
||||
if (links == null) {
|
||||
links = new ArrayList<String>();
|
||||
container.pendingResources.put(req, links);
|
||||
}
|
||||
links.add(rsrc.getKey());
|
||||
storeSharedCacheUploadPolicy(container, req, rsrc.getValue()
|
||||
.getShouldBeUploadedToSharedCache());
|
||||
switch (rsrc.getValue().getVisibility()) {
|
||||
case PUBLIC:
|
||||
container.publicRsrcs.add(req);
|
||||
break;
|
||||
case PRIVATE:
|
||||
container.privateRsrcs.add(req);
|
||||
break;
|
||||
case APPLICATION:
|
||||
container.appRsrcs.add(req);
|
||||
break;
|
||||
}
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.info("Got exception parsing " + rsrc.getKey()
|
||||
+ " and value " + rsrc.getValue());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
|
||||
container.resourceSet.addResources(ctxt.getLocalResources());
|
||||
container.dispatcher.getEventHandler().handle(
|
||||
new ContainerLocalizationRequestEvent(container, req));
|
||||
} catch (URISyntaxException e) {
|
||||
// malformed resource; abort container launch
|
||||
LOG.warn("Failed to parse resource-request", e);
|
||||
|
@ -780,21 +744,6 @@ public class ContainerImpl implements Container {
|
|||
container.metrics.endInitingContainer();
|
||||
return ContainerState.LOCALIZATION_FAILED;
|
||||
}
|
||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
|
||||
new LinkedHashMap<LocalResourceVisibility,
|
||||
Collection<LocalResourceRequest>>();
|
||||
if (!container.publicRsrcs.isEmpty()) {
|
||||
req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
|
||||
}
|
||||
if (!container.privateRsrcs.isEmpty()) {
|
||||
req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
|
||||
}
|
||||
if (!container.appRsrcs.isEmpty()) {
|
||||
req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
|
||||
}
|
||||
|
||||
container.dispatcher.getEventHandler().handle(
|
||||
new ContainerLocalizationRequestEvent(container, req));
|
||||
return ContainerState.LOCALIZING;
|
||||
} else {
|
||||
container.sendLaunchEvent();
|
||||
|
@ -804,27 +753,6 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Store the resource's shared cache upload policies
|
||||
* Given LocalResourceRequest can be shared across containers in
|
||||
* LocalResourcesTrackerImpl, we preserve the upload policies here.
|
||||
* In addition, it is possible for the application to create several
|
||||
* "identical" LocalResources as part of
|
||||
* ContainerLaunchContext.setLocalResources with different symlinks.
|
||||
* There is a corner case where these "identical" local resources have
|
||||
* different upload policies. For that scenario, upload policy will be set to
|
||||
* true as long as there is at least one LocalResource entry with
|
||||
* upload policy set to true.
|
||||
*/
|
||||
private static void storeSharedCacheUploadPolicy(ContainerImpl container,
|
||||
LocalResourceRequest resourceRequest, Boolean uploadPolicy) {
|
||||
Boolean storedUploadPolicy =
|
||||
container.resourcesUploadPolicies.get(resourceRequest);
|
||||
if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) {
|
||||
container.resourcesUploadPolicies.put(resourceRequest, uploadPolicy);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transition when one of the requested resources for this container
|
||||
* has been successfully localized.
|
||||
|
@ -838,22 +766,21 @@ public class ContainerImpl implements Container {
|
|||
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
|
||||
LocalResourceRequest resourceRequest = rsrcEvent.getResource();
|
||||
Path location = rsrcEvent.getLocation();
|
||||
List<String> syms = container.pendingResources.remove(resourceRequest);
|
||||
List<String> syms =
|
||||
container.resourceSet.resourceLocalized(resourceRequest, location);
|
||||
if (null == syms) {
|
||||
LOG.warn("Localized unknown resource " + resourceRequest +
|
||||
" for container " + container.containerId);
|
||||
assert false;
|
||||
// fail container?
|
||||
LOG.info("Localized resource " + resourceRequest +
|
||||
" for container " + container.containerId);
|
||||
return ContainerState.LOCALIZING;
|
||||
}
|
||||
container.localizedResources.put(location, syms);
|
||||
|
||||
// check to see if this resource should be uploaded to the shared cache
|
||||
// as well
|
||||
if (shouldBeUploadedToSharedCache(container, resourceRequest)) {
|
||||
container.resourcesToBeUploaded.put(resourceRequest, location);
|
||||
container.resourceSet.getResourcesToBeUploaded()
|
||||
.put(resourceRequest, location);
|
||||
}
|
||||
if (!container.pendingResources.isEmpty()) {
|
||||
if (!container.resourceSet.getPendingResources().isEmpty()) {
|
||||
return ContainerState.LOCALIZING;
|
||||
}
|
||||
|
||||
|
@ -875,7 +802,8 @@ public class ContainerImpl implements Container {
|
|||
&& container.recoveredStatus != RecoveredContainerStatus.COMPLETED) {
|
||||
// kick off uploads to the shared cache
|
||||
container.dispatcher.getEventHandler().handle(
|
||||
new SharedCacheUploadEvent(container.resourcesToBeUploaded, container
|
||||
new SharedCacheUploadEvent(
|
||||
container.resourceSet.getResourcesToBeUploaded(), container
|
||||
.getLaunchContext(), container.getUser(),
|
||||
SharedCacheUploadEventType.UPLOAD));
|
||||
}
|
||||
|
@ -884,6 +812,56 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resource is localized while the container is running - create symlinks
|
||||
*/
|
||||
static class ResourceLocalizedWhileRunningTransition
|
||||
extends ContainerTransition {
|
||||
|
||||
@Override
|
||||
public void transition(ContainerImpl container, ContainerEvent event) {
|
||||
ContainerResourceLocalizedEvent rsrcEvent =
|
||||
(ContainerResourceLocalizedEvent) event;
|
||||
List<String> links = container.resourceSet
|
||||
.resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation());
|
||||
// creating symlinks.
|
||||
for (String link : links) {
|
||||
try {
|
||||
String linkFile = new Path(container.workDir, link).toString();
|
||||
if (new File(linkFile).exists()) {
|
||||
LOG.info("Symlink file already exists: " + linkFile);
|
||||
} else {
|
||||
container.context.getContainerExecutor()
|
||||
.symLink(rsrcEvent.getLocation().toString(), linkFile);
|
||||
LOG.info("Created symlink: " + linkFile + " -> " + rsrcEvent
|
||||
.getLocation());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
String message = String
|
||||
.format("Error when creating symlink %s -> %s", link,
|
||||
rsrcEvent.getLocation());
|
||||
LOG.error(message, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resource localization failed while the container is running.
|
||||
*/
|
||||
static class ResourceLocalizationFailedWhileRunningTransition
|
||||
extends ContainerTransition {
|
||||
|
||||
@Override
|
||||
public void transition(ContainerImpl container, ContainerEvent event) {
|
||||
ContainerResourceFailedEvent failedEvent =
|
||||
(ContainerResourceFailedEvent) event;
|
||||
container.resourceSet
|
||||
.resourceLocalizationFailed(failedEvent.getResource());
|
||||
container.addDiagnostics(failedEvent.getDiagnosticMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transition from LOCALIZED state to RUNNING state upon receiving
|
||||
* a CONTAINER_LAUNCHED event
|
||||
|
@ -1127,17 +1105,10 @@ public class ContainerImpl implements Container {
|
|||
SingleArcTransition<ContainerImpl, ContainerEvent> {
|
||||
@Override
|
||||
public void transition(ContainerImpl container, ContainerEvent event) {
|
||||
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
|
||||
List<String> syms =
|
||||
container.pendingResources.remove(rsrcEvent.getResource());
|
||||
if (null == syms) {
|
||||
LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
|
||||
" for container " + container.containerId);
|
||||
assert false;
|
||||
// fail container?
|
||||
return;
|
||||
}
|
||||
container.localizedResources.put(rsrcEvent.getLocation(), syms);
|
||||
ContainerResourceLocalizedEvent rsrcEvent =
|
||||
(ContainerResourceLocalizedEvent) event;
|
||||
container.resourceSet
|
||||
.resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1392,7 +1363,7 @@ public class ContainerImpl implements Container {
|
|||
*/
|
||||
private static boolean shouldBeUploadedToSharedCache(ContainerImpl container,
|
||||
LocalResourceRequest resource) {
|
||||
return container.resourcesUploadPolicies.get(resource);
|
||||
return container.resourceSet.getResourcesUploadPolicies().get(resource);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
|
@ -1201,16 +1201,16 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
|
||||
private void recordContainerLogDir(ContainerId containerId,
|
||||
String logDir) throws IOException{
|
||||
container.setLogDir(logDir);
|
||||
if (container.isRetryContextSet()) {
|
||||
container.setLogDir(logDir);
|
||||
context.getNMStateStore().storeContainerLogDir(containerId, logDir);
|
||||
}
|
||||
}
|
||||
|
||||
private void recordContainerWorkDir(ContainerId containerId,
|
||||
String workDir) throws IOException{
|
||||
container.setWorkDir(workDir);
|
||||
if (container.isRetryContextSet()) {
|
||||
container.setWorkDir(workDir);
|
||||
context.getNMStateStore().storeContainerWorkDir(containerId, workDir);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -112,6 +112,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
|
||||
|
@ -135,7 +136,6 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
|
|||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.FSDownload;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -419,7 +419,7 @@ public class ResourceLocalizationService extends CompositeService
|
|||
handleInitApplicationResources(
|
||||
((ApplicationLocalizationEvent)event).getApplication());
|
||||
break;
|
||||
case INIT_CONTAINER_RESOURCES:
|
||||
case LOCALIZE_CONTAINER_RESOURCES:
|
||||
handleInitContainerResources((ContainerLocalizationRequestEvent) event);
|
||||
break;
|
||||
case CONTAINER_RESOURCES_LOCALIZED:
|
||||
|
@ -469,6 +469,13 @@ public class ResourceLocalizationService extends CompositeService
|
|||
private void handleInitContainerResources(
|
||||
ContainerLocalizationRequestEvent rsrcReqs) {
|
||||
Container c = rsrcReqs.getContainer();
|
||||
EnumSet<ContainerState> set =
|
||||
EnumSet.of(ContainerState.LOCALIZING, ContainerState.RUNNING);
|
||||
if (!set.contains(c.getContainerState())) {
|
||||
LOG.warn(c.getContainerId() + " is at " + c.getContainerState()
|
||||
+ " state, do not localize resources.");
|
||||
return;
|
||||
}
|
||||
// create a loading cache for the file statuses
|
||||
LoadingCache<Path,Future<FileStatus>> statCache =
|
||||
CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(getConfig()));
|
||||
|
@ -538,7 +545,7 @@ public class ResourceLocalizationService extends CompositeService
|
|||
}
|
||||
String locId = c.getContainerId().toString();
|
||||
localizerTracker.cleanupPrivLocalizers(locId);
|
||||
|
||||
|
||||
// Delete the container directories
|
||||
String userName = c.getUser();
|
||||
String containerIDStr = c.toString();
|
||||
|
@ -747,6 +754,14 @@ public class ResourceLocalizationService extends CompositeService
|
|||
case APPLICATION:
|
||||
synchronized (privLocalizers) {
|
||||
LocalizerRunner localizer = privLocalizers.get(locId);
|
||||
if (localizer != null && localizer.killContainerLocalizer.get()) {
|
||||
// Old localizer thread has been stopped, remove it and creates
|
||||
// a new localizer thread.
|
||||
LOG.info("New " + event.getType() + " localize request for "
|
||||
+ locId + ", remove old private localizer.");
|
||||
cleanupPrivLocalizers(locId);
|
||||
localizer = null;
|
||||
}
|
||||
if (null == localizer) {
|
||||
LOG.info("Created localizer for " + locId);
|
||||
localizer = new LocalizerRunner(req.getContext(), locId);
|
||||
|
|
|
@ -0,0 +1,194 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* All Resources requested by the container.
|
||||
*/
|
||||
public class ResourceSet {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ResourceSet.class);
|
||||
|
||||
// resources by localization state (localized, pending, failed)
|
||||
private Map<Path, List<String>> localizedResources =
|
||||
new ConcurrentHashMap<>();
|
||||
private Map<LocalResourceRequest, List<String>> pendingResources =
|
||||
new ConcurrentHashMap<>();
|
||||
private Set<LocalResourceRequest> resourcesFailedToBeLocalized =
|
||||
new HashSet<>();
|
||||
|
||||
// resources by visibility (public, private, app)
|
||||
private final List<LocalResourceRequest> publicRsrcs =
|
||||
new ArrayList<>();
|
||||
private final List<LocalResourceRequest> privateRsrcs =
|
||||
new ArrayList<>();
|
||||
private final List<LocalResourceRequest> appRsrcs =
|
||||
new ArrayList<>();
|
||||
|
||||
private final Map<LocalResourceRequest, Path> resourcesToBeUploaded =
|
||||
new ConcurrentHashMap<>();
|
||||
private final Map<LocalResourceRequest, Boolean> resourcesUploadPolicies =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
public Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
|
||||
addResources(Map<String, LocalResource> localResourceMap)
|
||||
throws URISyntaxException {
|
||||
if (localResourceMap == null || localResourceMap.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
Map<LocalResourceRequest, List<String>> allResources = new HashMap<>();
|
||||
List<LocalResourceRequest> publicList = new ArrayList<>();
|
||||
List<LocalResourceRequest> privateList = new ArrayList<>();
|
||||
List<LocalResourceRequest> appList = new ArrayList<>();
|
||||
|
||||
for (Map.Entry<String, LocalResource> rsrc : localResourceMap.entrySet()) {
|
||||
LocalResource resource = rsrc.getValue();
|
||||
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
|
||||
if (!allResources.containsKey(req)) {
|
||||
allResources.put(req, new ArrayList<String>());
|
||||
}
|
||||
allResources.get(req).add(rsrc.getKey());
|
||||
storeSharedCacheUploadPolicy(req,
|
||||
resource.getShouldBeUploadedToSharedCache());
|
||||
switch (resource.getVisibility()) {
|
||||
case PUBLIC:
|
||||
publicList.add(req);
|
||||
break;
|
||||
case PRIVATE:
|
||||
privateList.add(req);
|
||||
break;
|
||||
case APPLICATION:
|
||||
appList.add(req);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
|
||||
new LinkedHashMap<>();
|
||||
if (!publicList.isEmpty()) {
|
||||
publicRsrcs.addAll(publicList);
|
||||
req.put(LocalResourceVisibility.PUBLIC, publicList);
|
||||
}
|
||||
if (!privateList.isEmpty()) {
|
||||
privateRsrcs.addAll(privateList);
|
||||
req.put(LocalResourceVisibility.PRIVATE, privateList);
|
||||
}
|
||||
if (!appList.isEmpty()) {
|
||||
appRsrcs.addAll(appList);
|
||||
req.put(LocalResourceVisibility.APPLICATION, appList);
|
||||
}
|
||||
if (!allResources.isEmpty()) {
|
||||
this.pendingResources.putAll(allResources);
|
||||
}
|
||||
return req;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when resource localized.
|
||||
* @param request The original request for the localized resource
|
||||
* @param location The path where the resource is localized
|
||||
* @return The list of symlinks for the localized resources.
|
||||
*/
|
||||
public List<String> resourceLocalized(LocalResourceRequest request,
|
||||
Path location) {
|
||||
List<String> symlinks = pendingResources.remove(request);
|
||||
if (symlinks == null) {
|
||||
return null;
|
||||
} else {
|
||||
localizedResources.put(location, symlinks);
|
||||
return symlinks;
|
||||
}
|
||||
}
|
||||
|
||||
public void resourceLocalizationFailed(LocalResourceRequest request) {
|
||||
pendingResources.remove(request);
|
||||
resourcesFailedToBeLocalized.add(request);
|
||||
}
|
||||
|
||||
public synchronized Map<LocalResourceVisibility,
|
||||
Collection<LocalResourceRequest>> getAllResourcesByVisibility() {
|
||||
|
||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
|
||||
new HashMap<>();
|
||||
if (!publicRsrcs.isEmpty()) {
|
||||
rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
|
||||
}
|
||||
if (!privateRsrcs.isEmpty()) {
|
||||
rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
|
||||
}
|
||||
if (!appRsrcs.isEmpty()) {
|
||||
rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
|
||||
}
|
||||
return rsrc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Store the resource's shared cache upload policies
|
||||
* Given LocalResourceRequest can be shared across containers in
|
||||
* LocalResourcesTrackerImpl, we preserve the upload policies here.
|
||||
* In addition, it is possible for the application to create several
|
||||
* "identical" LocalResources as part of
|
||||
* ContainerLaunchContext.setLocalResources with different symlinks.
|
||||
* There is a corner case where these "identical" local resources have
|
||||
* different upload policies. For that scenario, upload policy will be set to
|
||||
* true as long as there is at least one LocalResource entry with
|
||||
* upload policy set to true.
|
||||
*/
|
||||
private void storeSharedCacheUploadPolicy(
|
||||
LocalResourceRequest resourceRequest, Boolean uploadPolicy) {
|
||||
Boolean storedUploadPolicy = resourcesUploadPolicies.get(resourceRequest);
|
||||
if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) {
|
||||
resourcesUploadPolicies.put(resourceRequest, uploadPolicy);
|
||||
}
|
||||
}
|
||||
|
||||
public Map<Path, List<String>> getLocalizedResources() {
|
||||
return localizedResources;
|
||||
}
|
||||
|
||||
public Map<LocalResourceRequest, Path> getResourcesToBeUploaded() {
|
||||
return resourcesToBeUploaded;
|
||||
}
|
||||
|
||||
public Map<LocalResourceRequest, Boolean> getResourcesUploadPolicies() {
|
||||
return resourcesUploadPolicies;
|
||||
}
|
||||
|
||||
public Map<LocalResourceRequest, List<String>> getPendingResources() {
|
||||
return pendingResources;
|
||||
}
|
||||
}
|
|
@ -44,7 +44,7 @@ public class ContainerLocalizationRequestEvent extends
|
|||
*/
|
||||
public ContainerLocalizationRequestEvent(Container c,
|
||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {
|
||||
super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c);
|
||||
super(LocalizationEventType.LOCALIZE_CONTAINER_RESOURCES, c);
|
||||
this.rsrc = rsrc;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.eve
|
|||
|
||||
public enum LocalizationEventType {
|
||||
INIT_APPLICATION_RESOURCES,
|
||||
INIT_CONTAINER_RESOURCES,
|
||||
LOCALIZE_CONTAINER_RESOURCES,
|
||||
CACHE_CLEANUP,
|
||||
CLEANUP_CONTAINER_RESOURCES,
|
||||
DESTROY_APPLICATION_RESOURCES,
|
||||
|
|
|
@ -86,7 +86,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
|||
dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
|
||||
app.getAppId()));
|
||||
break;
|
||||
case INIT_CONTAINER_RESOURCES:
|
||||
case LOCALIZE_CONTAINER_RESOURCES:
|
||||
ContainerLocalizationRequestEvent rsrcReqs =
|
||||
(ContainerLocalizationRequestEvent) event;
|
||||
// simulate localization of all requested resources
|
||||
|
|
|
@ -131,7 +131,18 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
|
|||
LOG.info("Running testContainerLaunchAndExitFailure");
|
||||
super.testContainerLaunchAndExitFailure();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void testLocalingResourceWhileContainerRunning()
|
||||
throws Exception {
|
||||
// Don't run the test if the binary is not available.
|
||||
if (!shouldRunTest()) {
|
||||
LOG.info("LCE binary path is not passed. Not running the test");
|
||||
return;
|
||||
}
|
||||
super.testLocalingResourceWhileContainerRunning();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testLocalFilesCleanup() throws InterruptedException,
|
||||
IOException, YarnException {
|
||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
|
||||
|
@ -151,7 +152,7 @@ public abstract class BaseAMRMProxyTest {
|
|||
* rest. So the responses returned can be less than the number of end points
|
||||
* specified
|
||||
*
|
||||
* @param testContext
|
||||
* @param testContexts
|
||||
* @param func
|
||||
* @return
|
||||
*/
|
||||
|
@ -698,5 +699,10 @@ public abstract class BaseAMRMProxyTest {
|
|||
public OpportunisticContainerAllocator getContainerAllocator() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerExecutor getContainerExecutor() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,17 +36,20 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
||||
|
@ -83,6 +86,7 @@ import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
|
@ -459,7 +463,138 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
// and verify exit code returned
|
||||
testContainerLaunchAndExit(exitCode);
|
||||
}
|
||||
|
||||
|
||||
private Map<String, LocalResource> setupLocalResources(String fileName,
|
||||
String symLink) throws Exception {
|
||||
// ////// Create the resources for the container
|
||||
File dir = new File(tmpDir, "dir");
|
||||
dir.mkdirs();
|
||||
File file = new File(dir, fileName);
|
||||
PrintWriter fileWriter = new PrintWriter(file);
|
||||
fileWriter.write("Hello World!");
|
||||
fileWriter.close();
|
||||
|
||||
URL resourceURL = URL.fromPath(FileContext.getLocalFSFileContext()
|
||||
.makeQualified(new Path(file.getAbsolutePath())));
|
||||
LocalResource resource =
|
||||
recordFactory.newRecordInstance(LocalResource.class);
|
||||
resource.setResource(resourceURL);
|
||||
resource.setSize(-1);
|
||||
resource.setVisibility(LocalResourceVisibility.APPLICATION);
|
||||
resource.setType(LocalResourceType.FILE);
|
||||
resource.setTimestamp(file.lastModified());
|
||||
Map<String, LocalResource> localResources =
|
||||
new HashMap<String, LocalResource>();
|
||||
localResources.put(symLink, resource);
|
||||
return localResources;
|
||||
}
|
||||
|
||||
// Start the container
|
||||
// While the container is running, localize new resources.
|
||||
// Verify the symlink is created properly
|
||||
@Test
|
||||
public void testLocalingResourceWhileContainerRunning() throws Exception {
|
||||
// Real del service
|
||||
delSrvc = new DeletionService(exec);
|
||||
delSrvc.init(conf);
|
||||
|
||||
((NodeManager.NMContext)context).setContainerExecutor(exec);
|
||||
containerManager = createContainerManager(delSrvc);
|
||||
containerManager.init(conf);
|
||||
containerManager.start();
|
||||
// set up local resources
|
||||
Map<String, LocalResource> localResource =
|
||||
setupLocalResources("file", "symLink1");
|
||||
ContainerLaunchContext context =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
context.setLocalResources(localResource);
|
||||
|
||||
// a long running container - sleep
|
||||
context.setCommands(Arrays.asList("sleep 6"));
|
||||
final ContainerId cId = createContainerId(0);
|
||||
|
||||
// start the container
|
||||
StartContainerRequest scRequest = StartContainerRequest.newInstance(context,
|
||||
createContainerToken(cId, DUMMY_RM_IDENTIFIER, this.context.getNodeId(),
|
||||
user, this.context.getContainerTokenSecretManager()));
|
||||
StartContainersRequest allRequests =
|
||||
StartContainersRequest.newInstance(Arrays.asList(scRequest));
|
||||
containerManager.startContainers(allRequests);
|
||||
BaseContainerManagerTest
|
||||
.waitForContainerState(containerManager, cId, ContainerState.RUNNING);
|
||||
|
||||
BaseContainerManagerTest.waitForApplicationState(containerManager,
|
||||
cId.getApplicationAttemptId().getApplicationId(),
|
||||
ApplicationState.RUNNING);
|
||||
checkResourceLocalized(cId, "symLink1");
|
||||
|
||||
// Localize new local resources while container is running
|
||||
Map<String, LocalResource> localResource2 =
|
||||
setupLocalResources("file2", "symLink2");
|
||||
|
||||
ResourceLocalizationRequest request =
|
||||
ResourceLocalizationRequest.newInstance(cId, localResource2);
|
||||
containerManager.localize(request);
|
||||
|
||||
// Verify resource is localized and symlink is created.
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
public Boolean get() {
|
||||
try {
|
||||
checkResourceLocalized(cId, "symLink2");
|
||||
return true;
|
||||
} catch (Throwable e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}, 500, 20000);
|
||||
|
||||
BaseContainerManagerTest
|
||||
.waitForContainerState(containerManager, cId, ContainerState.COMPLETE);
|
||||
// Verify container cannot localize resources while at non-running state.
|
||||
try{
|
||||
containerManager.localize(request);
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(
|
||||
e.getMessage().contains("Not able to localize new resources"));
|
||||
}
|
||||
}
|
||||
|
||||
private void checkResourceLocalized(ContainerId containerId, String symLink) {
|
||||
String appId =
|
||||
containerId.getApplicationAttemptId().getApplicationId().toString();
|
||||
File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE);
|
||||
File userDir = new File(userCacheDir, user);
|
||||
File appCache = new File(userDir, ContainerLocalizer.APPCACHE);
|
||||
// localDir/usercache/nobody/appcache/application_0_0000
|
||||
File appDir = new File(appCache, appId);
|
||||
// localDir/usercache/nobody/appcache/application_0_0000/container_0_0000_01_000000
|
||||
File containerDir = new File(appDir, containerId.toString());
|
||||
// localDir/usercache/nobody/appcache/application_0_0000/container_0_0000_01_000000/symLink1
|
||||
File targetFile = new File(containerDir, symLink);
|
||||
|
||||
File sysDir =
|
||||
new File(localDir, ResourceLocalizationService.NM_PRIVATE_DIR);
|
||||
// localDir/nmPrivate/application_0_0000
|
||||
File appSysDir = new File(sysDir, appId);
|
||||
// localDir/nmPrivate/application_0_0000/container_0_0000_01_000000
|
||||
File containerSysDir = new File(appSysDir, containerId.toString());
|
||||
|
||||
Assert.assertTrue("AppDir " + appDir.getAbsolutePath() + " doesn't exist!!",
|
||||
appDir.exists());
|
||||
Assert.assertTrue(
|
||||
"AppSysDir " + appSysDir.getAbsolutePath() + " doesn't exist!!",
|
||||
appSysDir.exists());
|
||||
Assert.assertTrue(
|
||||
"containerDir " + containerDir.getAbsolutePath() + " doesn't exist !",
|
||||
containerDir.exists());
|
||||
Assert.assertTrue("containerSysDir " + containerSysDir.getAbsolutePath()
|
||||
+ " doesn't exist !", containerDir.exists());
|
||||
Assert.assertTrue(
|
||||
"targetFile " + targetFile.getAbsolutePath() + " doesn't exist !!",
|
||||
targetFile.exists());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocalFilesCleanup() throws InterruptedException,
|
||||
IOException, YarnException {
|
||||
|
|
|
@ -66,6 +66,7 @@ import java.util.concurrent.Future;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.fs.Options;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
|
||||
import org.junit.Assert;
|
||||
|
||||
|
@ -1766,7 +1767,7 @@ public class TestResourceLocalizationService {
|
|||
// creating new containers and populating corresponding localizer runners
|
||||
|
||||
// Container - 1
|
||||
ContainerImpl container1 = createMockContainer(user, 1);
|
||||
Container container1 = createMockContainer(user, 1);
|
||||
String localizerId1 = container1.getContainerId().toString();
|
||||
rls.getPrivateLocalizers().put(
|
||||
localizerId1,
|
||||
|
@ -2291,7 +2292,7 @@ public class TestResourceLocalizationService {
|
|||
}
|
||||
|
||||
private ContainerLocalizationRequestEvent createContainerLocalizationEvent(
|
||||
ContainerImpl container, LocalResourceVisibility vis,
|
||||
Container container, LocalResourceVisibility vis,
|
||||
LocalResourceRequest req) {
|
||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> reqs =
|
||||
new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
|
||||
|
@ -2309,6 +2310,7 @@ public class TestResourceLocalizationService {
|
|||
when(container.getUser()).thenReturn(user);
|
||||
Credentials mockCredentials = mock(Credentials.class);
|
||||
when(container.getCredentials()).thenReturn(mockCredentials);
|
||||
when(container.getContainerState()).thenReturn(ContainerState.LOCALIZING);
|
||||
return container;
|
||||
}
|
||||
|
||||
|
@ -2357,6 +2359,7 @@ public class TestResourceLocalizationService {
|
|||
creds.addToken(new Text("tok" + id), tk);
|
||||
when(c.getCredentials()).thenReturn(creds);
|
||||
when(c.toString()).thenReturn(cId.toString());
|
||||
when(c.getContainerState()).thenReturn(ContainerState.LOCALIZING);
|
||||
return c;
|
||||
}
|
||||
|
||||
|
|
|
@ -89,6 +89,13 @@ public class TestContainersMonitorResourceChange {
|
|||
public void deleteAsUser(DeletionAsUserContext ctx)
|
||||
throws IOException, InterruptedException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void symLink(String target, String symlink)
|
||||
throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProcessId(ContainerId containerId) {
|
||||
return String.valueOf(containerId.getContainerId());
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
|
@ -18,11 +19,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.webapp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
|
@ -40,8 +36,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class MockContainer implements Container {
|
||||
|
||||
private ContainerId id;
|
||||
|
@ -117,6 +119,11 @@ public class MockContainer implements Container {
|
|||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceSet getResourceSet() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(ContainerEvent event) {
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue