YARN-9607. Auto-configuring rollover-size of IFile format for non-appendable filesystems. Contributed by Adam Antal

This commit is contained in:
Szilard Nemeth 2019-12-05 21:49:00 +01:00
parent cf68857631
commit 4f758dd4c6
3 changed files with 48 additions and 18 deletions

View File

@ -99,11 +99,6 @@ public abstract class LogAggregationFileController {
protected static final FsPermission APP_LOG_FILE_UMASK = FsPermission
.createImmutable((short) (0640 ^ 0777));
// This is temporary solution. The configuration will be deleted once we have
// the FileSystem API to check whether append operation is supported or not.
public static final String LOG_AGGREGATION_FS_SUPPORT_APPEND
= YarnConfiguration.YARN_PREFIX+ "log-aggregation.fs-support-append";
protected Configuration conf;
protected Path remoteRootLogDir;
protected String remoteRootLogDirSuffix;

View File

@ -47,12 +47,15 @@ import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
@ -69,7 +72,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
@ -135,16 +137,6 @@ public class LogAggregationIndexedFileController
@Override
public void initInternal(Configuration conf) {
// Currently, we need the underlying File System to support append
// operation. Will remove this check after we finish
// LogAggregationIndexedFileController for non-append mode.
boolean append = conf.getBoolean(LOG_AGGREGATION_FS_SUPPORT_APPEND, true);
if (!append) {
throw new YarnRuntimeException("The configuration:"
+ LOG_AGGREGATION_FS_SUPPORT_APPEND + " is set as False. We can only"
+ " use LogAggregationIndexedFileController when the FileSystem "
+ "support append operations.");
}
String compressName = conf.get(
YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
@ -1139,8 +1131,23 @@ public class LogAggregationIndexedFileController
@Private
@VisibleForTesting
public long getRollOverLogMaxSize(Configuration conf) {
return 1024L * 1024 * 1024 * conf.getInt(
LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10);
boolean supportAppend = false;
try {
FileSystem fs = FileSystem.get(remoteRootLogDir.toUri(), conf);
if (fs instanceof LocalFileSystem || fs.hasPathCapability(
remoteRootLogDir, CommonPathCapabilities.FS_APPEND)) {
supportAppend = true;
}
} catch (Exception ioe) {
LOG.warn("Unable to determine if the filesystem supports " +
"append operation", ioe);
}
if (supportAppend) {
return 1024L * 1024 * 1024 * conf.getInt(
LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10);
} else {
return 0L;
}
}
private abstract class FSAction<T> {

View File

@ -469,4 +469,32 @@ public class TestLogAggregationIndexedFileController
private String logMessage(ContainerId containerId, String logType) {
return "Hello " + containerId + " in " + logType + "!";
}
@Test
public void testGetRollOverLogMaxSize() {
String fileControllerName = "testController";
String remoteDirConf = String.format(
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
fileControllerName);
Configuration conf = new Configuration();
LogAggregationIndexedFileController fileFormat
= new LogAggregationIndexedFileController();
long defaultRolloverSize = 10L * 1024 * 1024 * 1024;
// test local filesystem
fileFormat.initialize(conf, fileControllerName);
assertThat(fileFormat.getRollOverLogMaxSize(conf))
.isEqualTo(defaultRolloverSize);
// test file system supporting append
conf.set(remoteDirConf, "webhdfs://localhost/path");
fileFormat.initialize(conf, fileControllerName);
assertThat(fileFormat.getRollOverLogMaxSize(conf))
.isEqualTo(defaultRolloverSize);
// test file system not supporting append
conf.set(remoteDirConf, "s3a://test/path");
fileFormat.initialize(conf, fileControllerName);
assertThat(fileFormat.getRollOverLogMaxSize(conf)).isZero();
}
}