YARN-9030. Log aggregation changes to handle filesystems which do not support setting permissions. (Suma Shivaprasad via wangda)

Change-Id: I80f1e8196b8624e24d74494719fdedfd7061dced
This commit is contained in:
Wangda Tan 2018-11-21 17:28:37 -08:00
parent 4d8de7ab69
commit 9de8e8d049
1 changed files with 56 additions and 18 deletions

View File

@ -109,6 +109,8 @@ public abstract class LogAggregationFileController {
protected int retentionSize;
protected String fileControllerName;
protected boolean fsSupportsChmod = true;
public LogAggregationFileController() {}
/**
@ -250,7 +252,6 @@ public abstract class LogAggregationFileController {
* Verify and create the remote log directory.
*/
public void verifyAndCreateRemoteLogDir() {
boolean logPermError = true;
// Checking the existence of the TLD
FileSystem remoteFS = null;
try {
@ -264,14 +265,12 @@ public abstract class LogAggregationFileController {
try {
FsPermission perms =
remoteFS.getFileStatus(remoteRootLogDir).getPermission();
if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) {
if (!perms.equals(TLDIR_PERMISSIONS)) {
LOG.warn("Remote Root Log Dir [" + remoteRootLogDir
+ "] already exist, but with incorrect permissions. "
+ "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
+ "]." + " The cluster may have problems with multiple users.");
logPermError = false;
} else {
logPermError = true;
}
} catch (FileNotFoundException e) {
remoteExists = false;
@ -280,15 +279,26 @@ public abstract class LogAggregationFileController {
"Failed to check permissions for dir ["
+ remoteRootLogDir + "]", e);
}
Path qualified =
remoteRootLogDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
if (!remoteExists) {
LOG.warn("Remote Root Log Dir [" + remoteRootLogDir
+ "] does not exist. Attempting to create it.");
try {
Path qualified =
remoteRootLogDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
// Not possible to query FileSystem API to check if it supports
// chmod, chown etc. Hence resorting to catching exceptions here.
// Remove when FS APi is ready
try {
remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
} catch ( UnsupportedOperationException use) {
LOG.info("Unable to set permissions for configured filesystem since"
+ " it does not support this", remoteFS.getScheme());
fsSupportsChmod = false;
}
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
String primaryGroupName = null;
@ -301,13 +311,31 @@ public abstract class LogAggregationFileController {
}
// set owner on the remote directory only if the primary group exists
if (primaryGroupName != null) {
remoteFS.setOwner(qualified,
loginUser.getShortUserName(), primaryGroupName);
try {
remoteFS.setOwner(qualified, loginUser.getShortUserName(),
primaryGroupName);
} catch (UnsupportedOperationException use) {
LOG.info(
"File System does not support setting user/group" + remoteFS
.getScheme(), use);
}
}
} catch (IOException e) {
throw new YarnRuntimeException("Failed to create remoteLogDir ["
+ remoteRootLogDir + "]", e);
}
} else{
//Check if FS has capability to set/modify permissions
try {
remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
} catch (UnsupportedOperationException use) {
LOG.info("Unable to set permissions for configured filesystem since"
+ " it does not support this", remoteFS.getScheme());
fsSupportsChmod = false;
} catch (IOException e) {
LOG.warn("Failed to check if FileSystem suppports permissions on "
+ "remoteLogDir [" + remoteRootLogDir + "]", e);
}
}
}
@ -383,11 +411,16 @@ public abstract class LogAggregationFileController {
protected void createDir(FileSystem fs, Path path, FsPermission fsPerm)
throws IOException {
FsPermission dirPerm = new FsPermission(fsPerm);
fs.mkdirs(path, dirPerm);
FsPermission umask = FsPermission.getUMask(fs.getConf());
if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
fs.setPermission(path, new FsPermission(fsPerm));
if (fsSupportsChmod) {
FsPermission dirPerm = new FsPermission(fsPerm);
fs.mkdirs(path, dirPerm);
FsPermission umask = FsPermission.getUMask(fs.getConf());
if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
fs.setPermission(path, new FsPermission(fsPerm));
}
} else {
fs.mkdirs(path);
}
}
@ -396,8 +429,10 @@ public abstract class LogAggregationFileController {
boolean exists = true;
try {
FileStatus appDirStatus = fs.getFileStatus(path);
if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) {
fs.setPermission(path, APP_DIR_PERMISSIONS);
if (fsSupportsChmod) {
if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) {
fs.setPermission(path, APP_DIR_PERMISSIONS);
}
}
} catch (FileNotFoundException fnfe) {
exists = false;
@ -502,4 +537,7 @@ public abstract class LogAggregationFileController {
return sb.toString();
}
public boolean isFsSupportsChmod() {
return fsSupportsChmod;
}
}