YARN-8672. Improve token filename management for localization.

Contributed by Chandni Singh
This commit is contained in:
Eric Yang 2018-11-14 15:22:01 -05:00
parent b57cc73f83
commit 21ec4bdaef
14 changed files with 61 additions and 47 deletions

View File

@ -81,6 +81,7 @@ public abstract class ContainerExecutor implements Configurable {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerExecutor.class);
protected static final String WILDCARD = "*";
public static final String TOKEN_FILE_NAME_FMT = "%s.tokens";
/**
* The permissions to use when creating the launch script.

View File

@ -164,8 +164,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
// randomly choose the local directory
Path appStorageDir = getWorkingDir(localDirs, user, appId);
String tokenFn =
String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
String tokenFn = String.format(TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
copyFile(nmPrivateContainerTokensPath, tokenDst, user);
LOG.info("Copying from " + nmPrivateContainerTokensPath
@ -180,7 +179,8 @@ public class DefaultContainerExecutor extends ContainerExecutor {
+ localizerFc.getWorkingDirectory());
ContainerLocalizer localizer =
createContainerLocalizer(user, appId, locId, localDirs, localizerFc);
createContainerLocalizer(user, appId, locId, tokenFn, localDirs,
localizerFc);
// TODO: DO it over RPC for maintaining similarity?
localizer.runLocalization(nmAddr);
}
@ -204,10 +204,10 @@ public class DefaultContainerExecutor extends ContainerExecutor {
@Private
@VisibleForTesting
protected ContainerLocalizer createContainerLocalizer(String user,
String appId, String locId, List<String> localDirs,
String appId, String locId, String tokenFileName, List<String> localDirs,
FileContext localizerFc) throws IOException {
ContainerLocalizer localizer =
new ContainerLocalizer(localizerFc, user, appId, locId,
new ContainerLocalizer(localizerFc, user, appId, locId, tokenFileName,
getPaths(localDirs),
RecordFactoryProvider.getRecordFactory(getConf()));
return localizer;

View File

@ -390,8 +390,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
List<String> localizerArgs = new ArrayList<>();
buildMainArgs(localizerArgs, user, appId, locId, nmAddr, localDirs);
buildMainArgs(localizerArgs, user, appId, locId, nmAddr,
nmPrivateContainerTokensPath.getName(), localDirs);
Path containerLogDir = getContainerLogDir(dirsHandler, appId, locId);
localizerArgs = replaceWithContainerLogDir(localizerArgs, containerLogDir);
@ -449,9 +449,10 @@ public class LinuxContainerExecutor extends ContainerExecutor {
*/
@VisibleForTesting
public void buildMainArgs(List<String> command, String user, String appId,
String locId, InetSocketAddress nmAddr, List<String> localDirs) {
String locId, InetSocketAddress nmAddr, String tokenFileName,
List<String> localDirs) {
ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr,
localDirs, super.getConf());
tokenFileName, localDirs, super.getConf());
}
@Override

View File

