diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index bfcf8e55f1a..2a776b094ba 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -96,6 +96,10 @@ Release 2.0.5-beta - UNRELEASED YARN-542. Changed the default global AM max-attempts value to be not one. (Zhijie Shen via vinodkv) + YARN-583. Moved application level local resources to be localized under the + filecache sub-directory under application directory. (Omkar Vinit Joshi via + vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index c852de72527..0d1d9228719 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; @@ -46,7 +47,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,6 +65,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.DiskChecker; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -481,18 +482,15 @@ public class ResourceLocalizationService extends CompositeService } private String getUserFileCachePath(String user) { - String path = - "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR - + user + Path.SEPARATOR + ContainerLocalizer.FILECACHE; - return path; + return StringUtils.join(Path.SEPARATOR, Arrays.asList(".", + ContainerLocalizer.USERCACHE, user, ContainerLocalizer.FILECACHE)); + } - private String getUserAppCachePath(String user, String appId) { - String path = - "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR - + user + Path.SEPARATOR + ContainerLocalizer.APPCACHE - + Path.SEPARATOR + appId; - return path; + private String getAppFileCachePath(String user, String appId) { + return StringUtils.join(Path.SEPARATOR, Arrays.asList(".", + ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId, + ContainerLocalizer.FILECACHE)); } @VisibleForTesting @@ -942,7 +940,7 @@ public class ResourceLocalizationService extends CompositeService if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only cacheDirectory = getUserFileCachePath(user); } else {// APPLICATION ONLY - cacheDirectory = getUserAppCachePath(user, appId.toString()); + cacheDirectory = getAppFileCachePath(user, appId.toString()); } Path dirPath = dirsHandler.getLocalPathForWrite(cacheDirectory, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 80ff72686a7..c4273bfffa2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -72,6 +73,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -87,6 +89,7 @@ import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; @@ -848,6 +851,163 @@ public class TestResourceLocalizationService { } } + @Test(timeout = 10000) + @SuppressWarnings("unchecked") + public void testLocalResourcePath() throws Exception { + + // test the local path where application and user cache files will be + // localized. + + DrainDispatcher dispatcher1 = null; + try { + dispatcher1 = new DrainDispatcher(); + String user = "testuser"; + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + + // mocked Resource Localization Service + Configuration conf = new Configuration(); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + final FileContext lfs = FileContext.getFileContext(spylfs, conf); + // We don't want files to be created + doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class), + anyBoolean()); + + // creating one local directory + List localDirs = new ArrayList(); + String[] sDirs = new String[1]; + for (int i = 0; i < 1; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + // setting log directory. + String logDir = + lfs.makeQualified(new Path(basedir, "logdir ")).toString(); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); + + LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService(); + localDirHandler.init(conf); + // Registering event handlers + EventHandler applicationBus = mock(EventHandler.class); + dispatcher1.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher1.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = mock(DeletionService.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + // initializing directory handler. + dirsHandler.init(conf); + + dispatcher1.init(conf); + dispatcher1.start(); + + ResourceLocalizationService rls = + new ResourceLocalizationService(dispatcher1, exec, delService, + localDirHandler); + dispatcher1.register(LocalizationEventType.class, rls); + rls.init(conf); + + rls.handle(createApplicationLocalizationEvent(user, appId)); + + // We need to pre-populate the LocalizerRunner as the + // Resource Localization Service code internally starts them which + // definitely we don't want. + + // creating new container and populating corresponding localizer runner + + // Container - 1 + Container container1 = createMockContainer(user, 1); + String localizerId1 = container1.getContainerID().toString(); + rls.getPrivateLocalizers().put( + localizerId1, + rls.new LocalizerRunner(new LocalizerContext(user, container1 + .getContainerID(), null), localizerId1)); + + // Creating two requests for container + // 1) Private resource + // 2) Application resource + LocalResourceRequest reqPriv = + new LocalResourceRequest(new Path("file:///tmp1"), 123L, + LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, ""); + List privList = + new ArrayList(); + privList.add(reqPriv); + + LocalResourceRequest reqApp = + new LocalResourceRequest(new Path("file:///tmp2"), 123L, + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, ""); + List appList = + new ArrayList(); + appList.add(reqApp); + + Map> rsrcs = + new HashMap>(); + rsrcs.put(LocalResourceVisibility.APPLICATION, appList); + rsrcs.put(LocalResourceVisibility.PRIVATE, privList); + + dispatcher1.getEventHandler().handle( + new ContainerLocalizationRequestEvent(container1, rsrcs)); + + // Now waiting for resource download to start. Here actual will not start + // Only the resources will be populated into pending list. + Assert + .assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 2, 500)); + + // Validating user and application cache paths + + String userCachePath = + StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0) + .toUri().getRawPath(), ContainerLocalizer.USERCACHE, user, + ContainerLocalizer.FILECACHE)); + String userAppCachePath = + StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0) + .toUri().getRawPath(), ContainerLocalizer.USERCACHE, user, + ContainerLocalizer.APPCACHE, appId.toString(), + ContainerLocalizer.FILECACHE)); + + // Now the Application and private resources may come in any order + // for download. + // For User cahce : + // returned destinationPath = user cache path + random number + // For App cache : + // returned destinationPath = user app cache path + random number + + int returnedResources = 0; + boolean appRsrc = false, privRsrc = false; + while (returnedResources < 2) { + LocalizerHeartbeatResponse response = + rls.heartbeat(createLocalizerStatus(localizerId1)); + for (ResourceLocalizationSpec resourceSpec : response + .getResourceSpecs()) { + returnedResources++; + Path destinationDirectory = + new Path(resourceSpec.getDestinationDirectory().getFile()); + if (resourceSpec.getResource().getVisibility() == + LocalResourceVisibility.APPLICATION) { + appRsrc = true; + Assert.assertEquals(userAppCachePath, destinationDirectory + .getParent().toUri().toString()); + } else if (resourceSpec.getResource().getVisibility() == + LocalResourceVisibility.PRIVATE) { + privRsrc = true; + Assert.assertEquals(userCachePath, destinationDirectory.getParent() + .toUri().toString()); + } else { + throw new Exception("Unexpected resource recevied."); + } + } + } + // We should receive both the resources (Application and Private) + Assert.assertTrue(appRsrc && privRsrc); + } finally { + if (dispatcher1 != null) { + dispatcher1.stop(); + } + } + } + private LocalizerStatus createLocalizerStatusForFailedResource( String localizerId, LocalResourceRequest req) { LocalizerStatus status = createLocalizerStatus(localizerId);