YARN-583. Moved application level local resources to be localized under the filecache sub-directory under application directory. Contributed by Omkar Vinit Joshi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1470812 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a1ee2145cf
commit
ffd7dbb34a
|
@ -167,6 +167,10 @@ Release 2.0.5-beta - UNRELEASED
|
|||
YARN-542. Changed the default global AM max-attempts value to be not one.
|
||||
(Zhijie Shen via vinodkv)
|
||||
|
||||
YARN-583. Moved application level local resources to be localized under the
|
||||
filecache sub-directory under application directory. (Omkar Vinit Joshi via
|
||||
vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.io.IOException;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
|
@ -46,7 +47,6 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -65,6 +65,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -481,18 +482,15 @@ public class ResourceLocalizationService extends CompositeService
|
|||
}
|
||||
|
||||
private String getUserFileCachePath(String user) {
|
||||
String path =
|
||||
"." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
|
||||
+ user + Path.SEPARATOR + ContainerLocalizer.FILECACHE;
|
||||
return path;
|
||||
return StringUtils.join(Path.SEPARATOR, Arrays.asList(".",
|
||||
ContainerLocalizer.USERCACHE, user, ContainerLocalizer.FILECACHE));
|
||||
|
||||
}
|
||||
|
||||
private String getUserAppCachePath(String user, String appId) {
|
||||
String path =
|
||||
"." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
|
||||
+ user + Path.SEPARATOR + ContainerLocalizer.APPCACHE
|
||||
+ Path.SEPARATOR + appId;
|
||||
return path;
|
||||
private String getAppFileCachePath(String user, String appId) {
|
||||
return StringUtils.join(Path.SEPARATOR, Arrays.asList(".",
|
||||
ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId,
|
||||
ContainerLocalizer.FILECACHE));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -942,7 +940,7 @@ public class ResourceLocalizationService extends CompositeService
|
|||
if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
|
||||
cacheDirectory = getUserFileCachePath(user);
|
||||
} else {// APPLICATION ONLY
|
||||
cacheDirectory = getUserAppCachePath(user, appId.toString());
|
||||
cacheDirectory = getAppFileCachePath(user, appId.toString());
|
||||
}
|
||||
Path dirPath =
|
||||
dirsHandler.getLocalPathForWrite(cacheDirectory,
|
||||
|
|
|
@ -42,6 +42,7 @@ import java.io.IOException;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
|
@ -72,6 +73,7 @@ import org.apache.hadoop.security.Credentials;
|
|||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -87,6 +89,7 @@ import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
|
||||
|
@ -848,6 +851,163 @@ public class TestResourceLocalizationService {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testLocalResourcePath() throws Exception {
|
||||
|
||||
// test the local path where application and user cache files will be
|
||||
// localized.
|
||||
|
||||
DrainDispatcher dispatcher1 = null;
|
||||
try {
|
||||
dispatcher1 = new DrainDispatcher();
|
||||
String user = "testuser";
|
||||
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
|
||||
|
||||
// mocked Resource Localization Service
|
||||
Configuration conf = new Configuration();
|
||||
AbstractFileSystem spylfs =
|
||||
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
|
||||
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
|
||||
// We don't want files to be created
|
||||
doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
|
||||
anyBoolean());
|
||||
|
||||
// creating one local directory
|
||||
List<Path> localDirs = new ArrayList<Path>();
|
||||
String[] sDirs = new String[1];
|
||||
for (int i = 0; i < 1; ++i) {
|
||||
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
|
||||
sDirs[i] = localDirs.get(i).toString();
|
||||
}
|
||||
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
|
||||
// setting log directory.
|
||||
String logDir =
|
||||
lfs.makeQualified(new Path(basedir, "logdir ")).toString();
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
|
||||
|
||||
LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
|
||||
localDirHandler.init(conf);
|
||||
// Registering event handlers
|
||||
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
|
||||
dispatcher1.register(ApplicationEventType.class, applicationBus);
|
||||
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
|
||||
dispatcher1.register(ContainerEventType.class, containerBus);
|
||||
|
||||
ContainerExecutor exec = mock(ContainerExecutor.class);
|
||||
DeletionService delService = mock(DeletionService.class);
|
||||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
// initializing directory handler.
|
||||
dirsHandler.init(conf);
|
||||
|
||||
dispatcher1.init(conf);
|
||||
dispatcher1.start();
|
||||
|
||||
ResourceLocalizationService rls =
|
||||
new ResourceLocalizationService(dispatcher1, exec, delService,
|
||||
localDirHandler);
|
||||
dispatcher1.register(LocalizationEventType.class, rls);
|
||||
rls.init(conf);
|
||||
|
||||
rls.handle(createApplicationLocalizationEvent(user, appId));
|
||||
|
||||
// We need to pre-populate the LocalizerRunner as the
|
||||
// Resource Localization Service code internally starts them which
|
||||
// definitely we don't want.
|
||||
|
||||
// creating new container and populating corresponding localizer runner
|
||||
|
||||
// Container - 1
|
||||
Container container1 = createMockContainer(user, 1);
|
||||
String localizerId1 = container1.getContainerID().toString();
|
||||
rls.getPrivateLocalizers().put(
|
||||
localizerId1,
|
||||
rls.new LocalizerRunner(new LocalizerContext(user, container1
|
||||
.getContainerID(), null), localizerId1));
|
||||
|
||||
// Creating two requests for container
|
||||
// 1) Private resource
|
||||
// 2) Application resource
|
||||
LocalResourceRequest reqPriv =
|
||||
new LocalResourceRequest(new Path("file:///tmp1"), 123L,
|
||||
LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "");
|
||||
List<LocalResourceRequest> privList =
|
||||
new ArrayList<LocalResourceRequest>();
|
||||
privList.add(reqPriv);
|
||||
|
||||
LocalResourceRequest reqApp =
|
||||
new LocalResourceRequest(new Path("file:///tmp2"), 123L,
|
||||
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, "");
|
||||
List<LocalResourceRequest> appList =
|
||||
new ArrayList<LocalResourceRequest>();
|
||||
appList.add(reqApp);
|
||||
|
||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
|
||||
new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
|
||||
rsrcs.put(LocalResourceVisibility.APPLICATION, appList);
|
||||
rsrcs.put(LocalResourceVisibility.PRIVATE, privList);
|
||||
|
||||
dispatcher1.getEventHandler().handle(
|
||||
new ContainerLocalizationRequestEvent(container1, rsrcs));
|
||||
|
||||
// Now waiting for resource download to start. Here actual will not start
|
||||
// Only the resources will be populated into pending list.
|
||||
Assert
|
||||
.assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 2, 500));
|
||||
|
||||
// Validating user and application cache paths
|
||||
|
||||
String userCachePath =
|
||||
StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0)
|
||||
.toUri().getRawPath(), ContainerLocalizer.USERCACHE, user,
|
||||
ContainerLocalizer.FILECACHE));
|
||||
String userAppCachePath =
|
||||
StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0)
|
||||
.toUri().getRawPath(), ContainerLocalizer.USERCACHE, user,
|
||||
ContainerLocalizer.APPCACHE, appId.toString(),
|
||||
ContainerLocalizer.FILECACHE));
|
||||
|
||||
// Now the Application and private resources may come in any order
|
||||
// for download.
|
||||
// For User cahce :
|
||||
// returned destinationPath = user cache path + random number
|
||||
// For App cache :
|
||||
// returned destinationPath = user app cache path + random number
|
||||
|
||||
int returnedResources = 0;
|
||||
boolean appRsrc = false, privRsrc = false;
|
||||
while (returnedResources < 2) {
|
||||
LocalizerHeartbeatResponse response =
|
||||
rls.heartbeat(createLocalizerStatus(localizerId1));
|
||||
for (ResourceLocalizationSpec resourceSpec : response
|
||||
.getResourceSpecs()) {
|
||||
returnedResources++;
|
||||
Path destinationDirectory =
|
||||
new Path(resourceSpec.getDestinationDirectory().getFile());
|
||||
if (resourceSpec.getResource().getVisibility() ==
|
||||
LocalResourceVisibility.APPLICATION) {
|
||||
appRsrc = true;
|
||||
Assert.assertEquals(userAppCachePath, destinationDirectory
|
||||
.getParent().toUri().toString());
|
||||
} else if (resourceSpec.getResource().getVisibility() ==
|
||||
LocalResourceVisibility.PRIVATE) {
|
||||
privRsrc = true;
|
||||
Assert.assertEquals(userCachePath, destinationDirectory.getParent()
|
||||
.toUri().toString());
|
||||
} else {
|
||||
throw new Exception("Unexpected resource recevied.");
|
||||
}
|
||||
}
|
||||
}
|
||||
// We should receive both the resources (Application and Private)
|
||||
Assert.assertTrue(appRsrc && privRsrc);
|
||||
} finally {
|
||||
if (dispatcher1 != null) {
|
||||
dispatcher1.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private LocalizerStatus createLocalizerStatusForFailedResource(
|
||||
String localizerId, LocalResourceRequest req) {
|
||||
LocalizerStatus status = createLocalizerStatus(localizerId);
|
||||
|
|
Loading…
Reference in New Issue