YARN-583. Moved application level local resources to be localized under the filecache sub-directory under application directory. Contributed by Omkar Vinit Joshi.

svn merge --ignore-ancestry -c 1470812 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1470813 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-23 04:38:45 +00:00
parent 778b64e29f
commit 23f099da0b
3 changed files with 174 additions and 12 deletions

View File

@ -96,6 +96,10 @@ Release 2.0.5-beta - UNRELEASED
YARN-542. Changed the default global AM max-attempts value to be not one. YARN-542. Changed the default global AM max-attempts value to be not one.
(Zhijie Shen via vinodkv) (Zhijie Shen via vinodkv)
YARN-583. Moved application level local resources to be localized under the
filecache sub-directory under application directory. (Omkar Vinit Joshi via
vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
@ -46,7 +47,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -65,6 +65,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -481,18 +482,15 @@ public class ResourceLocalizationService extends CompositeService
} }
private String getUserFileCachePath(String user) { private String getUserFileCachePath(String user) {
String path = return StringUtils.join(Path.SEPARATOR, Arrays.asList(".",
"." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR ContainerLocalizer.USERCACHE, user, ContainerLocalizer.FILECACHE));
+ user + Path.SEPARATOR + ContainerLocalizer.FILECACHE;
return path;
} }
private String getUserAppCachePath(String user, String appId) { private String getAppFileCachePath(String user, String appId) {
String path = return StringUtils.join(Path.SEPARATOR, Arrays.asList(".",
"." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId,
+ user + Path.SEPARATOR + ContainerLocalizer.APPCACHE ContainerLocalizer.FILECACHE));
+ Path.SEPARATOR + appId;
return path;
} }
@VisibleForTesting @VisibleForTesting
@ -942,7 +940,7 @@ public class ResourceLocalizationService extends CompositeService
if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
cacheDirectory = getUserFileCachePath(user); cacheDirectory = getUserFileCachePath(user);
} else {// APPLICATION ONLY } else {// APPLICATION ONLY
cacheDirectory = getUserAppCachePath(user, appId.toString()); cacheDirectory = getAppFileCachePath(user, appId.toString());
} }
Path dirPath = Path dirPath =
dirsHandler.getLocalPathForWrite(cacheDirectory, dirsHandler.getLocalPathForWrite(cacheDirectory,

View File

@ -42,6 +42,7 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
@ -72,6 +73,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -87,6 +89,7 @@ import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@ -848,6 +851,163 @@ public class TestResourceLocalizationService {
} }
} }
@Test(timeout = 10000)
@SuppressWarnings("unchecked")
public void testLocalResourcePath() throws Exception {
// test the local path where application and user cache files will be
// localized.
DrainDispatcher dispatcher1 = null;
try {
dispatcher1 = new DrainDispatcher();
String user = "testuser";
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
// mocked Resource Localization Service
Configuration conf = new Configuration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
// We don't want files to be created
doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
anyBoolean());
// creating one local directory
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[1];
for (int i = 0; i < 1; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
// setting log directory.
String logDir =
lfs.makeQualified(new Path(basedir, "logdir ")).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
localDirHandler.init(conf);
// Registering event handlers
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher1.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher1.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = mock(DeletionService.class);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
// initializing directory handler.
dirsHandler.init(conf);
dispatcher1.init(conf);
dispatcher1.start();
ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService,
localDirHandler);
dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf);
rls.handle(createApplicationLocalizationEvent(user, appId));
// We need to pre-populate the LocalizerRunner as the
// Resource Localization Service code internally starts them which
// definitely we don't want.
// creating new container and populating corresponding localizer runner
// Container - 1
Container container1 = createMockContainer(user, 1);
String localizerId1 = container1.getContainerID().toString();
rls.getPrivateLocalizers().put(
localizerId1,
rls.new LocalizerRunner(new LocalizerContext(user, container1
.getContainerID(), null), localizerId1));
// Creating two requests for container
// 1) Private resource
// 2) Application resource
LocalResourceRequest reqPriv =
new LocalResourceRequest(new Path("file:///tmp1"), 123L,
LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "");
List<LocalResourceRequest> privList =
new ArrayList<LocalResourceRequest>();
privList.add(reqPriv);
LocalResourceRequest reqApp =
new LocalResourceRequest(new Path("file:///tmp2"), 123L,
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, "");
List<LocalResourceRequest> appList =
new ArrayList<LocalResourceRequest>();
appList.add(reqApp);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
rsrcs.put(LocalResourceVisibility.APPLICATION, appList);
rsrcs.put(LocalResourceVisibility.PRIVATE, privList);
dispatcher1.getEventHandler().handle(
new ContainerLocalizationRequestEvent(container1, rsrcs));
// Now waiting for resource download to start. Here actual will not start
// Only the resources will be populated into pending list.
Assert
.assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 2, 500));
// Validating user and application cache paths
String userCachePath =
StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0)
.toUri().getRawPath(), ContainerLocalizer.USERCACHE, user,
ContainerLocalizer.FILECACHE));
String userAppCachePath =
StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0)
.toUri().getRawPath(), ContainerLocalizer.USERCACHE, user,
ContainerLocalizer.APPCACHE, appId.toString(),
ContainerLocalizer.FILECACHE));
// Now the Application and private resources may come in any order
// for download.
// For User cahce :
// returned destinationPath = user cache path + random number
// For App cache :
// returned destinationPath = user app cache path + random number
int returnedResources = 0;
boolean appRsrc = false, privRsrc = false;
while (returnedResources < 2) {
LocalizerHeartbeatResponse response =
rls.heartbeat(createLocalizerStatus(localizerId1));
for (ResourceLocalizationSpec resourceSpec : response
.getResourceSpecs()) {
returnedResources++;
Path destinationDirectory =
new Path(resourceSpec.getDestinationDirectory().getFile());
if (resourceSpec.getResource().getVisibility() ==
LocalResourceVisibility.APPLICATION) {
appRsrc = true;
Assert.assertEquals(userAppCachePath, destinationDirectory
.getParent().toUri().toString());
} else if (resourceSpec.getResource().getVisibility() ==
LocalResourceVisibility.PRIVATE) {
privRsrc = true;
Assert.assertEquals(userCachePath, destinationDirectory.getParent()
.toUri().toString());
} else {
throw new Exception("Unexpected resource recevied.");
}
}
}
// We should receive both the resources (Application and Private)
Assert.assertTrue(appRsrc && privRsrc);
} finally {
if (dispatcher1 != null) {
dispatcher1.stop();
}
}
}
private LocalizerStatus createLocalizerStatusForFailedResource( private LocalizerStatus createLocalizerStatusForFailedResource(
String localizerId, LocalResourceRequest req) { String localizerId, LocalResourceRequest req) {
LocalizerStatus status = createLocalizerStatus(localizerId); LocalizerStatus status = createLocalizerStatus(localizerId);