@ -663,8 +663,8 @@ public class WindowsSecureContainerExecutor extends DefaultContainerExecutor {
Path appStorageDir = getWorkingDir(localDirs, user, appId);
String tokenFn = String.format(
ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
String tokenFn = String.format(ContainerExecutor.TOKEN_FILE_NAME_FMT,
locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
copyFile(nmPrivateContainerTokensPath, tokenDst, user);
@ -702,7 +702,7 @@ public class WindowsSecureContainerExecutor extends DefaultContainerExecutor {
command.addAll(ContainerLocalizer.getJavaOpts(getConf()));
ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr,
localDirs, super.getConf());
tokenFn, localDirs, super.getConf());
String cmdLine = StringUtils.join(command, " ");

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.TOKEN_FILE_NAME_FMT;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
@ -234,8 +235,7 @@ public class ContainerLaunch implements Callable<Integer> {
+ CONTAINER_SCRIPT);
Path nmPrivateTokensPath = dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
containerIdStr));
+ String.format(TOKEN_FILE_NAME_FMT, containerIdStr));
Path nmPrivateKeystorePath = dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ KEYSTORE_FILE);

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.security.AMSecretKeys;
import org.slf4j.Logger;
@ -178,7 +177,7 @@ public class ContainerRelaunch extends ContainerLaunch {
String containerIdStr) throws IOException {
return dirsHandler.getLocalPathForRead(
getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
+ String.format(ContainerExecutor.TOKEN_FILE_NAME_FMT,
containerIdStr));
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.apache.hadoop.util.Shell.getAllShells;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -93,7 +95,6 @@ public class ContainerLocalizer {
public static final String FILECACHE = "filecache";
public static final String APPCACHE = "appcache";
public static final String USERCACHE = "usercache";
public static final String TOKEN_FILE_NAME_FMT = "%s.tokens";
private static final String APPCACHE_CTXT_FMT = "%s.app.cache.dirs";
private static final String USERCACHE_CTXT_FMT = "%s.user.cache.dirs";
private static final FsPermission FILECACHE_PERMS =
@ -114,9 +115,10 @@ public class ContainerLocalizer {
private Set<Thread> localizingThreads =
Collections.synchronizedSet(new HashSet<>());
private final String tokenFileName;
public ContainerLocalizer(FileContext lfs, String user, String appId,
String localizerId, List<Path> localDirs,
String localizerId, String tokenFileName, List<Path> localDirs,
RecordFactory recordFactory) throws IOException {
if (null == user) {
throw new IOException("Cannot initialize for null user");
@ -135,6 +137,8 @@ public class ContainerLocalizer {
YarnConfiguration.DEFAULT_DISK_VALIDATOR);
this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId);
this.pendingResources = new HashMap<LocalResource,Future<Path>>();
this.tokenFileName = Preconditions.checkNotNull(tokenFileName,
"token file name cannot be null");
}
@Private
@ -155,8 +159,7 @@ public class ContainerLocalizer {
try {
// assume credentials in cwd
// TODO: Fix
Path tokenPath =
new Path(String.format(TOKEN_FILE_NAME_FMT, localizerId));
Path tokenPath = new Path(tokenFileName);
credFile = lfs.open(tokenPath);
creds.readTokenStorageStream(credFile);
// Explicitly deleting token file.
@ -404,7 +407,9 @@ public class ContainerLocalizer {
*/
public static void buildMainArgs(List<String> command,
String user, String appId, String locId,
InetSocketAddress nmAddr, List<String> localDirs, Configuration conf) {
InetSocketAddress nmAddr,
String tokenFileName,
List<String> localDirs, Configuration conf) {
String logLevel = conf.get(YarnConfiguration.
NM_CONTAINER_LOCALIZER_LOG_LEVEL,
@ -416,6 +421,7 @@ public class ContainerLocalizer {
command.add(locId);
command.add(nmAddr.getHostName());
command.add(Integer.toString(nmAddr.getPort()));
command.add(tokenFileName);
for(String dir : localDirs) {
command.add(dir);
}
@ -446,8 +452,9 @@ public class ContainerLocalizer {
String locId = argv[2];
InetSocketAddress nmAddr =
new InetSocketAddress(argv[3], Integer.parseInt(argv[4]));
String[] sLocaldirs = Arrays.copyOfRange(argv, 5, argv.length);
ArrayList<Path> localDirs = new ArrayList<Path>(sLocaldirs.length);
String tokenFileName = argv[5];
String[] sLocaldirs = Arrays.copyOfRange(argv, 6, argv.length);
ArrayList<Path> localDirs = new ArrayList<>(sLocaldirs.length);
for (String sLocaldir : sLocaldirs) {
localDirs.add(new Path(sLocaldir));
}
@ -459,12 +466,11 @@ public class ContainerLocalizer {
LOG.warn("Localization running as " + uid + " not " + user);
}
ContainerLocalizer localizer =
new ContainerLocalizer(FileContext.getLocalFSFileContext(), user,
appId, locId, localDirs,
ContainerLocalizer localizer = new ContainerLocalizer(
FileContext.getLocalFSFileContext(), user,
appId, locId, tokenFileName, localDirs,
RecordFactoryProvider.getRecordFactory(null));
localizer.runLocalization(nmAddr);
return;
} catch (Throwable e) {
// Print traces to stdout so that they can be logged by the NM address
// space in both DefaultCE and LCE cases

View File

@ -1028,6 +1028,8 @@ public class ResourceLocalizationService extends CompositeService
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(getConfig());
private final String tokenFileName;
LocalizerRunner(LocalizerContext context, String localizerId) {
super("LocalizerRunner for " + localizerId);
this.context = context;
@ -1035,8 +1037,9 @@ public class ResourceLocalizationService extends CompositeService
this.pending =
Collections
.synchronizedList(new ArrayList<LocalizerResourceRequestEvent>());
this.scheduled =
new HashMap<LocalResourceRequest, LocalizerResourceRequestEvent>();
this.scheduled = new HashMap<>();
tokenFileName = String.format(ContainerExecutor.TOKEN_FILE_NAME_FMT,
localizerId + Long.toHexString(System.currentTimeMillis()));
}
public void addResource(LocalizerResourceRequestEvent request) {
@ -1231,11 +1234,8 @@ public class ResourceLocalizationService extends CompositeService
Throwable exception = null;
try {
// Get nmPrivateDir
nmPrivateCTokensPath =
dirsHandler.getLocalPathForWrite(
NM_PRIVATE_DIR + Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
localizerId));
nmPrivateCTokensPath = dirsHandler.getLocalPathForWrite(
NM_PRIVATE_DIR + Path.SEPARATOR + tokenFileName);
// 0) init queue, etc.
// 1) write credentials to private dir

View File

@ -569,14 +569,15 @@ public class TestDefaultContainerExecutor {
spy(new DefaultContainerExecutor(mockLfs) {
@Override
public ContainerLocalizer createContainerLocalizer(String user,
String appId, String locId, List<String> localDirs,
FileContext localizerFc) throws IOException {
String appId, String locId, String tokenFileName,
List<String> localDirs, FileContext localizerFc)
throws IOException {
// Spy on the localizer and make it return valid heart-beat
// responses even though there is no real NodeManager.
ContainerLocalizer localizer =
super.createContainerLocalizer(user, appId, locId, localDirs,
localizerFc);
super.createContainerLocalizer(user, appId, locId,
tokenFileName, localDirs, localizerFc);
ContainerLocalizer spyLocalizer = spy(localizer);
LocalizationProtocol nmProxy = mock(LocalizationProtocol.class);
try {

View File

@ -361,7 +361,7 @@ public class TestLinuxContainerExecutor {
dirsHandler
.getLocalPathForWrite(ResourceLocalizationService.NM_PRIVATE_DIR
+ Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId));
+ String.format(ContainerExecutor.TOKEN_FILE_NAME_FMT, locId));
files.create(nmPrivateContainerTokensPath, EnumSet.of(CREATE, OVERWRITE));
Configuration config = new YarnConfiguration(conf);
InetSocketAddress nmAddr =
@ -374,7 +374,7 @@ public class TestLinuxContainerExecutor {
@Override
public void buildMainArgs(List<String> command, String user,
String appId, String locId, InetSocketAddress nmAddr,
List<String> localDirs) {
String tokenFileName, List<String> localDirs) {
MockContainerLocalizer.buildMainArgs(command, user, appId, locId,
nmAddr, localDirs);
}
@ -395,7 +395,7 @@ public class TestLinuxContainerExecutor {
dirsHandler
.getLocalPathForWrite(ResourceLocalizationService.NM_PRIVATE_DIR
+ Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId2));
+ String.format(ContainerExecutor.TOKEN_FILE_NAME_FMT, locId2));
files.create(nmPrivateContainerTokensPath2, EnumSet.of(CREATE, OVERWRITE));
exec.startLocalizer(new LocalizerStartContext.Builder()
.setNmPrivateContainerTokens(nmPrivateContainerTokensPath2)

View File

@ -308,7 +308,7 @@ public class TestLinuxContainerExecutorWithMocks {
.build());
List<String> result=readMockParams();
Assert.assertEquals(result.size(), 25);
Assert.assertEquals(result.size(), 26);
Assert.assertEquals(result.get(0), YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER);
Assert.assertEquals(result.get(1), "test");
Assert.assertEquals(result.get(2), "0" );
@ -334,6 +334,7 @@ public class TestLinuxContainerExecutorWithMocks {
Assert.assertEquals(result.get(21), "12345");
Assert.assertEquals(result.get(22), "localhost");
Assert.assertEquals(result.get(23), "8040");
Assert.assertEquals(result.get(24), "nmPrivateCTokensPath");
} catch (InterruptedException e) {
LOG.error("Error:"+e.getMessage(),e);

View File

@ -1200,7 +1200,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
// While the container is running, localize new resources.
// Verify the symlink is created properly
@Test
public void testLocalingResourceWhileContainerRunning() throws Exception {
public void testLocalizingResourceWhileContainerRunning() throws Exception {
// Real del service
delSrvc = new DeletionService(exec);
delSrvc.init(conf);

View File

@ -2533,7 +2533,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
Assert.assertEquals(new Path(nmPrivate, ContainerLaunch.CONTAINER_SCRIPT),
csc.getNmPrivateContainerScriptPath());
Assert.assertEquals(new Path(nmPrivate,
String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
String.format(ContainerExecutor.TOKEN_FILE_NAME_FMT,
id.toString())), csc.getNmPrivateTokensPath());
Assert.assertEquals("script",
readStringFromPath(csc.getNmPrivateContainerScriptPath()));

View File

@ -37,6 +37,8 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -447,7 +449,9 @@ public class TestContainerLocalizer {
FakeContainerLocalizer(FileContext lfs, String user, String appId,
String localizerId, List<Path> localDirs,
RecordFactory recordFactory) throws IOException {
super(lfs, user, appId, localizerId, localDirs, recordFactory);
super(lfs, user, appId, localizerId,
String.format(ContainerExecutor.TOKEN_FILE_NAME_FMT, containerId),
localDirs, recordFactory);
}
FakeLongDownload getDownloader() {
@ -523,7 +527,7 @@ public class TestContainerLocalizer {
DataInputBuffer appTokens = createFakeCredentials(random, 10);
tokenPath =
lfs.makeQualified(new Path(
String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
String.format(ContainerExecutor.TOKEN_FILE_NAME_FMT,
containerId)));
doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
).when(spylfs).open(tokenPath);
@ -655,7 +659,8 @@ static DataInputBuffer createFakeCredentials(Random r, int nTok)
RecordFactory recordFactory = mock(RecordFactory.class);
ContainerLocalizer localizer = new ContainerLocalizer(lfs,
UserGroupInformation.getCurrentUser().getUserName(), "application_01",
"container_01", new ArrayList<Path>(), recordFactory);
"container_01", String.format(ContainerExecutor.TOKEN_FILE_NAME_FMT,
"container_01"), new ArrayList<>(), recordFactory);
LocalResource rsrc = mock(LocalResource.class);
when(rsrc.getVisibility()).thenReturn(LocalResourceVisibility.PRIVATE);
Path destDirPath = new Path(fileCacheDir, "0/0/85");