MAPREDUCE-2691. svn merge -c r1167676 --ignore-ancestry ../../trunk/

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1167677 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-09-11 06:29:05 +00:00
parent 5bb361cee4
commit 6814d04c3e
14 changed files with 799 additions and 318 deletions

View File

@ -1252,6 +1252,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2971. ant build mapreduce fails protected access jc.displayJobList MAPREDUCE-2971. ant build mapreduce fails protected access jc.displayJobList
(jobs) (Thomas Graves via mahadev) (jobs) (Thomas Graves via mahadev)
MAPREDUCE-2691. Finishing up the cleanup of distributed cache file resources
and related tests. (Siddharth Seth via vinodkv)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -711,7 +711,7 @@ public abstract class TaskAttemptImpl implements
String linkName = name.toUri().getPath(); String linkName = name.toUri().getPath();
container.setLocalResource( container.setLocalResource(
linkName, linkName,
BuilderUtils.newLocalResource(recordFactory, BuilderUtils.newLocalResource(
p.toUri(), type, p.toUri(), type,
visibilities[i] visibilities[i]
? LocalResourceVisibility.PUBLIC ? LocalResourceVisibility.PUBLIC

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -86,12 +87,11 @@ public class BuilderUtils {
} }
} }
public static LocalResource newLocalResource(RecordFactory recordFactory, public static LocalResource newLocalResource(URL url, LocalResourceType type,
URI uri, LocalResourceType type, LocalResourceVisibility visibility, LocalResourceVisibility visibility, long size, long timestamp) {
long size, long timestamp) {
LocalResource resource = LocalResource resource =
recordFactory.newRecordInstance(LocalResource.class); recordFactory.newRecordInstance(LocalResource.class);
resource.setResource(ConverterUtils.getYarnUrlFromURI(uri)); resource.setResource(url);
resource.setType(type); resource.setType(type);
resource.setVisibility(visibility); resource.setVisibility(visibility);
resource.setSize(size); resource.setSize(size);
@ -99,6 +99,13 @@ public class BuilderUtils {
return resource; return resource;
} }
public static LocalResource newLocalResource(URI uri,
LocalResourceType type, LocalResourceVisibility visibility, long size,
long timestamp) {
return newLocalResource(ConverterUtils.getYarnUrlFromURI(uri), type,
visibility, size, timestamp);
}
public static ApplicationId newApplicationId(RecordFactory recordFactory, public static ApplicationId newApplicationId(RecordFactory recordFactory,
long clustertimestamp, CharSequence id) { long clustertimestamp, CharSequence id) {
ApplicationId applicationId = ApplicationId applicationId =
@ -125,6 +132,15 @@ public class BuilderUtils {
return applicationId; return applicationId;
} }
public static ApplicationAttemptId newApplicationAttemptId(
ApplicationId appId, int attemptId) {
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(attemptId);
return appAttemptId;
}
public static ApplicationId convert(long clustertimestamp, CharSequence id) { public static ApplicationId convert(long clustertimestamp, CharSequence id) {
ApplicationId applicationId = ApplicationId applicationId =
recordFactory.newRecordInstance(ApplicationId.class); recordFactory.newRecordInstance(ApplicationId.class);
@ -133,6 +149,24 @@ public class BuilderUtils {
return applicationId; return applicationId;
} }
public static ContainerId newContainerId(ApplicationAttemptId appAttemptId,
int containerId) {
ContainerId id = recordFactory.newRecordInstance(ContainerId.class);
id.setAppId(appAttemptId.getApplicationId());
id.setId(containerId);
id.setAppAttemptId(appAttemptId);
return id;
}
public static ContainerId newContainerId(int appId, int appAttemptId,
long timestamp, int id) {
ApplicationId applicationId = newApplicationId(timestamp, appId);
ApplicationAttemptId applicationAttemptId = newApplicationAttemptId(
applicationId, appAttemptId);
ContainerId cId = newContainerId(applicationAttemptId, id);
return cId;
}
public static ContainerId newContainerId(RecordFactory recordFactory, public static ContainerId newContainerId(RecordFactory recordFactory,
ApplicationId appId, ApplicationAttemptId appAttemptId, ApplicationId appId, ApplicationAttemptId appAttemptId,
int containerId) { int containerId) {
@ -227,4 +261,20 @@ public class BuilderUtils {
report.setStartTime(startTime); report.setStartTime(startTime);
return report; return report;
} }
public static Resource newResource(int memory) {
Resource resource = recordFactory.newRecordInstance(Resource.class);
resource.setMemory(memory);
return resource;
}
public static URL newURL(String scheme, String host, int port, String file) {
URL url = recordFactory.newRecordInstance(URL.class);
url.setScheme(scheme);
url.setHost(host);
url.setPort(port);
url.setFile(file);
return url;
}
} }

View File

@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@ -81,6 +84,12 @@ public class ContainerImpl implements Container {
new HashMap<LocalResourceRequest,String>(); new HashMap<LocalResourceRequest,String>();
private final Map<Path,String> localizedResources = private final Map<Path,String> localizedResources =
new HashMap<Path,String>(); new HashMap<Path,String>();
private final List<LocalResourceRequest> publicRsrcs =
new ArrayList<LocalResourceRequest>();
private final List<LocalResourceRequest> privateRsrcs =
new ArrayList<LocalResourceRequest>();
private final List<LocalResourceRequest> appRsrcs =
new ArrayList<LocalResourceRequest>();
public ContainerImpl(Dispatcher dispatcher, public ContainerImpl(Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds, ContainerLaunchContext launchContext, Credentials creds,
@ -361,7 +370,7 @@ public class ContainerImpl implements Container {
} }
} }
@SuppressWarnings("fallthrough") @SuppressWarnings({"fallthrough", "unchecked"})
private void finished() { private void finished() {
switch (getContainerState()) { switch (getContainerState()) {
case EXITED_WITH_SUCCESS: case EXITED_WITH_SUCCESS:
@ -404,6 +413,24 @@ public class ContainerImpl implements Container {
containerID, exitCode)); containerID, exitCode));
} }
@SuppressWarnings("unchecked") // dispatcher not typed
public void cleanup() {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
if (!publicRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
}
if (!privateRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
}
if (!appRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
}
dispatcher.getEventHandler().handle(
new ContainerLocalizationCleanupEvent(this, rsrc));
}
static class ContainerTransition implements static class ContainerTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> { SingleArcTransition<ContainerImpl, ContainerEvent> {
@ -439,12 +466,6 @@ public class ContainerImpl implements Container {
// Send requests for public, private resources // Send requests for public, private resources
Map<String,LocalResource> cntrRsrc = ctxt.getAllLocalResources(); Map<String,LocalResource> cntrRsrc = ctxt.getAllLocalResources();
if (!cntrRsrc.isEmpty()) { if (!cntrRsrc.isEmpty()) {
ArrayList<LocalResourceRequest> publicRsrc =
new ArrayList<LocalResourceRequest>();
ArrayList<LocalResourceRequest> privateRsrc =
new ArrayList<LocalResourceRequest>();
ArrayList<LocalResourceRequest> appRsrc =
new ArrayList<LocalResourceRequest>();
try { try {
for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) { for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
try { try {
@ -453,13 +474,13 @@ public class ContainerImpl implements Container {
container.pendingResources.put(req, rsrc.getKey()); container.pendingResources.put(req, rsrc.getKey());
switch (rsrc.getValue().getVisibility()) { switch (rsrc.getValue().getVisibility()) {
case PUBLIC: case PUBLIC:
publicRsrc.add(req); container.publicRsrcs.add(req);
break; break;
case PRIVATE: case PRIVATE:
privateRsrc.add(req); container.privateRsrcs.add(req);
break; break;
case APPLICATION: case APPLICATION:
appRsrc.add(req); container.appRsrcs.add(req);
break; break;
} }
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
@ -471,27 +492,25 @@ public class ContainerImpl implements Container {
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
// malformed resource; abort container launch // malformed resource; abort container launch
LOG.warn("Failed to parse resource-request", e); LOG.warn("Failed to parse resource-request", e);
container.dispatcher.getEventHandler().handle( container.cleanup();
new ContainerLocalizationEvent(
LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
container.metrics.endInitingContainer(); container.metrics.endInitingContainer();
return ContainerState.LOCALIZATION_FAILED; return ContainerState.LOCALIZATION_FAILED;
} }
if (!publicRsrc.isEmpty()) { Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
container.dispatcher.getEventHandler().handle( new HashMap<LocalResourceVisibility,
new ContainerLocalizationRequestEvent( Collection<LocalResourceRequest>>();
container, publicRsrc, LocalResourceVisibility.PUBLIC)); if (!container.publicRsrcs.isEmpty()) {
req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
} }
if (!privateRsrc.isEmpty()) { if (!container.privateRsrcs.isEmpty()) {
container.dispatcher.getEventHandler().handle( req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
new ContainerLocalizationRequestEvent(
container, privateRsrc, LocalResourceVisibility.PRIVATE));
} }
if (!appRsrc.isEmpty()) { if (!container.appRsrcs.isEmpty()) {
container.dispatcher.getEventHandler().handle( req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
new ContainerLocalizationRequestEvent(
container, appRsrc, LocalResourceVisibility.APPLICATION));
} }
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationRequestEvent(container, req));
return ContainerState.LOCALIZING; return ContainerState.LOCALIZING;
} else { } else {
container.dispatcher.getEventHandler().handle( container.dispatcher.getEventHandler().handle(
@ -546,7 +565,6 @@ public class ContainerImpl implements Container {
} }
} }
@SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithSuccessTransition extends ContainerTransition { static class ExitedWithSuccessTransition extends ContainerTransition {
@Override @Override
public void transition(ContainerImpl container, ContainerEvent event) { public void transition(ContainerImpl container, ContainerEvent event) {
@ -554,13 +572,10 @@ public class ContainerImpl implements Container {
// Inform the localizer to decrement reference counts and cleanup // Inform the localizer to decrement reference counts and cleanup
// resources. // resources.
container.dispatcher.getEventHandler().handle( container.cleanup();
new ContainerLocalizationEvent(
LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
} }
} }
@SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithFailureTransition extends ContainerTransition { static class ExitedWithFailureTransition extends ContainerTransition {
@Override @Override
public void transition(ContainerImpl container, ContainerEvent event) { public void transition(ContainerImpl container, ContainerEvent event) {
@ -572,13 +587,10 @@ public class ContainerImpl implements Container {
// Inform the localizer to decrement reference counts and cleanup // Inform the localizer to decrement reference counts and cleanup
// resources. // resources.
container.dispatcher.getEventHandler().handle( container.cleanup();
new ContainerLocalizationEvent(
LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
} }
} }
@SuppressWarnings("unchecked") // dispatcher not typed
static class ResourceFailedTransition implements static class ResourceFailedTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> { SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override @Override
@ -592,30 +604,24 @@ public class ContainerImpl implements Container {
// Inform the localizer to decrement reference counts and cleanup // Inform the localizer to decrement reference counts and cleanup
// resources. // resources.
container.dispatcher.getEventHandler().handle( container.cleanup();
new ContainerLocalizationEvent(
LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
container.metrics.endInitingContainer(); container.metrics.endInitingContainer();
} }
} }
@SuppressWarnings("unchecked") // dispatcher not typed
static class KillDuringLocalizationTransition implements static class KillDuringLocalizationTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> { SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override @Override
public void transition(ContainerImpl container, ContainerEvent event) { public void transition(ContainerImpl container, ContainerEvent event) {
// Inform the localizer to decrement reference counts and cleanup // Inform the localizer to decrement reference counts and cleanup
// resources. // resources.
container.dispatcher.getEventHandler().handle( container.cleanup();
new ContainerLocalizationEvent(
LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
container.metrics.endInitingContainer(); container.metrics.endInitingContainer();
ContainerKillEvent killEvent = (ContainerKillEvent) event; ContainerKillEvent killEvent = (ContainerKillEvent) event;
container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
} }
} }
@SuppressWarnings("unchecked") // dispatcher not typed
static class LocalizedResourceDuringKillTransition implements static class LocalizedResourceDuringKillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> { SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override @Override
@ -647,7 +653,6 @@ public class ContainerImpl implements Container {
} }
} }
@SuppressWarnings("unchecked") // dispatcher not typed
static class ContainerKilledTransition implements static class ContainerKilledTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> { SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override @Override
@ -657,13 +662,10 @@ public class ContainerImpl implements Container {
// The process/process-grp is killed. Decrement reference counts and // The process/process-grp is killed. Decrement reference counts and
// cleanup resources // cleanup resources
container.dispatcher.getEventHandler().handle( container.cleanup();
new ContainerLocalizationEvent(
LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
} }
} }
@SuppressWarnings("unchecked") // dispatcher not typed
static class ContainerDoneTransition implements static class ContainerDoneTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> { SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override @Override
@ -697,7 +699,8 @@ public class ContainerImpl implements Container {
newState = newState =
stateMachine.doTransition(event.getType(), event); stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) { } catch (InvalidStateTransitonException e) {
LOG.warn("Can't handle this event at current state", e); LOG.warn("Can't handle this event at current state: Current: ["
+ oldState + "], eventType: [" + event.getType() + "]", e);
} }
if (oldState != newState) { if (oldState != newState) {
LOG.info("Container " + containerID + " transitioned from " LOG.info("Container " + containerID + " transitioned from "

View File

@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.LocalDirAllocator;
@ -274,7 +273,7 @@ public class ContainerLocalizer {
stat.setLocalPath( stat.setLocalPath(
ConverterUtils.getYarnUrlFromPath(localPath)); ConverterUtils.getYarnUrlFromPath(localPath));
stat.setLocalSize( stat.setLocalSize(
FileUtil.getDU(new File(localPath.getParent().toString()))); FileUtil.getDU(new File(localPath.getParent().toUri())));
stat.setStatus(ResourceStatusType.FETCH_SUCCESS); stat.setStatus(ResourceStatusType.FETCH_SUCCESS);
} catch (ExecutionException e) { } catch (ExecutionException e) {
stat.setStatus(ResourceStatusType.FETCH_FAILURE); stat.setStatus(ResourceStatusType.FETCH_FAILURE);

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
* {@link LocalResourceVisibility}. * {@link LocalResourceVisibility}.
* *
*/ */
class LocalResourcesTrackerImpl implements LocalResourcesTracker { class LocalResourcesTrackerImpl implements LocalResourcesTracker {
static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class); static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
@ -83,7 +84,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
@Override @Override
public boolean remove(LocalizedResource rem, DeletionService delService) { public boolean remove(LocalizedResource rem, DeletionService delService) {
// current synchronization guaranteed by crude RLS event for cleanup // current synchronization guaranteed by crude RLS event for cleanup
LocalizedResource rsrc = localrsrc.remove(rem.getRequest()); LocalizedResource rsrc = localrsrc.get(rem.getRequest());
if (null == rsrc) { if (null == rsrc) {
LOG.error("Attempt to remove absent resource: " + rem.getRequest() + LOG.error("Attempt to remove absent resource: " + rem.getRequest() +
" from " + getUser()); " from " + getUser());
@ -93,10 +94,11 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|| ResourceState.DOWNLOADING.equals(rsrc.getState()) || ResourceState.DOWNLOADING.equals(rsrc.getState())
|| rsrc != rem) { || rsrc != rem) {
// internal error // internal error
LOG.error("Attempt to remove resource with non-zero refcount"); LOG.error("Attempt to remove resource: " + rsrc + " with non-zero refcount");
assert false; assert false;
return false; return false;
} }
localrsrc.remove(rem.getRequest());
if (ResourceState.LOCALIZED.equals(rsrc.getState())) { if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), rsrc.getLocalPath()); delService.delete(getUser(), rsrc.getLocalPath());
} }

View File

@ -120,7 +120,8 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
for (ContainerId c : ref) { for (ContainerId c : ref) {
sb.append("(").append(c.toString()).append(")"); sb.append("(").append(c.toString()).append(")");
} }
sb.append("],").append(getTimestamp()).append("}"); sb.append("],").append(getTimestamp()).append(",")
.append(getState()).append("}");
return sb.toString(); return sb.toString();
} }

View File

@ -22,6 +22,7 @@ import java.io.File;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -43,6 +44,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -63,7 +65,6 @@ import org.apache.avro.ipc.Server;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -93,7 +94,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@ -101,6 +102,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
@ -198,7 +200,7 @@ public class ResourceLocalizationService extends AbstractService
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS); conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
localizationServerAddress = NetUtils.createSocketAddr( localizationServerAddress = NetUtils.createSocketAddr(
conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS)); conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
localizerTracker = new LocalizerTracker(conf); localizerTracker = createLocalizerTracker(conf);
dispatcher.register(LocalizerEventType.class, localizerTracker); dispatcher.register(LocalizerEventType.class, localizerTracker);
cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher), cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS); cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
@ -218,6 +220,10 @@ public class ResourceLocalizationService extends AbstractService
super.start(); super.start();
} }
LocalizerTracker createLocalizerTracker(Configuration conf) {
return new LocalizerTracker(conf);
}
Server createServer() { Server createServer() {
YarnRPC rpc = YarnRPC.create(getConfig()); YarnRPC rpc = YarnRPC.create(getConfig());
Configuration conf = new Configuration(getConfig()); // Clone to separate Configuration conf = new Configuration(getConfig()); // Clone to separate
@ -252,6 +258,9 @@ public class ResourceLocalizationService extends AbstractService
public void handle(LocalizationEvent event) { public void handle(LocalizationEvent event) {
String userName; String userName;
String appIDStr; String appIDStr;
Container c;
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs;
LocalResourcesTracker tracker;
// TODO: create log dir as $logdir/$user/$appId // TODO: create log dir as $logdir/$user/$appId
switch (event.getType()) { switch (event.getType()) {
case INIT_APPLICATION_RESOURCES: case INIT_APPLICATION_RESOURCES:
@ -276,28 +285,16 @@ public class ResourceLocalizationService extends AbstractService
case INIT_CONTAINER_RESOURCES: case INIT_CONTAINER_RESOURCES:
ContainerLocalizationRequestEvent rsrcReqs = ContainerLocalizationRequestEvent rsrcReqs =
(ContainerLocalizationRequestEvent) event; (ContainerLocalizationRequestEvent) event;
Container c = rsrcReqs.getContainer(); c = rsrcReqs.getContainer();
LocalizerContext ctxt = new LocalizerContext( LocalizerContext ctxt = new LocalizerContext(
c.getUser(), c.getContainerID(), c.getCredentials()); c.getUser(), c.getContainerID(), c.getCredentials());
final LocalResourcesTracker tracker; rsrcs = rsrcReqs.getRequestedResources();
LocalResourceVisibility vis = rsrcReqs.getVisibility(); for (LocalResourceVisibility vis : rsrcs.keySet()) {
switch (vis) { tracker = getLocalResourcesTracker(vis, c.getUser(),
default: c.getContainerID().getAppId());
case PUBLIC: for (LocalResourceRequest req : rsrcs.get(vis)) {
tracker = publicRsrc; tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
break; }
case PRIVATE:
tracker = privateRsrc.get(c.getUser());
break;
case APPLICATION:
tracker =
appRsrc.get(ConverterUtils.toString(c.getContainerID().getAppId()));
break;
}
// We get separate events one each for all resources of one visibility. So
// all the resources in this event are of the same visibility.
for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) {
tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
} }
break; break;
case CACHE_CLEANUP: case CACHE_CLEANUP:
@ -311,14 +308,23 @@ public class ResourceLocalizationService extends AbstractService
} }
break; break;
case CLEANUP_CONTAINER_RESOURCES: case CLEANUP_CONTAINER_RESOURCES:
Container container = ContainerLocalizationCleanupEvent rsrcCleanup =
((ContainerLocalizationEvent)event).getContainer(); (ContainerLocalizationCleanupEvent) event;
c = rsrcCleanup.getContainer();
rsrcs = rsrcCleanup.getResources();
for (LocalResourceVisibility vis : rsrcs.keySet()) {
tracker = getLocalResourcesTracker(vis, c.getUser(),
c.getContainerID().getAppId());
for (LocalResourceRequest req : rsrcs.get(vis)) {
tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
}
}
// Delete the container directories // Delete the container directories
userName = container.getUser(); userName = c.getUser();
String containerIDStr = container.toString(); String containerIDStr = c.toString();
appIDStr = appIDStr =
ConverterUtils.toString(container.getContainerID().getAppId()); ConverterUtils.toString(c.getContainerID().getAppId());
for (Path localDir : localDirs) { for (Path localDir : localDirs) {
// Delete the user-owned container-dir // Delete the user-owned container-dir
@ -336,8 +342,7 @@ public class ResourceLocalizationService extends AbstractService
delService.delete(null, containerSysDir, new Path[] {}); delService.delete(null, containerSysDir, new Path[] {});
} }
dispatcher.getEventHandler().handle(new ContainerEvent( dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
container.getContainerID(),
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
break; break;
case DESTROY_APPLICATION_RESOURCES: case DESTROY_APPLICATION_RESOURCES:
@ -379,6 +384,19 @@ public class ResourceLocalizationService extends AbstractService
} }
} }
LocalResourcesTracker getLocalResourcesTracker(
LocalResourceVisibility visibility, String user, ApplicationId appId) {
switch (visibility) {
default:
case PUBLIC:
return publicRsrc;
case PRIVATE:
return privateRsrc.get(user);
case APPLICATION:
return appRsrc.get(ConverterUtils.toString(appId));
}
}
/** /**
* Sub-component handling the spawning of {@link ContainerLocalizer}s * Sub-component handling the spawning of {@link ContainerLocalizer}s
*/ */
@ -526,6 +544,7 @@ public class ResourceLocalizationService extends AbstractService
} }
@Override @Override
@SuppressWarnings("unchecked") // dispatcher not typed
public void run() { public void run() {
try { try {
// TODO shutdown, better error handling esp. DU // TODO shutdown, better error handling esp. DU
@ -651,6 +670,7 @@ public class ResourceLocalizationService extends AbstractService
} }
// TODO this sucks. Fix it later // TODO this sucks. Fix it later
@SuppressWarnings("unchecked") // dispatcher not typed
LocalizerHeartbeatResponse update( LocalizerHeartbeatResponse update(
List<LocalResourceStatus> remoteResourceStatuses) { List<LocalResourceStatus> remoteResourceStatuses) {
LocalizerHeartbeatResponse response = LocalizerHeartbeatResponse response =
@ -795,6 +815,7 @@ public class ResourceLocalizationService extends AbstractService
} }
@Override @Override
@SuppressWarnings("unchecked") // dispatcher not typed
public void run() { public void run() {
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new LocalizationEvent(LocalizationEventType.CACHE_CLEANUP)); new LocalizationEvent(LocalizationEventType.CACHE_CLEANUP));

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
public class ContainerLocalizationCleanupEvent extends
ContainerLocalizationEvent {
private final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
rsrc;
/**
* Event requesting the cleanup of the rsrc.
* @param c
* @param rsrc
*/
public ContainerLocalizationCleanupEvent(Container c,
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {
super(LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, c);
this.rsrc = rsrc;
}
public
Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
getResources() {
return rsrc;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import java.util.Collection; import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -26,27 +27,23 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Loca
public class ContainerLocalizationRequestEvent extends public class ContainerLocalizationRequestEvent extends
ContainerLocalizationEvent { ContainerLocalizationEvent {
private final LocalResourceVisibility vis; private final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
private final Collection<LocalResourceRequest> reqs; rsrc;
/** /**
* Event requesting the localization of the reqs all with visibility vis * Event requesting the localization of the rsrc.
* @param c * @param c
* @param reqs * @param rsrc
* @param vis
*/ */
public ContainerLocalizationRequestEvent(Container c, public ContainerLocalizationRequestEvent(Container c,
Collection<LocalResourceRequest> reqs, LocalResourceVisibility vis) { Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {
super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c); super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c);
this.vis = vis; this.rsrc = rsrc;
this.reqs = reqs;
} }
public LocalResourceVisibility getVisibility() { public
return vis; Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
getRequestedResources() {
return rsrc;
} }
}
public Collection<LocalResourceRequest> getRequestedResources() {
return reqs;
}
}

View File

@ -17,8 +17,6 @@
*/ */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import java.net.URISyntaxException;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
@ -26,8 +24,8 @@ public class ResourceReleaseEvent extends ResourceEvent {
private final ContainerId container; private final ContainerId container;
public ResourceReleaseEvent(LocalResourceRequest rsrc, ContainerId container) public ResourceReleaseEvent(LocalResourceRequest rsrc,
throws URISyntaxException { ContainerId container) {
super(rsrc, ResourceEventType.RELEASE); super(rsrc, ResourceEventType.RELEASE);
this.container = container; this.container = container;
} }

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.util.Collection;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -79,14 +81,17 @@ public class DummyContainerManager extends ContainerManagerImpl {
ContainerLocalizationRequestEvent rsrcReqs = ContainerLocalizationRequestEvent rsrcReqs =
(ContainerLocalizationRequestEvent) event; (ContainerLocalizationRequestEvent) event;
// simulate localization of all requested resources // simulate localization of all requested resources
for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) { for (Collection<LocalResourceRequest> rc : rsrcReqs
LOG.info("DEBUG: " + req + ":" + .getRequestedResources().values()) {
rsrcReqs.getContainer().getContainerID()); for (LocalResourceRequest req : rc) {
dispatcher.getEventHandler().handle( LOG.info("DEBUG: " + req + ":"
new ContainerResourceLocalizedEvent( + rsrcReqs.getContainer().getContainerID());
rsrcReqs.getContainer().getContainerID(), req, dispatcher.getEventHandler().handle(
new Path("file:///local" + req.getPath().toUri().getPath()))); new ContainerResourceLocalizedEvent(rsrcReqs.getContainer()
} .getContainerID(), req, new Path("file:///local"
+ req.getPath().toUri().getPath())));
}
}
break; break;
case CLEANUP_CONTAINER_RESOURCES: case CLEANUP_CONTAINER_RESOURCES:
Container container = Container container =

View File

@ -17,208 +17,203 @@
*/ */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import org.apache.hadoop.yarn.event.Dispatcher; import static org.junit.Assert.assertEquals;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.AbstractMap.SimpleEntry;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.AbstractMap.SimpleEntry; import java.util.Random;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
import static org.mockito.Mockito.*;
public class TestContainer { public class TestContainer {
final NodeManagerMetrics metrics = NodeManagerMetrics.create(); final NodeManagerMetrics metrics = NodeManagerMetrics.create();
/** /**
* Verify correct container request events sent to localizer. * Verify correct container request events sent to localizer.
*/ */
@Test @Test
@SuppressWarnings("unchecked") // mocked generic
public void testLocalizationRequest() throws Exception { public void testLocalizationRequest() throws Exception {
DrainDispatcher dispatcher = new DrainDispatcher(); WrappedContainer wc = null;
dispatcher.init(null);
try { try {
dispatcher.start(); wc = new WrappedContainer(7, 314159265358979L, 4344, "yak");
EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class); assertEquals(ContainerState.NEW, wc.c.getContainerState());
dispatcher.register(LocalizationEventType.class, localizerBus); wc.initContainer();
// null serviceData; no registered AuxServicesEventType handler
ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
ContainerId cId = getMockContainerId(7, 314159265358979L, 4344);
when(ctxt.getUser()).thenReturn("yak");
when(ctxt.getContainerId()).thenReturn(cId);
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("testLocalizationRequest seed: " + seed);
final Map<String,LocalResource> localResources = createLocalResources(r);
when(ctxt.getAllLocalResources()).thenReturn(localResources);
final Container c = newContainer(dispatcher, ctxt);
assertEquals(ContainerState.NEW, c.getContainerState());
// Verify request for public/private resources to localizer // Verify request for public/private resources to localizer
c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER)); ResourcesRequestedMatcher matchesReq =
dispatcher.await(); new ResourcesRequestedMatcher(wc.localResources, EnumSet.of(
ContainerReqMatcher matchesPublicReq = LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
new ContainerReqMatcher(localResources, LocalResourceVisibility.APPLICATION));
EnumSet.of(LocalResourceVisibility.PUBLIC)); verify(wc.localizerBus).handle(argThat(matchesReq));
ContainerReqMatcher matchesPrivateReq = assertEquals(ContainerState.LOCALIZING, wc.c.getContainerState());
new ContainerReqMatcher(localResources,
EnumSet.of(LocalResourceVisibility.PRIVATE));
ContainerReqMatcher matchesAppReq =
new ContainerReqMatcher(localResources,
EnumSet.of(LocalResourceVisibility.APPLICATION));
verify(localizerBus).handle(argThat(matchesPublicReq));
verify(localizerBus).handle(argThat(matchesPrivateReq));
verify(localizerBus).handle(argThat(matchesAppReq));
assertEquals(ContainerState.LOCALIZING, c.getContainerState());
} finally {
dispatcher.stop();
} }
finally {
if (wc != null) {
wc.finished();
}
}
} }
/** /**
* Verify container launch when all resources already cached. * Verify container launch when all resources already cached.
*/ */
@Test @Test
@SuppressWarnings("unchecked") // mocked generic
public void testLocalizationLaunch() throws Exception { public void testLocalizationLaunch() throws Exception {
DrainDispatcher dispatcher = new DrainDispatcher(); WrappedContainer wc = null;
dispatcher.init(null);
try { try {
dispatcher.start(); wc = new WrappedContainer(8, 314159265358979L, 4344, "yak");
EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class); assertEquals(ContainerState.NEW, wc.c.getContainerState());
dispatcher.register(LocalizationEventType.class, localizerBus); wc.initContainer();
EventHandler<ContainersLauncherEvent> launcherBus = Map<Path, String> localPaths = wc.localizeResources();
mock(EventHandler.class);
dispatcher.register(ContainersLauncherEventType.class, launcherBus);
// null serviceData; no registered AuxServicesEventType handler
ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
ContainerId cId = getMockContainerId(8, 314159265358979L, 4344);
when(ctxt.getUser()).thenReturn("yak");
when(ctxt.getContainerId()).thenReturn(cId);
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("testLocalizationLaunch seed: " + seed);
final Map<String,LocalResource> localResources = createLocalResources(r);
when(ctxt.getAllLocalResources()).thenReturn(localResources);
final Container c = newContainer(dispatcher, ctxt);
assertEquals(ContainerState.NEW, c.getContainerState());
c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
dispatcher.await();
// Container prepared for localization events
Path cache = new Path("file:///cache");
Map<Path,String> localPaths = new HashMap<Path,String>();
for (Entry<String,LocalResource> rsrc : localResources.entrySet()) {
assertEquals(ContainerState.LOCALIZING, c.getContainerState());
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
Path p = new Path(cache, rsrc.getKey());
localPaths.put(p, rsrc.getKey());
// rsrc copied to p
c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(), req, p));
}
dispatcher.await();
// all resources should be localized // all resources should be localized
assertEquals(ContainerState.LOCALIZED, c.getContainerState()); assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
for (Entry<Path,String> loc : c.getLocalizedResources().entrySet()) { for (Entry<Path,String> loc : wc.c.getLocalizedResources().entrySet()) {
assertEquals(localPaths.remove(loc.getKey()), loc.getValue()); assertEquals(localPaths.remove(loc.getKey()), loc.getValue());
} }
assertTrue(localPaths.isEmpty()); assertTrue(localPaths.isEmpty());
final WrappedContainer wcf = wc;
// verify container launch // verify container launch
ArgumentMatcher<ContainersLauncherEvent> matchesContainerLaunch = ArgumentMatcher<ContainersLauncherEvent> matchesContainerLaunch =
new ArgumentMatcher<ContainersLauncherEvent>() { new ArgumentMatcher<ContainersLauncherEvent>() {
@Override @Override
public boolean matches(Object o) { public boolean matches(Object o) {
ContainersLauncherEvent launchEvent = (ContainersLauncherEvent) o; ContainersLauncherEvent launchEvent = (ContainersLauncherEvent) o;
return c == launchEvent.getContainer(); return wcf.c == launchEvent.getContainer();
} }
}; };
verify(launcherBus).handle(argThat(matchesContainerLaunch)); verify(wc.launcherBus).handle(argThat(matchesContainerLaunch));
} finally { } finally {
dispatcher.stop(); if (wc != null) {
wc.finished();
}
} }
} }
@Test
@SuppressWarnings("unchecked") // mocked generic
public void testCleanupOnFailure() throws Exception {
WrappedContainer wc = null;
try {
wc = new WrappedContainer(10, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.localizeResources();
wc.launchContainer();
reset(wc.localizerBus);
wc.containerFailed(ExitCode.KILLED.getExitCode());
assertEquals(ContainerState.EXITED_WITH_FAILURE,
wc.c.getContainerState());
verifyCleanupCall(wc);
}
finally {
if (wc != null) {
wc.finished();
}
}
}
@Test
@SuppressWarnings("unchecked") // mocked generic
public void testCleanupOnSuccess() throws Exception {
WrappedContainer wc = null;
try {
wc = new WrappedContainer(11, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.localizeResources();
wc.launchContainer();
reset(wc.localizerBus);
wc.containerSuccessful();
assertEquals(ContainerState.EXITED_WITH_SUCCESS,
wc.c.getContainerState());
verifyCleanupCall(wc);
}
finally {
if (wc != null) {
wc.finished();
}
}
}
@Test
@SuppressWarnings("unchecked") // mocked generic
public void testCleanupOnKillRequest() throws Exception {
WrappedContainer wc = null;
try {
wc = new WrappedContainer(12, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.localizeResources();
wc.launchContainer();
reset(wc.localizerBus);
wc.killContainer();
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
wc.containerKilledOnRequest();
verifyCleanupCall(wc);
} finally {
if (wc != null) {
wc.finished();
}
}
}
/** /**
* Verify serviceData correctly sent. * Verify serviceData correctly sent.
*/ */
@Test @Test
@SuppressWarnings("unchecked") // mocked generic
public void testServiceData() throws Exception { public void testServiceData() throws Exception {
DrainDispatcher dispatcher = new DrainDispatcher(); WrappedContainer wc = null;
dispatcher.init(null);
dispatcher.start();
try { try {
EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class); wc = new WrappedContainer(9, 314159265358979L, 4344, "yak", false, true);
dispatcher.register(LocalizationEventType.class, localizerBus); assertEquals(ContainerState.NEW, wc.c.getContainerState());
EventHandler<AuxServicesEvent> auxBus = mock(EventHandler.class); wc.initContainer();
dispatcher.register(AuxServicesEventType.class, auxBus);
EventHandler<ContainersLauncherEvent> launchBus = mock(EventHandler.class); for (final Map.Entry<String,ByteBuffer> e : wc.serviceData.entrySet()) {
dispatcher.register(ContainersLauncherEventType.class, launchBus);
ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
final ContainerId cId = getMockContainerId(9, 314159265358979L, 4344);
when(ctxt.getUser()).thenReturn("yak");
when(ctxt.getContainerId()).thenReturn(cId);
when(ctxt.getAllLocalResources()).thenReturn(
Collections.<String,LocalResource>emptyMap());
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("testServiceData seed: " + seed);
final Map<String,ByteBuffer> serviceData = createServiceData(r);
when(ctxt.getAllServiceData()).thenReturn(serviceData);
final Container c = newContainer(dispatcher, ctxt);
assertEquals(ContainerState.NEW, c.getContainerState());
// Verify propagation of service data to AuxServices
c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
dispatcher.await();
for (final Map.Entry<String,ByteBuffer> e : serviceData.entrySet()) {
ArgumentMatcher<AuxServicesEvent> matchesServiceReq = ArgumentMatcher<AuxServicesEvent> matchesServiceReq =
new ArgumentMatcher<AuxServicesEvent>() { new ArgumentMatcher<AuxServicesEvent>() {
@Override @Override
@ -228,9 +223,10 @@ public class TestContainer {
&& 0 == e.getValue().compareTo(evt.getServiceData()); && 0 == e.getValue().compareTo(evt.getServiceData());
} }
}; };
verify(auxBus).handle(argThat(matchesServiceReq)); verify(wc.auxBus).handle(argThat(matchesServiceReq));
} }
final WrappedContainer wcf = wc;
// verify launch on empty resource request // verify launch on empty resource request
ArgumentMatcher<ContainersLauncherEvent> matchesLaunchReq = ArgumentMatcher<ContainersLauncherEvent> matchesLaunchReq =
new ArgumentMatcher<ContainersLauncherEvent>() { new ArgumentMatcher<ContainersLauncherEvent>() {
@ -238,61 +234,103 @@ public class TestContainer {
public boolean matches(Object o) { public boolean matches(Object o) {
ContainersLauncherEvent evt = (ContainersLauncherEvent) o; ContainersLauncherEvent evt = (ContainersLauncherEvent) o;
return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER
&& cId == evt.getContainer().getContainerID(); && wcf.cId == evt.getContainer().getContainerID();
} }
}; };
verify(launchBus).handle(argThat(matchesLaunchReq)); verify(wc.launcherBus).handle(argThat(matchesLaunchReq));
} finally { } finally {
dispatcher.stop(); if (wc != null) {
wc.finished();
}
} }
} }
// Accept iff the resource request payload matches. private void verifyCleanupCall(WrappedContainer wc) throws Exception {
static class ContainerReqMatcher extends ArgumentMatcher<LocalizationEvent> { ResourcesReleasedMatcher matchesReq =
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
LocalResourceVisibility.APPLICATION));
verify(wc.localizerBus).handle(argThat(matchesReq));
}
private static class ResourcesReleasedMatcher extends
ArgumentMatcher<LocalizationEvent> {
final HashSet<LocalResourceRequest> resources = final HashSet<LocalResourceRequest> resources =
new HashSet<LocalResourceRequest>(); new HashSet<LocalResourceRequest>();
ContainerReqMatcher(Map<String,LocalResource> allResources,
ResourcesReleasedMatcher(Map<String, LocalResource> allResources,
EnumSet<LocalResourceVisibility> vis) throws URISyntaxException { EnumSet<LocalResourceVisibility> vis) throws URISyntaxException {
for (Entry<String,LocalResource> e : allResources.entrySet()) { for (Entry<String, LocalResource> e : allResources.entrySet()) {
if (vis.contains(e.getValue().getVisibility())) { if (vis.contains(e.getValue().getVisibility())) {
resources.add(new LocalResourceRequest(e.getValue())); resources.add(new LocalResourceRequest(e.getValue()));
} }
} }
} }
@Override @Override
public boolean matches(Object o) { public boolean matches(Object o) {
ContainerLocalizationRequestEvent evt = (ContainerLocalizationRequestEvent) o; if (!(o instanceof ContainerLocalizationCleanupEvent)) {
return false;
}
ContainerLocalizationCleanupEvent evt =
(ContainerLocalizationCleanupEvent) o;
final HashSet<LocalResourceRequest> expected = final HashSet<LocalResourceRequest> expected =
new HashSet<LocalResourceRequest>(resources); new HashSet<LocalResourceRequest>(resources);
for (LocalResourceRequest rsrc : evt.getRequestedResources()) { for (Collection<LocalResourceRequest> rc : evt.getResources().values()) {
if (!expected.remove(rsrc)) { for (LocalResourceRequest rsrc : rc) {
return false; if (!expected.remove(rsrc)) {
return false;
}
} }
} }
return expected.isEmpty(); return expected.isEmpty();
} }
} }
static Entry<String,LocalResource> getMockRsrc(Random r, // Accept iff the resource payload matches.
LocalResourceVisibility vis) { private static class ResourcesRequestedMatcher extends
LocalResource rsrc = mock(LocalResource.class); ArgumentMatcher<LocalizationEvent> {
final HashSet<LocalResourceRequest> resources =
new HashSet<LocalResourceRequest>();
String name = Long.toHexString(r.nextLong()); ResourcesRequestedMatcher(Map<String, LocalResource> allResources,
URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class); EnumSet<LocalResourceVisibility> vis) throws URISyntaxException {
when(uri.getScheme()).thenReturn("file"); for (Entry<String, LocalResource> e : allResources.entrySet()) {
when(uri.getHost()).thenReturn(null); if (vis.contains(e.getValue().getVisibility())) {
when(uri.getFile()).thenReturn("/local/" + vis + "/" + name); resources.add(new LocalResourceRequest(e.getValue()));
}
}
}
when(rsrc.getResource()).thenReturn(uri); @Override
when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L); public boolean matches(Object o) {
when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L); ContainerLocalizationRequestEvent evt =
when(rsrc.getType()).thenReturn(LocalResourceType.FILE); (ContainerLocalizationRequestEvent) o;
when(rsrc.getVisibility()).thenReturn(vis); final HashSet<LocalResourceRequest> expected =
new HashSet<LocalResourceRequest>(resources);
return new SimpleEntry<String,LocalResource>(name, rsrc); for (Collection<LocalResourceRequest> rc : evt.getRequestedResources()
.values()) {
for (LocalResourceRequest rsrc : rc) {
if (!expected.remove(rsrc)) {
return false;
}
}
}
return expected.isEmpty();
}
} }
static Map<String,LocalResource> createLocalResources(Random r) { private static Entry<String, LocalResource> getMockRsrc(Random r,
LocalResourceVisibility vis) {
String name = Long.toHexString(r.nextLong());
URL url = BuilderUtils.newURL("file", null, 0, "/local" + vis + "/" + name);
LocalResource rsrc =
BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
return new SimpleEntry<String, LocalResource>(name, rsrc);
}
private static Map<String,LocalResource> createLocalResources(Random r) {
Map<String,LocalResource> localResources = Map<String,LocalResource> localResources =
new HashMap<String,LocalResource>(); new HashMap<String,LocalResource>();
for (int i = r.nextInt(5) + 5; i >= 0; --i) { for (int i = r.nextInt(5) + 5; i >= 0; --i) {
@ -313,17 +351,7 @@ public class TestContainer {
return localResources; return localResources;
} }
static ContainerId getMockContainerId(int appId, long timestamp, int id) { private static Map<String,ByteBuffer> createServiceData(Random r) {
ApplicationId aId = mock(ApplicationId.class);
when(aId.getId()).thenReturn(appId);
when(aId.getClusterTimestamp()).thenReturn(timestamp);
ContainerId cId = mock(ContainerId.class);
when(cId.getId()).thenReturn(id);
when(cId.getAppId()).thenReturn(aId);
return cId;
}
static Map<String,ByteBuffer> createServiceData(Random r) {
Map<String,ByteBuffer> serviceData = Map<String,ByteBuffer> serviceData =
new HashMap<String,ByteBuffer>(); new HashMap<String,ByteBuffer>();
for (int i = r.nextInt(5) + 5; i >= 0; --i) { for (int i = r.nextInt(5) + 5; i >= 0; --i) {
@ -335,7 +363,134 @@ public class TestContainer {
return serviceData; return serviceData;
} }
Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) { private Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) {
return new ContainerImpl(disp, ctx, null, metrics); return new ContainerImpl(disp, ctx, null, metrics);
} }
@SuppressWarnings("unchecked")
private class WrappedContainer {
final DrainDispatcher dispatcher;
final EventHandler<LocalizationEvent> localizerBus;
final EventHandler<ContainersLauncherEvent> launcherBus;
final EventHandler<ContainersMonitorEvent> monitorBus;
final EventHandler<AuxServicesEvent> auxBus;
final ContainerLaunchContext ctxt;
final ContainerId cId;
final Container c;
final Map<String, LocalResource> localResources;
final Map<String, ByteBuffer> serviceData;
final String user;
WrappedContainer(int appId, long timestamp, int id, String user) {
this(appId, timestamp, id, user, true, false);
}
WrappedContainer(int appId, long timestamp, int id, String user,
boolean withLocalRes, boolean withServiceData) {
dispatcher = new DrainDispatcher();
dispatcher.init(null);
localizerBus = mock(EventHandler.class);
launcherBus = mock(EventHandler.class);
monitorBus = mock(EventHandler.class);
auxBus = mock(EventHandler.class);
dispatcher.register(LocalizationEventType.class, localizerBus);
dispatcher.register(ContainersLauncherEventType.class, launcherBus);
dispatcher.register(ContainersMonitorEventType.class, monitorBus);
dispatcher.register(AuxServicesEventType.class, auxBus);
this.user = user;
ctxt = mock(ContainerLaunchContext.class);
cId = BuilderUtils.newContainerId(appId, 1, timestamp, id);
when(ctxt.getUser()).thenReturn(this.user);
when(ctxt.getContainerId()).thenReturn(cId);
Resource resource = BuilderUtils.newResource(1024);
when(ctxt.getResource()).thenReturn(resource);
if (withLocalRes) {
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("WrappedContainerLocalResource seed: " + seed);
localResources = createLocalResources(r);
} else {
localResources = Collections.<String, LocalResource> emptyMap();
}
when(ctxt.getAllLocalResources()).thenReturn(localResources);
if (withServiceData) {
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("ServiceData seed: " + seed);
serviceData = createServiceData(r);
} else {
serviceData = Collections.<String, ByteBuffer> emptyMap();
}
when(ctxt.getAllServiceData()).thenReturn(serviceData);
c = newContainer(dispatcher, ctxt);
dispatcher.start();
}
private void drainDispatcherEvents() {
dispatcher.await();
}
public void finished() {
dispatcher.stop();
}
public void initContainer() {
c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
drainDispatcherEvents();
}
public Map<Path, String> localizeResources() throws URISyntaxException {
Path cache = new Path("file:///cache");
Map<Path, String> localPaths = new HashMap<Path, String>();
for (Entry<String, LocalResource> rsrc : localResources.entrySet()) {
assertEquals(ContainerState.LOCALIZING, c.getContainerState());
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
Path p = new Path(cache, rsrc.getKey());
localPaths.put(p, rsrc.getKey());
// rsrc copied to p
c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(),
req, p));
}
drainDispatcherEvents();
return localPaths;
}
public void launchContainer() {
c.handle(new ContainerEvent(cId, ContainerEventType.CONTAINER_LAUNCHED));
drainDispatcherEvents();
}
public void containerSuccessful() {
c.handle(new ContainerEvent(cId,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
drainDispatcherEvents();
}
public void containerFailed(int exitCode) {
c.handle(new ContainerExitEvent(cId,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, exitCode));
drainDispatcherEvents();
}
public void killContainer() {
c.handle(new ContainerKillEvent(cId, "KillRequest"));
drainDispatcherEvents();
}
public void containerKilledOnRequest() {
c.handle(new ContainerExitEvent(cId,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.KILLED
.getExitCode()));
drainDispatcherEvents();
}
}
} }

View File

@ -21,10 +21,17 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set;
import junit.framework.Assert;
import org.apache.avro.ipc.Server; import org.apache.avro.ipc.Server;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -63,11 +70,15 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -132,6 +143,190 @@ public class TestResourceLocalizationService {
} }
} }
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testResourceRelease() throws Exception {
Configuration conf = new Configuration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
doNothing().when(spylfs).mkdir(
isA(Path.class), isA(FsPermission.class), anyBoolean());
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
Server ignore = mock(Server.class);
LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class);
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
dispatcher.start();
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher.register(ContainerEventType.class, containerBus);
//Ignore actual localization
EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
dispatcher.register(LocalizerEventType.class, localizerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = new DeletionService(exec);
delService.init(null);
delService.start();
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService);
ResourceLocalizationService spyService = spy(rawService);
doReturn(ignore).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
isA(Configuration.class));
doReturn(lfs).when(spyService)
.getLocalFileContext(isA(Configuration.class));
try {
spyService.init(conf);
spyService.start();
final String user = "user0";
// init application
final Application app = mock(Application.class);
final ApplicationId appId =
BuilderUtils.newApplicationId(314159265358979L, 3);
when(app.getUser()).thenReturn(user);
when(app.getAppId()).thenReturn(appId);
spyService.handle(new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
dispatcher.await();
//Get a handle on the trackers after they're setup with INIT_APP_RESOURCES
LocalResourcesTracker appTracker =
spyService.getLocalResourcesTracker(
LocalResourceVisibility.APPLICATION, user, appId);
LocalResourcesTracker privTracker =
spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
user, appId);
LocalResourcesTracker pubTracker =
spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
user, appId);
// init container.
final Container c = getMockContainer(appId, 42);
// init resources
Random r = new Random();
long seed = r.nextLong();
System.out.println("SEED: " + seed);
r.setSeed(seed);
// Send localization requests for one resource of each type.
final LocalResource privResource = getPrivateMockedResource(r);
final LocalResourceRequest privReq =
new LocalResourceRequest(privResource);
final LocalResource pubResource = getPublicMockedResource(r);
final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
final LocalResource pubResource2 = getPublicMockedResource(r);
final LocalResourceRequest pubReq2 =
new LocalResourceRequest(pubResource2);
final LocalResource appResource = getAppMockedResource(r);
final LocalResourceRequest appReq = new LocalResourceRequest(appResource);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
req.put(LocalResourceVisibility.PRIVATE,
Collections.singletonList(privReq));
req.put(LocalResourceVisibility.PUBLIC,
Collections.singletonList(pubReq));
req.put(LocalResourceVisibility.APPLICATION,
Collections.singletonList(appReq));
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
req2.put(LocalResourceVisibility.PRIVATE,
Collections.singletonList(privReq));
req2.put(LocalResourceVisibility.PUBLIC,
Collections.singletonList(pubReq2));
Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
pubRsrcs.add(pubReq);
pubRsrcs.add(pubReq2);
// Send Request event
spyService.handle(new ContainerLocalizationRequestEvent(c, req));
spyService.handle(new ContainerLocalizationRequestEvent(c, req2));
dispatcher.await();
int privRsrcCount = 0;
for (LocalizedResource lr : privTracker) {
privRsrcCount++;
Assert.assertEquals("Incorrect reference count", 2, lr.getRefCount());
Assert.assertEquals(privReq, lr.getRequest());
}
Assert.assertEquals(1, privRsrcCount);
int pubRsrcCount = 0;
for (LocalizedResource lr : pubTracker) {
pubRsrcCount++;
Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
pubRsrcs.remove(lr.getRequest());
}
Assert.assertEquals(0, pubRsrcs.size());
Assert.assertEquals(2, pubRsrcCount);
int appRsrcCount = 0;
for (LocalizedResource lr : appTracker) {
appRsrcCount++;
Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
Assert.assertEquals(appReq, lr.getRequest());
}
Assert.assertEquals(1, appRsrcCount);
//Send Cleanup Event
spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
req2.remove(LocalResourceVisibility.PRIVATE);
spyService.handle(new ContainerLocalizationCleanupEvent(c, req2));
dispatcher.await();
pubRsrcs.add(pubReq);
pubRsrcs.add(pubReq2);
privRsrcCount = 0;
for (LocalizedResource lr : privTracker) {
privRsrcCount++;
Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
Assert.assertEquals(privReq, lr.getRequest());
}
Assert.assertEquals(1, privRsrcCount);
pubRsrcCount = 0;
for (LocalizedResource lr : pubTracker) {
pubRsrcCount++;
Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
pubRsrcs.remove(lr.getRequest());
}
Assert.assertEquals(0, pubRsrcs.size());
Assert.assertEquals(2, pubRsrcCount);
appRsrcCount = 0;
for (LocalizedResource lr : appTracker) {
appRsrcCount++;
Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
Assert.assertEquals(appReq, lr.getRequest());
}
Assert.assertEquals(1, appRsrcCount);
} finally {
dispatcher.stop();
delService.stop();
}
}
@Test @Test
@SuppressWarnings("unchecked") // mocked generics @SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception { public void testLocalizationHeartbeat() throws Exception {
@ -175,9 +370,8 @@ public class TestResourceLocalizationService {
// init application // init application
final Application app = mock(Application.class); final Application app = mock(Application.class);
final ApplicationId appId = mock(ApplicationId.class); final ApplicationId appId =
when(appId.getClusterTimestamp()).thenReturn(314159265358979L); BuilderUtils.newApplicationId(314159265358979L, 3);
when(appId.getId()).thenReturn(3);
when(app.getUser()).thenReturn("user0"); when(app.getUser()).thenReturn("user0");
when(app.getAppId()).thenReturn(appId); when(app.getAppId()).thenReturn(appId);
spyService.handle(new ApplicationLocalizationEvent( spyService.handle(new ApplicationLocalizationEvent(
@ -205,11 +399,13 @@ public class TestResourceLocalizationService {
doReturn(out).when(spylfs).createInternal(isA(Path.class), doReturn(out).when(spylfs).createInternal(isA(Path.class),
isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(), isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
anyLong(), isA(Progressable.class), anyInt(), anyBoolean()); anyLong(), isA(Progressable.class), anyInt(), anyBoolean());
final LocalResource resource = getMockResource(r); final LocalResource resource = getPrivateMockedResource(r);
final LocalResourceRequest req = new LocalResourceRequest(resource); final LocalResourceRequest req = new LocalResourceRequest(resource);
spyService.handle(new ContainerLocalizationRequestEvent( Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
c, Collections.singletonList(req), new HashMap<LocalResourceVisibility,
LocalResourceVisibility.PRIVATE)); Collection<LocalResourceRequest>>();
rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
// Sigh. Thread init of private localizer not accessible // Sigh. Thread init of private localizer not accessible
Thread.sleep(500); Thread.sleep(500);
dispatcher.await(); dispatcher.await();
@ -265,42 +461,44 @@ public class TestResourceLocalizationService {
} }
} }
static URL getPath(String path) { private static URL getPath(String path) {
URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class); URL url = BuilderUtils.newURL("file", null, 0, path);
when(uri.getScheme()).thenReturn("file"); return url;
when(uri.getHost()).thenReturn(null);
when(uri.getFile()).thenReturn(path);
return uri;
} }
static LocalResource getMockResource(Random r) { private static LocalResource getMockedResource(Random r,
LocalResource rsrc = mock(LocalResource.class); LocalResourceVisibility vis) {
String name = Long.toHexString(r.nextLong()); String name = Long.toHexString(r.nextLong());
URL uri = getPath("/local/PRIVATE/" + name); URL url = getPath("/local/PRIVATE/" + name);
LocalResource rsrc =
when(rsrc.getResource()).thenReturn(uri); BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L); r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L);
when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
when(rsrc.getVisibility()).thenReturn(LocalResourceVisibility.PRIVATE);
return rsrc; return rsrc;
} }
private static LocalResource getAppMockedResource(Random r) {
return getMockedResource(r, LocalResourceVisibility.APPLICATION);
}
private static LocalResource getPublicMockedResource(Random r) {
return getMockedResource(r, LocalResourceVisibility.PUBLIC);
}
private static LocalResource getPrivateMockedResource(Random r) {
return getMockedResource(r, LocalResourceVisibility.PRIVATE);
}
static Container getMockContainer(ApplicationId appId, int id) { private static Container getMockContainer(ApplicationId appId, int id) {
Container c = mock(Container.class); Container c = mock(Container.class);
ApplicationAttemptId appAttemptId = Records.newRecord(ApplicationAttemptId.class); ApplicationAttemptId appAttemptId =
appAttemptId.setApplicationId(appId); BuilderUtils.newApplicationAttemptId(appId, 1);
appAttemptId.setAttemptId(1); ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
ContainerId cId = Records.newRecord(ContainerId.class);
cId.setAppAttemptId(appAttemptId);
cId.setAppId(appId);
cId.setId(id);
when(c.getUser()).thenReturn("user0"); when(c.getUser()).thenReturn("user0");
when(c.getContainerID()).thenReturn(cId); when(c.getContainerID()).thenReturn(cId);
Credentials creds = new Credentials(); Credentials creds = new Credentials();
creds.addToken(new Text("tok" + id), getToken(id)); creds.addToken(new Text("tok" + id), getToken(id));
when(c.getCredentials()).thenReturn(creds); when(c.getCredentials()).thenReturn(creds);
when(c.toString()).thenReturn(cId.toString());
return c; return c;
} }