diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d602ceb0afb..90f8ee10d27 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1252,6 +1252,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2971. ant build mapreduce fails protected access jc.displayJobList (jobs) (Thomas Graves via mahadev) + MAPREDUCE-2691. Finishing up the cleanup of distributed cache file resources + and related tests. (Siddharth Seth via vinodkv) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 7d9211b8bc5..ac921eb8b2f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -711,7 +711,7 @@ public abstract class TaskAttemptImpl implements String linkName = name.toUri().getPath(); container.setLocalResource( linkName, - BuilderUtils.newLocalResource(recordFactory, + BuilderUtils.newLocalResource( p.toUri(), type, visibilities[i] ? LocalResourceVisibility.PUBLIC diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java index 1afe4640613..cfff2fde827 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -86,12 +87,11 @@ public class BuilderUtils { } } - public static LocalResource newLocalResource(RecordFactory recordFactory, - URI uri, LocalResourceType type, LocalResourceVisibility visibility, - long size, long timestamp) { + public static LocalResource newLocalResource(URL url, LocalResourceType type, + LocalResourceVisibility visibility, long size, long timestamp) { LocalResource resource = - recordFactory.newRecordInstance(LocalResource.class); - resource.setResource(ConverterUtils.getYarnUrlFromURI(uri)); + recordFactory.newRecordInstance(LocalResource.class); + resource.setResource(url); resource.setType(type); resource.setVisibility(visibility); resource.setSize(size); @@ -99,6 +99,13 @@ public class BuilderUtils { return resource; } + public static LocalResource newLocalResource(URI uri, + LocalResourceType type, LocalResourceVisibility visibility, long size, + long timestamp) { + return newLocalResource(ConverterUtils.getYarnUrlFromURI(uri), type, + visibility, size, timestamp); + } + public static ApplicationId newApplicationId(RecordFactory recordFactory, long clustertimestamp, CharSequence id) { ApplicationId applicationId = @@ -125,6 +132,15 @@ public class BuilderUtils { return applicationId; } + public static ApplicationAttemptId newApplicationAttemptId( + ApplicationId appId, int attemptId) { + ApplicationAttemptId appAttemptId = + recordFactory.newRecordInstance(ApplicationAttemptId.class); + appAttemptId.setApplicationId(appId); + appAttemptId.setAttemptId(attemptId); + return appAttemptId; + } + public static ApplicationId convert(long clustertimestamp, CharSequence id) { ApplicationId applicationId = recordFactory.newRecordInstance(ApplicationId.class); @@ -133,6 +149,24 @@ public class BuilderUtils { return applicationId; } + public static ContainerId newContainerId(ApplicationAttemptId appAttemptId, + int containerId) { + ContainerId id = recordFactory.newRecordInstance(ContainerId.class); + id.setAppId(appAttemptId.getApplicationId()); + id.setId(containerId); + id.setAppAttemptId(appAttemptId); + return id; + } + + public static ContainerId newContainerId(int appId, int appAttemptId, + long timestamp, int id) { + ApplicationId applicationId = newApplicationId(timestamp, appId); + ApplicationAttemptId applicationAttemptId = newApplicationAttemptId( + applicationId, appAttemptId); + ContainerId cId = newContainerId(applicationAttemptId, id); + return cId; + } + public static ContainerId newContainerId(RecordFactory recordFactory, ApplicationId appId, ApplicationAttemptId appAttemptId, int containerId) { @@ -227,4 +261,20 @@ public class BuilderUtils { report.setStartTime(startTime); return report; } + + public static Resource newResource(int memory) { + Resource resource = recordFactory.newRecordInstance(Resource.class); + resource.setMemory(memory); + return resource; + } + + public static URL newURL(String scheme, String host, int port, String file) { + URL url = recordFactory.newRecordInstance(URL.class); + url.setScheme(scheme); + url.setHost(host); + url.setPort(port); + url.setFile(file); + return url; + } + } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 8a4439d32b0..46d5a526177 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; @@ -81,6 +84,12 @@ public class ContainerImpl implements Container { new HashMap(); private final Map localizedResources = new HashMap(); + private final List publicRsrcs = + new ArrayList(); + private final List privateRsrcs = + new ArrayList(); + private final List appRsrcs = + new ArrayList(); public ContainerImpl(Dispatcher dispatcher, ContainerLaunchContext launchContext, Credentials creds, @@ -361,7 +370,7 @@ public class ContainerImpl implements Container { } } - @SuppressWarnings("fallthrough") + @SuppressWarnings({"fallthrough", "unchecked"}) private void finished() { switch (getContainerState()) { case EXITED_WITH_SUCCESS: @@ -404,6 +413,24 @@ public class ContainerImpl implements Container { containerID, exitCode)); } + @SuppressWarnings("unchecked") // dispatcher not typed + public void cleanup() { + Map> rsrc = + new HashMap>(); + if (!publicRsrcs.isEmpty()) { + rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs); + } + if (!privateRsrcs.isEmpty()) { + rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs); + } + if (!appRsrcs.isEmpty()) { + rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs); + } + dispatcher.getEventHandler().handle( + new ContainerLocalizationCleanupEvent(this, rsrc)); + } + static class ContainerTransition implements SingleArcTransition { @@ -439,12 +466,6 @@ public class ContainerImpl implements Container { // Send requests for public, private resources Map cntrRsrc = ctxt.getAllLocalResources(); if (!cntrRsrc.isEmpty()) { - ArrayList publicRsrc = - new ArrayList(); - ArrayList privateRsrc = - new ArrayList(); - ArrayList appRsrc = - new ArrayList(); try { for (Map.Entry rsrc : cntrRsrc.entrySet()) { try { @@ -453,13 +474,13 @@ public class ContainerImpl implements Container { container.pendingResources.put(req, rsrc.getKey()); switch (rsrc.getValue().getVisibility()) { case PUBLIC: - publicRsrc.add(req); + container.publicRsrcs.add(req); break; case PRIVATE: - privateRsrc.add(req); + container.privateRsrcs.add(req); break; case APPLICATION: - appRsrc.add(req); + container.appRsrcs.add(req); break; } } catch (URISyntaxException e) { @@ -471,27 +492,25 @@ public class ContainerImpl implements Container { } catch (URISyntaxException e) { // malformed resource; abort container launch LOG.warn("Failed to parse resource-request", e); - container.dispatcher.getEventHandler().handle( - new ContainerLocalizationEvent( - LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container)); + container.cleanup(); container.metrics.endInitingContainer(); return ContainerState.LOCALIZATION_FAILED; } - if (!publicRsrc.isEmpty()) { - container.dispatcher.getEventHandler().handle( - new ContainerLocalizationRequestEvent( - container, publicRsrc, LocalResourceVisibility.PUBLIC)); + Map> req = + new HashMap>(); + if (!container.publicRsrcs.isEmpty()) { + req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs); } - if (!privateRsrc.isEmpty()) { - container.dispatcher.getEventHandler().handle( - new ContainerLocalizationRequestEvent( - container, privateRsrc, LocalResourceVisibility.PRIVATE)); + if (!container.privateRsrcs.isEmpty()) { + req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs); } - if (!appRsrc.isEmpty()) { - container.dispatcher.getEventHandler().handle( - new ContainerLocalizationRequestEvent( - container, appRsrc, LocalResourceVisibility.APPLICATION)); + if (!container.appRsrcs.isEmpty()) { + req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs); } + + container.dispatcher.getEventHandler().handle( + new ContainerLocalizationRequestEvent(container, req)); return ContainerState.LOCALIZING; } else { container.dispatcher.getEventHandler().handle( @@ -546,7 +565,6 @@ public class ContainerImpl implements Container { } } - @SuppressWarnings("unchecked") // dispatcher not typed static class ExitedWithSuccessTransition extends ContainerTransition { @Override public void transition(ContainerImpl container, ContainerEvent event) { @@ -554,13 +572,10 @@ public class ContainerImpl implements Container { // Inform the localizer to decrement reference counts and cleanup // resources. - container.dispatcher.getEventHandler().handle( - new ContainerLocalizationEvent( - LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container)); + container.cleanup(); } } - @SuppressWarnings("unchecked") // dispatcher not typed static class ExitedWithFailureTransition extends ContainerTransition { @Override public void transition(ContainerImpl container, ContainerEvent event) { @@ -572,13 +587,10 @@ public class ContainerImpl implements Container { // Inform the localizer to decrement reference counts and cleanup // resources. - container.dispatcher.getEventHandler().handle( - new ContainerLocalizationEvent( - LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container)); + container.cleanup(); } } - @SuppressWarnings("unchecked") // dispatcher not typed static class ResourceFailedTransition implements SingleArcTransition { @Override @@ -592,30 +604,24 @@ public class ContainerImpl implements Container { // Inform the localizer to decrement reference counts and cleanup // resources. - container.dispatcher.getEventHandler().handle( - new ContainerLocalizationEvent( - LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container)); + container.cleanup(); container.metrics.endInitingContainer(); } } - @SuppressWarnings("unchecked") // dispatcher not typed static class KillDuringLocalizationTransition implements SingleArcTransition { @Override public void transition(ContainerImpl container, ContainerEvent event) { // Inform the localizer to decrement reference counts and cleanup // resources. - container.dispatcher.getEventHandler().handle( - new ContainerLocalizationEvent( - LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container)); + container.cleanup(); container.metrics.endInitingContainer(); ContainerKillEvent killEvent = (ContainerKillEvent) event; container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); } } - @SuppressWarnings("unchecked") // dispatcher not typed static class LocalizedResourceDuringKillTransition implements SingleArcTransition { @Override @@ -647,7 +653,6 @@ public class ContainerImpl implements Container { } } - @SuppressWarnings("unchecked") // dispatcher not typed static class ContainerKilledTransition implements SingleArcTransition { @Override @@ -657,13 +662,10 @@ public class ContainerImpl implements Container { // The process/process-grp is killed. Decrement reference counts and // cleanup resources - container.dispatcher.getEventHandler().handle( - new ContainerLocalizationEvent( - LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container)); + container.cleanup(); } } - @SuppressWarnings("unchecked") // dispatcher not typed static class ContainerDoneTransition implements SingleArcTransition { @Override @@ -697,7 +699,8 @@ public class ContainerImpl implements Container { newState = stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { - LOG.warn("Can't handle this event at current state", e); + LOG.warn("Can't handle this event at current state: Current: [" + + oldState + "], eventType: [" + event.getType() + "]", e); } if (oldState != newState) { LOG.info("Container " + containerID + " transitioned from " diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index 9dae769515a..fc302c2481a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; @@ -274,7 +273,7 @@ public class ContainerLocalizer { stat.setLocalPath( ConverterUtils.getYarnUrlFromPath(localPath)); stat.setLocalSize( - FileUtil.getDU(new File(localPath.getParent().toString()))); + FileUtil.getDU(new File(localPath.getParent().toUri()))); stat.setStatus(ResourceStatusType.FETCH_SUCCESS); } catch (ExecutionException e) { stat.setStatus(ResourceStatusType.FETCH_FAILURE); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 283c6d4f43d..017431501f8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even * {@link LocalResourceVisibility}. * */ + class LocalResourcesTrackerImpl implements LocalResourcesTracker { static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class); @@ -83,7 +84,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker { @Override public boolean remove(LocalizedResource rem, DeletionService delService) { // current synchronization guaranteed by crude RLS event for cleanup - LocalizedResource rsrc = localrsrc.remove(rem.getRequest()); + LocalizedResource rsrc = localrsrc.get(rem.getRequest()); if (null == rsrc) { LOG.error("Attempt to remove absent resource: " + rem.getRequest() + " from " + getUser()); @@ -93,10 +94,11 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker { || ResourceState.DOWNLOADING.equals(rsrc.getState()) || rsrc != rem) { // internal error - LOG.error("Attempt to remove resource with non-zero refcount"); + LOG.error("Attempt to remove resource: " + rsrc + " with non-zero refcount"); assert false; return false; } + localrsrc.remove(rem.getRequest()); if (ResourceState.LOCALIZED.equals(rsrc.getState())) { delService.delete(getUser(), rsrc.getLocalPath()); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java index 2ba25bfaf4a..1e02fe68980 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java @@ -120,7 +120,8 @@ public class LocalizedResource implements EventHandler { for (ContainerId c : ref) { sb.append("(").append(c.toString()).append(")"); } - sb.append("],").append(getTimestamp()).append("}"); + sb.append("],").append(getTimestamp()).append(",") + .append(getState()).append("}"); return sb.toString(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index b74a0cb29e4..083116a51d0 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -22,6 +22,7 @@ import java.io.File; import java.net.URISyntaxException; +import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; @@ -43,6 +44,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -63,7 +65,6 @@ import org.apache.avro.ipc.Server; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; @@ -93,7 +94,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; @@ -101,6 +102,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager; @@ -198,7 +200,7 @@ public class ResourceLocalizationService extends AbstractService conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS); localizationServerAddress = NetUtils.createSocketAddr( conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS)); - localizerTracker = new LocalizerTracker(conf); + localizerTracker = createLocalizerTracker(conf); dispatcher.register(LocalizerEventType.class, localizerTracker); cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher), cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS); @@ -218,6 +220,10 @@ public class ResourceLocalizationService extends AbstractService super.start(); } + LocalizerTracker createLocalizerTracker(Configuration conf) { + return new LocalizerTracker(conf); + } + Server createServer() { YarnRPC rpc = YarnRPC.create(getConfig()); Configuration conf = new Configuration(getConfig()); // Clone to separate @@ -252,6 +258,9 @@ public class ResourceLocalizationService extends AbstractService public void handle(LocalizationEvent event) { String userName; String appIDStr; + Container c; + Map> rsrcs; + LocalResourcesTracker tracker; // TODO: create log dir as $logdir/$user/$appId switch (event.getType()) { case INIT_APPLICATION_RESOURCES: @@ -276,28 +285,16 @@ public class ResourceLocalizationService extends AbstractService case INIT_CONTAINER_RESOURCES: ContainerLocalizationRequestEvent rsrcReqs = (ContainerLocalizationRequestEvent) event; - Container c = rsrcReqs.getContainer(); + c = rsrcReqs.getContainer(); LocalizerContext ctxt = new LocalizerContext( c.getUser(), c.getContainerID(), c.getCredentials()); - final LocalResourcesTracker tracker; - LocalResourceVisibility vis = rsrcReqs.getVisibility(); - switch (vis) { - default: - case PUBLIC: - tracker = publicRsrc; - break; - case PRIVATE: - tracker = privateRsrc.get(c.getUser()); - break; - case APPLICATION: - tracker = - appRsrc.get(ConverterUtils.toString(c.getContainerID().getAppId())); - break; - } - // We get separate events one each for all resources of one visibility. So - // all the resources in this event are of the same visibility. - for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) { - tracker.handle(new ResourceRequestEvent(req, vis, ctxt)); + rsrcs = rsrcReqs.getRequestedResources(); + for (LocalResourceVisibility vis : rsrcs.keySet()) { + tracker = getLocalResourcesTracker(vis, c.getUser(), + c.getContainerID().getAppId()); + for (LocalResourceRequest req : rsrcs.get(vis)) { + tracker.handle(new ResourceRequestEvent(req, vis, ctxt)); + } } break; case CACHE_CLEANUP: @@ -311,14 +308,23 @@ public class ResourceLocalizationService extends AbstractService } break; case CLEANUP_CONTAINER_RESOURCES: - Container container = - ((ContainerLocalizationEvent)event).getContainer(); + ContainerLocalizationCleanupEvent rsrcCleanup = + (ContainerLocalizationCleanupEvent) event; + c = rsrcCleanup.getContainer(); + rsrcs = rsrcCleanup.getResources(); + for (LocalResourceVisibility vis : rsrcs.keySet()) { + tracker = getLocalResourcesTracker(vis, c.getUser(), + c.getContainerID().getAppId()); + for (LocalResourceRequest req : rsrcs.get(vis)) { + tracker.handle(new ResourceReleaseEvent(req, c.getContainerID())); + } + } // Delete the container directories - userName = container.getUser(); - String containerIDStr = container.toString(); + userName = c.getUser(); + String containerIDStr = c.toString(); appIDStr = - ConverterUtils.toString(container.getContainerID().getAppId()); + ConverterUtils.toString(c.getContainerID().getAppId()); for (Path localDir : localDirs) { // Delete the user-owned container-dir @@ -336,8 +342,7 @@ public class ResourceLocalizationService extends AbstractService delService.delete(null, containerSysDir, new Path[] {}); } - dispatcher.getEventHandler().handle(new ContainerEvent( - container.getContainerID(), + dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(), ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); break; case DESTROY_APPLICATION_RESOURCES: @@ -379,6 +384,19 @@ public class ResourceLocalizationService extends AbstractService } } + LocalResourcesTracker getLocalResourcesTracker( + LocalResourceVisibility visibility, String user, ApplicationId appId) { + switch (visibility) { + default: + case PUBLIC: + return publicRsrc; + case PRIVATE: + return privateRsrc.get(user); + case APPLICATION: + return appRsrc.get(ConverterUtils.toString(appId)); + } + } + /** * Sub-component handling the spawning of {@link ContainerLocalizer}s */ @@ -526,6 +544,7 @@ public class ResourceLocalizationService extends AbstractService } @Override + @SuppressWarnings("unchecked") // dispatcher not typed public void run() { try { // TODO shutdown, better error handling esp. DU @@ -651,6 +670,7 @@ public class ResourceLocalizationService extends AbstractService } // TODO this sucks. Fix it later + @SuppressWarnings("unchecked") // dispatcher not typed LocalizerHeartbeatResponse update( List remoteResourceStatuses) { LocalizerHeartbeatResponse response = @@ -795,6 +815,7 @@ public class ResourceLocalizationService extends AbstractService } @Override + @SuppressWarnings("unchecked") // dispatcher not typed public void run() { dispatcher.getEventHandler().handle( new LocalizationEvent(LocalizationEventType.CACHE_CLEANUP)); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationCleanupEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationCleanupEvent.java new file mode 100644 index 00000000000..6120259e297 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationCleanupEvent.java @@ -0,0 +1,49 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; + +import java.util.Collection; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; + +public class ContainerLocalizationCleanupEvent extends + ContainerLocalizationEvent { + + private final Map> + rsrc; + + /** + * Event requesting the cleanup of the rsrc. + * @param c + * @param rsrc + */ + public ContainerLocalizationCleanupEvent(Container c, + Map> rsrc) { + super(LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, c); + this.rsrc = rsrc; + } + + public + Map> + getResources() { + return rsrc; + } +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java index eeb735421dd..4cb2e5cd191 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; import java.util.Collection; +import java.util.Map; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -26,27 +27,23 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Loca public class ContainerLocalizationRequestEvent extends ContainerLocalizationEvent { - private final LocalResourceVisibility vis; - private final Collection reqs; + private final Map> + rsrc; /** - * Event requesting the localization of the reqs all with visibility vis + * Event requesting the localization of the rsrc. * @param c - * @param reqs - * @param vis + * @param rsrc */ public ContainerLocalizationRequestEvent(Container c, - Collection reqs, LocalResourceVisibility vis) { + Map> rsrc) { super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c); - this.vis = vis; - this.reqs = reqs; + this.rsrc = rsrc; } - public LocalResourceVisibility getVisibility() { - return vis; + public + Map> + getRequestedResources() { + return rsrc; } - - public Collection getRequestedResources() { - return reqs; - } -} +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java index 4ab798282dd..3cdf1aa147f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; -import java.net.URISyntaxException; - import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; @@ -26,8 +24,8 @@ public class ResourceReleaseEvent extends ResourceEvent { private final ContainerId container; - public ResourceReleaseEvent(LocalResourceRequest rsrc, ContainerId container) - throws URISyntaxException { + public ResourceReleaseEvent(LocalResourceRequest rsrc, + ContainerId container) { super(rsrc, ResourceEventType.RELEASE); this.container = container; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index 9ac6f2cfdd9..292dba61a42 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager; import static org.junit.Assert.fail; +import java.util.Collection; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -79,14 +81,17 @@ public class DummyContainerManager extends ContainerManagerImpl { ContainerLocalizationRequestEvent rsrcReqs = (ContainerLocalizationRequestEvent) event; // simulate localization of all requested resources - for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) { - LOG.info("DEBUG: " + req + ":" + - rsrcReqs.getContainer().getContainerID()); - dispatcher.getEventHandler().handle( - new ContainerResourceLocalizedEvent( - rsrcReqs.getContainer().getContainerID(), req, - new Path("file:///local" + req.getPath().toUri().getPath()))); - } + for (Collection rc : rsrcReqs + .getRequestedResources().values()) { + for (LocalResourceRequest req : rc) { + LOG.info("DEBUG: " + req + ":" + + rsrcReqs.getContainer().getContainerID()); + dispatcher.getEventHandler().handle( + new ContainerResourceLocalizedEvent(rsrcReqs.getContainer() + .getContainerID(), req, new Path("file:///local" + + req.getPath().toUri().getPath()))); + } + } break; case CLEANUP_CONTAINER_RESOURCES: Container container = diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 2dd60b683fb..54dc85a98f1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -17,208 +17,203 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import java.net.URISyntaxException; - import java.nio.ByteBuffer; - +import java.util.AbstractMap.SimpleEntry; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Random; import java.util.Map.Entry; -import java.util.AbstractMap.SimpleEntry; +import java.util.Random; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; - +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; -import static org.junit.Assert.*; - import org.mockito.ArgumentMatcher; -import static org.mockito.Mockito.*; public class TestContainer { final NodeManagerMetrics metrics = NodeManagerMetrics.create(); + /** * Verify correct container request events sent to localizer. */ @Test - @SuppressWarnings("unchecked") // mocked generic public void testLocalizationRequest() throws Exception { - DrainDispatcher dispatcher = new DrainDispatcher(); - dispatcher.init(null); + WrappedContainer wc = null; try { - dispatcher.start(); - EventHandler localizerBus = mock(EventHandler.class); - dispatcher.register(LocalizationEventType.class, localizerBus); - // null serviceData; no registered AuxServicesEventType handler - - ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class); - ContainerId cId = getMockContainerId(7, 314159265358979L, 4344); - when(ctxt.getUser()).thenReturn("yak"); - when(ctxt.getContainerId()).thenReturn(cId); - - Random r = new Random(); - long seed = r.nextLong(); - r.setSeed(seed); - System.out.println("testLocalizationRequest seed: " + seed); - final Map localResources = createLocalResources(r); - when(ctxt.getAllLocalResources()).thenReturn(localResources); - - final Container c = newContainer(dispatcher, ctxt); - assertEquals(ContainerState.NEW, c.getContainerState()); + wc = new WrappedContainer(7, 314159265358979L, 4344, "yak"); + assertEquals(ContainerState.NEW, wc.c.getContainerState()); + wc.initContainer(); // Verify request for public/private resources to localizer - c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER)); - dispatcher.await(); - ContainerReqMatcher matchesPublicReq = - new ContainerReqMatcher(localResources, - EnumSet.of(LocalResourceVisibility.PUBLIC)); - ContainerReqMatcher matchesPrivateReq = - new ContainerReqMatcher(localResources, - EnumSet.of(LocalResourceVisibility.PRIVATE)); - ContainerReqMatcher matchesAppReq = - new ContainerReqMatcher(localResources, - EnumSet.of(LocalResourceVisibility.APPLICATION)); - verify(localizerBus).handle(argThat(matchesPublicReq)); - verify(localizerBus).handle(argThat(matchesPrivateReq)); - verify(localizerBus).handle(argThat(matchesAppReq)); - assertEquals(ContainerState.LOCALIZING, c.getContainerState()); - } finally { - dispatcher.stop(); + ResourcesRequestedMatcher matchesReq = + new ResourcesRequestedMatcher(wc.localResources, EnumSet.of( + LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE, + LocalResourceVisibility.APPLICATION)); + verify(wc.localizerBus).handle(argThat(matchesReq)); + assertEquals(ContainerState.LOCALIZING, wc.c.getContainerState()); } + finally { + if (wc != null) { + wc.finished(); + } + } } /** * Verify container launch when all resources already cached. */ @Test - @SuppressWarnings("unchecked") // mocked generic public void testLocalizationLaunch() throws Exception { - DrainDispatcher dispatcher = new DrainDispatcher(); - dispatcher.init(null); + WrappedContainer wc = null; try { - dispatcher.start(); - EventHandler localizerBus = mock(EventHandler.class); - dispatcher.register(LocalizationEventType.class, localizerBus); - EventHandler launcherBus = - mock(EventHandler.class); - dispatcher.register(ContainersLauncherEventType.class, launcherBus); - // null serviceData; no registered AuxServicesEventType handler - - ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class); - ContainerId cId = getMockContainerId(8, 314159265358979L, 4344); - when(ctxt.getUser()).thenReturn("yak"); - when(ctxt.getContainerId()).thenReturn(cId); - - Random r = new Random(); - long seed = r.nextLong(); - r.setSeed(seed); - System.out.println("testLocalizationLaunch seed: " + seed); - final Map localResources = createLocalResources(r); - when(ctxt.getAllLocalResources()).thenReturn(localResources); - final Container c = newContainer(dispatcher, ctxt); - assertEquals(ContainerState.NEW, c.getContainerState()); - - c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER)); - dispatcher.await(); - - // Container prepared for localization events - Path cache = new Path("file:///cache"); - Map localPaths = new HashMap(); - for (Entry rsrc : localResources.entrySet()) { - assertEquals(ContainerState.LOCALIZING, c.getContainerState()); - LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue()); - Path p = new Path(cache, rsrc.getKey()); - localPaths.put(p, rsrc.getKey()); - // rsrc copied to p - c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(), req, p)); - } - dispatcher.await(); + wc = new WrappedContainer(8, 314159265358979L, 4344, "yak"); + assertEquals(ContainerState.NEW, wc.c.getContainerState()); + wc.initContainer(); + Map localPaths = wc.localizeResources(); // all resources should be localized - assertEquals(ContainerState.LOCALIZED, c.getContainerState()); - for (Entry loc : c.getLocalizedResources().entrySet()) { + assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState()); + for (Entry loc : wc.c.getLocalizedResources().entrySet()) { assertEquals(localPaths.remove(loc.getKey()), loc.getValue()); } assertTrue(localPaths.isEmpty()); + final WrappedContainer wcf = wc; // verify container launch ArgumentMatcher matchesContainerLaunch = new ArgumentMatcher() { @Override public boolean matches(Object o) { ContainersLauncherEvent launchEvent = (ContainersLauncherEvent) o; - return c == launchEvent.getContainer(); + return wcf.c == launchEvent.getContainer(); } }; - verify(launcherBus).handle(argThat(matchesContainerLaunch)); + verify(wc.launcherBus).handle(argThat(matchesContainerLaunch)); } finally { - dispatcher.stop(); + if (wc != null) { + wc.finished(); + } } } + @Test + @SuppressWarnings("unchecked") // mocked generic + public void testCleanupOnFailure() throws Exception { + WrappedContainer wc = null; + try { + wc = new WrappedContainer(10, 314159265358979L, 4344, "yak"); + wc.initContainer(); + wc.localizeResources(); + wc.launchContainer(); + reset(wc.localizerBus); + wc.containerFailed(ExitCode.KILLED.getExitCode()); + assertEquals(ContainerState.EXITED_WITH_FAILURE, + wc.c.getContainerState()); + verifyCleanupCall(wc); + } + finally { + if (wc != null) { + wc.finished(); + } + } + } + + @Test + @SuppressWarnings("unchecked") // mocked generic + public void testCleanupOnSuccess() throws Exception { + WrappedContainer wc = null; + try { + wc = new WrappedContainer(11, 314159265358979L, 4344, "yak"); + wc.initContainer(); + wc.localizeResources(); + wc.launchContainer(); + reset(wc.localizerBus); + wc.containerSuccessful(); + assertEquals(ContainerState.EXITED_WITH_SUCCESS, + wc.c.getContainerState()); + + verifyCleanupCall(wc); + } + finally { + if (wc != null) { + wc.finished(); + } + } + } + + @Test + @SuppressWarnings("unchecked") // mocked generic + public void testCleanupOnKillRequest() throws Exception { + WrappedContainer wc = null; + try { + wc = new WrappedContainer(12, 314159265358979L, 4344, "yak"); + wc.initContainer(); + wc.localizeResources(); + wc.launchContainer(); + reset(wc.localizerBus); + wc.killContainer(); + assertEquals(ContainerState.KILLING, wc.c.getContainerState()); + wc.containerKilledOnRequest(); + + verifyCleanupCall(wc); + } finally { + if (wc != null) { + wc.finished(); + } + } + } + /** * Verify serviceData correctly sent. */ @Test - @SuppressWarnings("unchecked") // mocked generic public void testServiceData() throws Exception { - DrainDispatcher dispatcher = new DrainDispatcher(); - dispatcher.init(null); - dispatcher.start(); + WrappedContainer wc = null; try { - EventHandler localizerBus = mock(EventHandler.class); - dispatcher.register(LocalizationEventType.class, localizerBus); - EventHandler auxBus = mock(EventHandler.class); - dispatcher.register(AuxServicesEventType.class, auxBus); - EventHandler launchBus = mock(EventHandler.class); - dispatcher.register(ContainersLauncherEventType.class, launchBus); - - ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class); - final ContainerId cId = getMockContainerId(9, 314159265358979L, 4344); - when(ctxt.getUser()).thenReturn("yak"); - when(ctxt.getContainerId()).thenReturn(cId); - when(ctxt.getAllLocalResources()).thenReturn( - Collections.emptyMap()); - - Random r = new Random(); - long seed = r.nextLong(); - r.setSeed(seed); - System.out.println("testServiceData seed: " + seed); - final Map serviceData = createServiceData(r); - when(ctxt.getAllServiceData()).thenReturn(serviceData); - - final Container c = newContainer(dispatcher, ctxt); - assertEquals(ContainerState.NEW, c.getContainerState()); - - // Verify propagation of service data to AuxServices - c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER)); - dispatcher.await(); - for (final Map.Entry e : serviceData.entrySet()) { + wc = new WrappedContainer(9, 314159265358979L, 4344, "yak", false, true); + assertEquals(ContainerState.NEW, wc.c.getContainerState()); + wc.initContainer(); + + for (final Map.Entry e : wc.serviceData.entrySet()) { ArgumentMatcher matchesServiceReq = new ArgumentMatcher() { @Override @@ -228,9 +223,10 @@ public class TestContainer { && 0 == e.getValue().compareTo(evt.getServiceData()); } }; - verify(auxBus).handle(argThat(matchesServiceReq)); + verify(wc.auxBus).handle(argThat(matchesServiceReq)); } + final WrappedContainer wcf = wc; // verify launch on empty resource request ArgumentMatcher matchesLaunchReq = new ArgumentMatcher() { @@ -238,61 +234,103 @@ public class TestContainer { public boolean matches(Object o) { ContainersLauncherEvent evt = (ContainersLauncherEvent) o; return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER - && cId == evt.getContainer().getContainerID(); + && wcf.cId == evt.getContainer().getContainerID(); } }; - verify(launchBus).handle(argThat(matchesLaunchReq)); + verify(wc.launcherBus).handle(argThat(matchesLaunchReq)); } finally { - dispatcher.stop(); + if (wc != null) { + wc.finished(); + } } } - // Accept iff the resource request payload matches. - static class ContainerReqMatcher extends ArgumentMatcher { + private void verifyCleanupCall(WrappedContainer wc) throws Exception { + ResourcesReleasedMatcher matchesReq = + new ResourcesReleasedMatcher(wc.localResources, EnumSet.of( + LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE, + LocalResourceVisibility.APPLICATION)); + verify(wc.localizerBus).handle(argThat(matchesReq)); + } + + private static class ResourcesReleasedMatcher extends + ArgumentMatcher { final HashSet resources = - new HashSet(); - ContainerReqMatcher(Map allResources, + new HashSet(); + + ResourcesReleasedMatcher(Map allResources, EnumSet vis) throws URISyntaxException { - for (Entry e : allResources.entrySet()) { + for (Entry e : allResources.entrySet()) { if (vis.contains(e.getValue().getVisibility())) { resources.add(new LocalResourceRequest(e.getValue())); } } } + @Override public boolean matches(Object o) { - ContainerLocalizationRequestEvent evt = (ContainerLocalizationRequestEvent) o; + if (!(o instanceof ContainerLocalizationCleanupEvent)) { + return false; + } + ContainerLocalizationCleanupEvent evt = + (ContainerLocalizationCleanupEvent) o; final HashSet expected = - new HashSet(resources); - for (LocalResourceRequest rsrc : evt.getRequestedResources()) { - if (!expected.remove(rsrc)) { - return false; + new HashSet(resources); + for (Collection rc : evt.getResources().values()) { + for (LocalResourceRequest rsrc : rc) { + if (!expected.remove(rsrc)) { + return false; + } } } return expected.isEmpty(); } } - static Entry getMockRsrc(Random r, - LocalResourceVisibility vis) { - LocalResource rsrc = mock(LocalResource.class); + // Accept iff the resource payload matches. + private static class ResourcesRequestedMatcher extends + ArgumentMatcher { + final HashSet resources = + new HashSet(); - String name = Long.toHexString(r.nextLong()); - URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class); - when(uri.getScheme()).thenReturn("file"); - when(uri.getHost()).thenReturn(null); - when(uri.getFile()).thenReturn("/local/" + vis + "/" + name); + ResourcesRequestedMatcher(Map allResources, + EnumSet vis) throws URISyntaxException { + for (Entry e : allResources.entrySet()) { + if (vis.contains(e.getValue().getVisibility())) { + resources.add(new LocalResourceRequest(e.getValue())); + } + } + } - when(rsrc.getResource()).thenReturn(uri); - when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L); - when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L); - when(rsrc.getType()).thenReturn(LocalResourceType.FILE); - when(rsrc.getVisibility()).thenReturn(vis); - - return new SimpleEntry(name, rsrc); + @Override + public boolean matches(Object o) { + ContainerLocalizationRequestEvent evt = + (ContainerLocalizationRequestEvent) o; + final HashSet expected = + new HashSet(resources); + for (Collection rc : evt.getRequestedResources() + .values()) { + for (LocalResourceRequest rsrc : rc) { + if (!expected.remove(rsrc)) { + return false; + } + } + } + return expected.isEmpty(); + } } - static Map createLocalResources(Random r) { + private static Entry getMockRsrc(Random r, + LocalResourceVisibility vis) { + String name = Long.toHexString(r.nextLong()); + URL url = BuilderUtils.newURL("file", null, 0, "/local" + vis + "/" + name); + LocalResource rsrc = + BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis, + r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L); + return new SimpleEntry(name, rsrc); + } + + private static Map createLocalResources(Random r) { Map localResources = new HashMap(); for (int i = r.nextInt(5) + 5; i >= 0; --i) { @@ -313,17 +351,7 @@ public class TestContainer { return localResources; } - static ContainerId getMockContainerId(int appId, long timestamp, int id) { - ApplicationId aId = mock(ApplicationId.class); - when(aId.getId()).thenReturn(appId); - when(aId.getClusterTimestamp()).thenReturn(timestamp); - ContainerId cId = mock(ContainerId.class); - when(cId.getId()).thenReturn(id); - when(cId.getAppId()).thenReturn(aId); - return cId; - } - - static Map createServiceData(Random r) { + private static Map createServiceData(Random r) { Map serviceData = new HashMap(); for (int i = r.nextInt(5) + 5; i >= 0; --i) { @@ -335,7 +363,134 @@ public class TestContainer { return serviceData; } - Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) { + private Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) { return new ContainerImpl(disp, ctx, null, metrics); } + + @SuppressWarnings("unchecked") + private class WrappedContainer { + final DrainDispatcher dispatcher; + final EventHandler localizerBus; + final EventHandler launcherBus; + final EventHandler monitorBus; + final EventHandler auxBus; + + final ContainerLaunchContext ctxt; + final ContainerId cId; + final Container c; + final Map localResources; + final Map serviceData; + final String user; + + WrappedContainer(int appId, long timestamp, int id, String user) { + this(appId, timestamp, id, user, true, false); + } + + WrappedContainer(int appId, long timestamp, int id, String user, + boolean withLocalRes, boolean withServiceData) { + dispatcher = new DrainDispatcher(); + dispatcher.init(null); + + localizerBus = mock(EventHandler.class); + launcherBus = mock(EventHandler.class); + monitorBus = mock(EventHandler.class); + auxBus = mock(EventHandler.class); + dispatcher.register(LocalizationEventType.class, localizerBus); + dispatcher.register(ContainersLauncherEventType.class, launcherBus); + dispatcher.register(ContainersMonitorEventType.class, monitorBus); + dispatcher.register(AuxServicesEventType.class, auxBus); + this.user = user; + + ctxt = mock(ContainerLaunchContext.class); + cId = BuilderUtils.newContainerId(appId, 1, timestamp, id); + when(ctxt.getUser()).thenReturn(this.user); + when(ctxt.getContainerId()).thenReturn(cId); + + Resource resource = BuilderUtils.newResource(1024); + when(ctxt.getResource()).thenReturn(resource); + + if (withLocalRes) { + Random r = new Random(); + long seed = r.nextLong(); + r.setSeed(seed); + System.out.println("WrappedContainerLocalResource seed: " + seed); + localResources = createLocalResources(r); + } else { + localResources = Collections. emptyMap(); + } + when(ctxt.getAllLocalResources()).thenReturn(localResources); + + if (withServiceData) { + Random r = new Random(); + long seed = r.nextLong(); + r.setSeed(seed); + System.out.println("ServiceData seed: " + seed); + serviceData = createServiceData(r); + } else { + serviceData = Collections. emptyMap(); + } + when(ctxt.getAllServiceData()).thenReturn(serviceData); + + c = newContainer(dispatcher, ctxt); + dispatcher.start(); + } + + private void drainDispatcherEvents() { + dispatcher.await(); + } + + public void finished() { + dispatcher.stop(); + } + + public void initContainer() { + c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER)); + drainDispatcherEvents(); + } + + public Map localizeResources() throws URISyntaxException { + Path cache = new Path("file:///cache"); + Map localPaths = new HashMap(); + for (Entry rsrc : localResources.entrySet()) { + assertEquals(ContainerState.LOCALIZING, c.getContainerState()); + LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue()); + Path p = new Path(cache, rsrc.getKey()); + localPaths.put(p, rsrc.getKey()); + // rsrc copied to p + c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(), + req, p)); + } + drainDispatcherEvents(); + return localPaths; + } + + public void launchContainer() { + c.handle(new ContainerEvent(cId, ContainerEventType.CONTAINER_LAUNCHED)); + drainDispatcherEvents(); + } + + public void containerSuccessful() { + c.handle(new ContainerEvent(cId, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)); + drainDispatcherEvents(); + } + + public void containerFailed(int exitCode) { + c.handle(new ContainerExitEvent(cId, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, exitCode)); + drainDispatcherEvents(); + } + + public void killContainer() { + c.handle(new ContainerKillEvent(cId, "KillRequest")); + drainDispatcherEvents(); + } + + public void containerKilledOnRequest() { + c.handle(new ContainerExitEvent(cId, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.KILLED + .getExitCode())); + drainDispatcherEvents(); + } + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 2d542407e8f..b0174ad06ec 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -21,10 +21,17 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.Set; + +import junit.framework.Assert; import org.apache.avro.ipc.Server; import org.apache.hadoop.conf.Configuration; @@ -63,11 +70,15 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.util.Records; import org.junit.Test; import static org.junit.Assert.*; @@ -132,6 +143,190 @@ public class TestResourceLocalizationService { } } + @Test + @SuppressWarnings("unchecked") // mocked generics + public void testResourceRelease() throws Exception { + Configuration conf = new Configuration(); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + final FileContext lfs = FileContext.getFileContext(spylfs, conf); + doNothing().when(spylfs).mkdir( + isA(Path.class), isA(FsPermission.class), anyBoolean()); + + List localDirs = new ArrayList(); + String[] sDirs = new String[4]; + for (int i = 0; i < 4; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + + Server ignore = mock(Server.class); + LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class); + DrainDispatcher dispatcher = new DrainDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + EventHandler applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher.register(ContainerEventType.class, containerBus); + //Ignore actual localization + EventHandler localizerBus = mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = new DeletionService(exec); + delService.init(null); + delService.start(); + + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher, exec, delService); + ResourceLocalizationService spyService = spy(rawService); + doReturn(ignore).when(spyService).createServer(); + doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker( + isA(Configuration.class)); + doReturn(lfs).when(spyService) + .getLocalFileContext(isA(Configuration.class)); + try { + spyService.init(conf); + spyService.start(); + + final String user = "user0"; + // init application + final Application app = mock(Application.class); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + dispatcher.await(); + + //Get a handle on the trackers after they're setup with INIT_APP_RESOURCES + LocalResourcesTracker appTracker = + spyService.getLocalResourcesTracker( + LocalResourceVisibility.APPLICATION, user, appId); + LocalResourcesTracker privTracker = + spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE, + user, appId); + LocalResourcesTracker pubTracker = + spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, + user, appId); + + // init container. + final Container c = getMockContainer(appId, 42); + + // init resources + Random r = new Random(); + long seed = r.nextLong(); + System.out.println("SEED: " + seed); + r.setSeed(seed); + + // Send localization requests for one resource of each type. + final LocalResource privResource = getPrivateMockedResource(r); + final LocalResourceRequest privReq = + new LocalResourceRequest(privResource); + + final LocalResource pubResource = getPublicMockedResource(r); + final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource); + final LocalResource pubResource2 = getPublicMockedResource(r); + final LocalResourceRequest pubReq2 = + new LocalResourceRequest(pubResource2); + + final LocalResource appResource = getAppMockedResource(r); + final LocalResourceRequest appReq = new LocalResourceRequest(appResource); + + Map> req = + new HashMap>(); + req.put(LocalResourceVisibility.PRIVATE, + Collections.singletonList(privReq)); + req.put(LocalResourceVisibility.PUBLIC, + Collections.singletonList(pubReq)); + req.put(LocalResourceVisibility.APPLICATION, + Collections.singletonList(appReq)); + + Map> req2 = + new HashMap>(); + req2.put(LocalResourceVisibility.PRIVATE, + Collections.singletonList(privReq)); + req2.put(LocalResourceVisibility.PUBLIC, + Collections.singletonList(pubReq2)); + + Set pubRsrcs = new HashSet(); + pubRsrcs.add(pubReq); + pubRsrcs.add(pubReq2); + + // Send Request event + spyService.handle(new ContainerLocalizationRequestEvent(c, req)); + spyService.handle(new ContainerLocalizationRequestEvent(c, req2)); + dispatcher.await(); + + int privRsrcCount = 0; + for (LocalizedResource lr : privTracker) { + privRsrcCount++; + Assert.assertEquals("Incorrect reference count", 2, lr.getRefCount()); + Assert.assertEquals(privReq, lr.getRequest()); + } + Assert.assertEquals(1, privRsrcCount); + + int pubRsrcCount = 0; + for (LocalizedResource lr : pubTracker) { + pubRsrcCount++; + Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount()); + pubRsrcs.remove(lr.getRequest()); + } + Assert.assertEquals(0, pubRsrcs.size()); + Assert.assertEquals(2, pubRsrcCount); + + int appRsrcCount = 0; + for (LocalizedResource lr : appTracker) { + appRsrcCount++; + Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount()); + Assert.assertEquals(appReq, lr.getRequest()); + } + Assert.assertEquals(1, appRsrcCount); + + //Send Cleanup Event + spyService.handle(new ContainerLocalizationCleanupEvent(c, req)); + req2.remove(LocalResourceVisibility.PRIVATE); + spyService.handle(new ContainerLocalizationCleanupEvent(c, req2)); + dispatcher.await(); + + pubRsrcs.add(pubReq); + pubRsrcs.add(pubReq2); + + privRsrcCount = 0; + for (LocalizedResource lr : privTracker) { + privRsrcCount++; + Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount()); + Assert.assertEquals(privReq, lr.getRequest()); + } + Assert.assertEquals(1, privRsrcCount); + + pubRsrcCount = 0; + for (LocalizedResource lr : pubTracker) { + pubRsrcCount++; + Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount()); + pubRsrcs.remove(lr.getRequest()); + } + Assert.assertEquals(0, pubRsrcs.size()); + Assert.assertEquals(2, pubRsrcCount); + + appRsrcCount = 0; + for (LocalizedResource lr : appTracker) { + appRsrcCount++; + Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount()); + Assert.assertEquals(appReq, lr.getRequest()); + } + Assert.assertEquals(1, appRsrcCount); + } finally { + dispatcher.stop(); + delService.stop(); + } + } + @Test @SuppressWarnings("unchecked") // mocked generics public void testLocalizationHeartbeat() throws Exception { @@ -175,9 +370,8 @@ public class TestResourceLocalizationService { // init application final Application app = mock(Application.class); - final ApplicationId appId = mock(ApplicationId.class); - when(appId.getClusterTimestamp()).thenReturn(314159265358979L); - when(appId.getId()).thenReturn(3); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); when(app.getUser()).thenReturn("user0"); when(app.getAppId()).thenReturn(appId); spyService.handle(new ApplicationLocalizationEvent( @@ -205,11 +399,13 @@ public class TestResourceLocalizationService { doReturn(out).when(spylfs).createInternal(isA(Path.class), isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(), anyLong(), isA(Progressable.class), anyInt(), anyBoolean()); - final LocalResource resource = getMockResource(r); + final LocalResource resource = getPrivateMockedResource(r); final LocalResourceRequest req = new LocalResourceRequest(resource); - spyService.handle(new ContainerLocalizationRequestEvent( - c, Collections.singletonList(req), - LocalResourceVisibility.PRIVATE)); + Map> rsrcs = + new HashMap>(); + rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req)); + spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs)); // Sigh. Thread init of private localizer not accessible Thread.sleep(500); dispatcher.await(); @@ -265,42 +461,44 @@ public class TestResourceLocalizationService { } } - static URL getPath(String path) { - URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class); - when(uri.getScheme()).thenReturn("file"); - when(uri.getHost()).thenReturn(null); - when(uri.getFile()).thenReturn(path); - return uri; + private static URL getPath(String path) { + URL url = BuilderUtils.newURL("file", null, 0, path); + return url; } - static LocalResource getMockResource(Random r) { - LocalResource rsrc = mock(LocalResource.class); - + private static LocalResource getMockedResource(Random r, + LocalResourceVisibility vis) { String name = Long.toHexString(r.nextLong()); - URL uri = getPath("/local/PRIVATE/" + name); - - when(rsrc.getResource()).thenReturn(uri); - when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L); - when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L); - when(rsrc.getType()).thenReturn(LocalResourceType.FILE); - when(rsrc.getVisibility()).thenReturn(LocalResourceVisibility.PRIVATE); + URL url = getPath("/local/PRIVATE/" + name); + LocalResource rsrc = + BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis, + r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L); return rsrc; } + + private static LocalResource getAppMockedResource(Random r) { + return getMockedResource(r, LocalResourceVisibility.APPLICATION); + } + + private static LocalResource getPublicMockedResource(Random r) { + return getMockedResource(r, LocalResourceVisibility.PUBLIC); + } + + private static LocalResource getPrivateMockedResource(Random r) { + return getMockedResource(r, LocalResourceVisibility.PRIVATE); + } - static Container getMockContainer(ApplicationId appId, int id) { + private static Container getMockContainer(ApplicationId appId, int id) { Container c = mock(Container.class); - ApplicationAttemptId appAttemptId = Records.newRecord(ApplicationAttemptId.class); - appAttemptId.setApplicationId(appId); - appAttemptId.setAttemptId(1); - ContainerId cId = Records.newRecord(ContainerId.class); - cId.setAppAttemptId(appAttemptId); - cId.setAppId(appId); - cId.setId(id); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(appId, 1); + ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id); when(c.getUser()).thenReturn("user0"); when(c.getContainerID()).thenReturn(cId); Credentials creds = new Credentials(); creds.addToken(new Text("tok" + id), getToken(id)); when(c.getCredentials()).thenReturn(creds); + when(c.toString()).thenReturn(cId.toString()); return c; }