From f09befd2ea8fceca351c00fc93f47762b7e5b836 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Tue, 14 May 2019 10:48:08 -0700 Subject: [PATCH] YARN-9519. TFile log aggregation file format is not working for yarn.log-aggregation.TFile.remote-app-log-dir config. Contributed by Adam Antal. (cherry picked from commit 7d831eca645f93d064975ebae35a7cbea3bbad31) --- .../LogAggregationFileController.java | 43 +++ .../LogAggregationIndexedFileController.java | 20 -- .../tfile/LogAggregationTFileController.java | 8 +- ...stLogAggregationFileControllerFactory.java | 259 ++++++++++++------ 4 files changed, 222 insertions(+), 108 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index 7157db1ed96..ee523495864 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -130,6 +130,10 @@ public abstract class LogAggregationFileController { this.retentionSize = configuredRetentionSize; } this.fileControllerName = controllerName; + + extractRemoteRootLogDir(); + extractRemoteRootLogDirSuffix(); + initInternal(conf); } @@ -244,6 +248,45 @@ public abstract class LogAggregationFileController { public abstract Map getApplicationAcls( Path aggregatedLogPath, ApplicationId appId) throws IOException; + /** + * Sets the remoteRootLogDirSuffix class variable extracting + * {@link YarnConfiguration#LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT} + * from the configuration, or + * {@link YarnConfiguration#NM_REMOTE_APP_LOG_DIR_SUFFIX} appended by the + * FileController's name, if the former is not set. + */ + private void extractRemoteRootLogDirSuffix() { + String suffix = String.format( + YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT, + fileControllerName); + remoteRootLogDirSuffix = conf.get(suffix); + if (remoteRootLogDirSuffix == null + || remoteRootLogDirSuffix.isEmpty()) { + remoteRootLogDirSuffix = conf.get( + YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX) + + "-" + fileControllerName.toLowerCase(); + } + } + + /** + * Sets the remoteRootLogDir class variable extracting + * {@link YarnConfiguration#LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT} + * from the configuration or {@link YarnConfiguration#NM_REMOTE_APP_LOG_DIR}, + * if the former is not set. + */ + private void extractRemoteRootLogDir() { + String remoteDirStr = String.format( + YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, + fileControllerName); + String remoteDir = conf.get(remoteDirStr); + if (remoteDir == null || remoteDir.isEmpty()) { + remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR); + } + remoteRootLogDir = new Path(remoteDir); + } + /** * Verify and create the remote log directory. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index 14d727f684e..cc25f9188b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -145,26 +145,6 @@ public class LogAggregationIndexedFileController + " use LogAggregationIndexedFileController when the FileSystem " + "support append operations."); } - String remoteDirStr = String.format( - YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, - this.fileControllerName); - String remoteDir = conf.get(remoteDirStr); - if (remoteDir == null || remoteDir.isEmpty()) { - remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR); - } - this.remoteRootLogDir = new Path(remoteDir); - String suffix = String.format( - YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT, - this.fileControllerName); - this.remoteRootLogDirSuffix = conf.get(suffix); - if (this.remoteRootLogDirSuffix == null - || this.remoteRootLogDirSuffix.isEmpty()) { - this.remoteRootLogDirSuffix = conf.get( - YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX) - + "-ifile"; - } String compressName = conf.get( YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java index f405e0341a1..173779eae0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java @@ -40,7 +40,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; @@ -75,12 +74,7 @@ public class LogAggregationTFileController @Override public void initInternal(Configuration conf) { - this.remoteRootLogDir = new Path( - conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - this.remoteRootLogDirSuffix = - conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); + // do nothing } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java index 99aca1ba996..2d2fb49c0ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java @@ -18,17 +18,26 @@ package org.apache.hadoop.yarn.logaggregation.filecontroller; -import static org.junit.Assert.*; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.OutputStream; import java.io.Writer; -import java.util.LinkedList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; + +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -38,120 +47,208 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; -import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; -import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; -import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; +import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController; import org.apache.hadoop.yarn.webapp.View.ViewContext; import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block; +import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Test LogAggregationFileControllerFactory. - * */ -public class TestLogAggregationFileControllerFactory { +public class TestLogAggregationFileControllerFactory extends Configured { + private static final Logger LOG = LoggerFactory.getLogger( + TestLogAggregationFileControllerFactory.class); - @Test(timeout = 10000) - public void testLogAggregationFileControllerFactory() throws Exception { - ApplicationId appId = ApplicationId.newInstance( - System.currentTimeMillis(), 1); - String appOwner = "test"; - String remoteLogRootDir = "target/app-logs/"; + private static final String REMOTE_LOG_ROOT = "target/app-logs/"; + private static final String REMOTE_DEFAULT_DIR = "default/"; + private static final String APP_OWNER = "test"; + + private static final String WRONG_ROOT_LOG_DIR_MSG = + "Wrong remote root log directory found."; + private static final String WRONG_ROOT_LOG_DIR_SUFFIX_MSG = + "Wrong remote root log directory suffix found."; + + private static final List> + ALL_FILE_CONTROLLERS = Arrays.asList( + TestLogAggregationFileController.class, + LogAggregationIndexedFileController.class, + LogAggregationTFileController.class); + private static final List ALL_FILE_CONTROLLER_NAMES = + Arrays.asList("TestLogAggregationFileController", "IFile", "TFile"); + + private ApplicationId appId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + + @Before + public void setup() throws IOException { Configuration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_LOG_ROOT + + REMOTE_DEFAULT_DIR); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "log"); - FileSystem fs = FileSystem.get(conf); + setConf(conf); + } + + private void verifyFileControllerInstance( + LogAggregationFileControllerFactory factory, + Class className) + throws IOException { + List fileControllers = + factory.getConfiguredLogAggregationFileControllerList(); + FileSystem fs = FileSystem.get(getConf()); + Path logPath = fileControllers.get(0).getRemoteAppLogDir(appId, APP_OWNER); + LOG.debug("Checking " + logPath); - LogAggregationFileControllerFactory factory = - new LogAggregationFileControllerFactory(conf); - LinkedList list = factory - .getConfiguredLogAggregationFileControllerList(); - assertTrue(list.size() == 1); - assertTrue(list.getFirst() instanceof LogAggregationTFileController); - assertTrue(factory.getFileControllerForWrite() - instanceof LogAggregationTFileController); - Path logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner); try { if (fs.exists(logPath)) { fs.delete(logPath, true); } assertTrue(fs.mkdirs(logPath)); - Writer writer = - new FileWriter(new File(logPath.toString(), "testLog")); - writer.write("test"); - writer.close(); - assertTrue(factory.getFileControllerForRead(appId, appOwner) - instanceof LogAggregationTFileController); + try (Writer writer = + new FileWriter(new File(logPath.toString(), "testLog"))) { + writer.write("test"); + } + assertTrue("The used LogAggregationFileController is not instance of " + + className.getSimpleName(), className.isInstance( + factory.getFileControllerForRead(appId, APP_OWNER))); } finally { fs.delete(logPath, true); } + } + @Test + public void testDefaultLogAggregationFileControllerFactory() + throws IOException { + LogAggregationFileControllerFactory factory = + new LogAggregationFileControllerFactory(getConf()); + List list = factory + .getConfiguredLogAggregationFileControllerList(); + + assertEquals("Only one LogAggregationFileController is expected!", 1, + list.size()); + assertTrue("TFile format is expected to be the first " + + "LogAggregationFileController!", list.get(0) instanceof + LogAggregationTFileController); + assertTrue("TFile format is expected to be used for writing!", + factory.getFileControllerForWrite() instanceof + LogAggregationTFileController); + + verifyFileControllerInstance(factory, LogAggregationTFileController.class); + } + + @Test(expected = Exception.class) + public void testLogAggregationFileControllerFactoryClassNotSet() { + Configuration conf = getConf(); conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TestLogAggregationFileController"); - // Did not set class for TestLogAggregationFileController, - // should get the exception. - try { - factory = - new LogAggregationFileControllerFactory(conf); - fail(); - } catch (Exception ex) { - // should get exception - } + new LogAggregationFileControllerFactory(conf); + fail("TestLogAggregationFileController's class was not set, " + + "but the factory creation did not fail."); + } + private void enableFileControllers( + List> fileControllers, + List fileControllerNames) { + Configuration conf = getConf(); conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, - "TestLogAggregationFileController,TFile"); - conf.setClass( - "yarn.log-aggregation.file-controller.TestLogAggregationFileController" - + ".class", TestLogAggregationFileController.class, - LogAggregationFileController.class); + StringUtils.join(fileControllerNames, ",")); + for (int i = 0; i < fileControllers.size(); i++) { + Class fileController = + fileControllers.get(i); + String controllerName = fileControllerNames.get(i); - conf.set( - "yarn.log-aggregation.TestLogAggregationFileController" - + ".remote-app-log-dir", remoteLogRootDir); - conf.set( - "yarn.log-aggregation.TestLogAggregationFileController" - + ".remote-app-log-dir-suffix", "testLog"); - - factory = new LogAggregationFileControllerFactory(conf); - list = factory.getConfiguredLogAggregationFileControllerList(); - assertTrue(list.size() == 2); - assertTrue(list.getFirst() instanceof TestLogAggregationFileController); - assertTrue(list.getLast() instanceof LogAggregationTFileController); - assertTrue(factory.getFileControllerForWrite() - instanceof TestLogAggregationFileController); - - logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner); - try { - if (fs.exists(logPath)) { - fs.delete(logPath, true); - } - assertTrue(fs.mkdirs(logPath)); - Writer writer = - new FileWriter(new File(logPath.toString(), "testLog")); - writer.write("test"); - writer.close(); - assertTrue(factory.getFileControllerForRead(appId, appOwner) - instanceof TestLogAggregationFileController); - } finally { - fs.delete(logPath, true); + conf.setClass(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, + controllerName), fileController, LogAggregationFileController.class); + conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, + controllerName), REMOTE_LOG_ROOT + controllerName + "/"); + conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT, + controllerName), controllerName); } } + @Test + public void testLogAggregationFileControllerFactory() throws Exception { + enableFileControllers(ALL_FILE_CONTROLLERS, ALL_FILE_CONTROLLER_NAMES); + LogAggregationFileControllerFactory factory = + new LogAggregationFileControllerFactory(getConf()); + List list = + factory.getConfiguredLogAggregationFileControllerList(); + + assertEquals("The expected number of LogAggregationFileController " + + "is not 3!", 3, list.size()); + assertTrue("Test format is expected to be the first " + + "LogAggregationFileController!", list.get(0) instanceof + TestLogAggregationFileController); + assertTrue("IFile format is expected to be the second " + + "LogAggregationFileController!", list.get(1) instanceof + LogAggregationIndexedFileController); + assertTrue("TFile format is expected to be the first " + + "LogAggregationFileController!", list.get(2) instanceof + LogAggregationTFileController); + assertTrue("Test format is expected to be used for writing!", + factory.getFileControllerForWrite() instanceof + TestLogAggregationFileController); + + verifyFileControllerInstance(factory, + TestLogAggregationFileController.class); + } + + @Test + public void testClassConfUsed() { + enableFileControllers(Collections.singletonList( + LogAggregationTFileController.class), + Collections.singletonList("TFile")); + LogAggregationFileControllerFactory factory = + new LogAggregationFileControllerFactory(getConf()); + LogAggregationFileController fc = factory.getFileControllerForWrite(); + + assertEquals(WRONG_ROOT_LOG_DIR_MSG, "target/app-logs/TFile", + fc.getRemoteRootLogDir().toString()); + assertEquals(WRONG_ROOT_LOG_DIR_SUFFIX_MSG, "TFile", + fc.getRemoteRootLogDirSuffix()); + } + + @Test + public void testNodemanagerConfigurationIsUsed() { + Configuration conf = getConf(); + conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); + LogAggregationFileControllerFactory factory = + new LogAggregationFileControllerFactory(conf); + LogAggregationFileController fc = factory.getFileControllerForWrite(); + + assertEquals(WRONG_ROOT_LOG_DIR_MSG, "target/app-logs/default", + fc.getRemoteRootLogDir().toString()); + assertEquals(WRONG_ROOT_LOG_DIR_SUFFIX_MSG, "log-tfile", + fc.getRemoteRootLogDirSuffix()); + } + + @Test + public void testDefaultConfUsed() { + Configuration conf = getConf(); + conf.unset(YarnConfiguration.NM_REMOTE_APP_LOG_DIR); + conf.unset(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX); + conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); + + LogAggregationFileControllerFactory factory = + new LogAggregationFileControllerFactory(getConf()); + LogAggregationFileController fc = factory.getFileControllerForWrite(); + + assertEquals(WRONG_ROOT_LOG_DIR_MSG, "/tmp/logs", + fc.getRemoteRootLogDir().toString()); + assertEquals(WRONG_ROOT_LOG_DIR_SUFFIX_MSG, "logs-tfile", + fc.getRemoteRootLogDirSuffix()); + } + private static class TestLogAggregationFileController extends LogAggregationFileController { @Override public void initInternal(Configuration conf) { - String remoteDirStr = String.format( - YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, - this.fileControllerName); - this.remoteRootLogDir = new Path(conf.get(remoteDirStr)); - String suffix = String.format( - YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT, - this.fileControllerName); - this.remoteRootLogDirSuffix = conf.get(suffix); + // Do Nothing } @Override