YARN-9519. TFile log aggregation file format is not working for yarn.log-aggregation.TFile.remote-app-log-dir config. Contributed by Adam Antal.
(cherry picked from commit 7d831eca64
)
This commit is contained in:
parent
b5b152aded
commit
f4ee38df29
|
@ -132,6 +132,10 @@ public abstract class LogAggregationFileController {
|
||||||
this.retentionSize = configuredRetentionSize;
|
this.retentionSize = configuredRetentionSize;
|
||||||
}
|
}
|
||||||
this.fileControllerName = controllerName;
|
this.fileControllerName = controllerName;
|
||||||
|
|
||||||
|
extractRemoteRootLogDir();
|
||||||
|
extractRemoteRootLogDirSuffix();
|
||||||
|
|
||||||
initInternal(conf);
|
initInternal(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,6 +252,45 @@ public abstract class LogAggregationFileController {
|
||||||
public abstract Map<ApplicationAccessType, String> getApplicationAcls(
|
public abstract Map<ApplicationAccessType, String> getApplicationAcls(
|
||||||
Path aggregatedLogPath, ApplicationId appId) throws IOException;
|
Path aggregatedLogPath, ApplicationId appId) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the remoteRootLogDirSuffix class variable extracting
|
||||||
|
* {@link YarnConfiguration#LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT}
|
||||||
|
* from the configuration, or
|
||||||
|
* {@link YarnConfiguration#NM_REMOTE_APP_LOG_DIR_SUFFIX} appended by the
|
||||||
|
* FileController's name, if the former is not set.
|
||||||
|
*/
|
||||||
|
private void extractRemoteRootLogDirSuffix() {
|
||||||
|
String suffix = String.format(
|
||||||
|
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
|
||||||
|
fileControllerName);
|
||||||
|
remoteRootLogDirSuffix = conf.get(suffix);
|
||||||
|
if (remoteRootLogDirSuffix == null
|
||||||
|
|| remoteRootLogDirSuffix.isEmpty()) {
|
||||||
|
remoteRootLogDirSuffix = conf.get(
|
||||||
|
YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
|
||||||
|
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)
|
||||||
|
+ "-" + fileControllerName.toLowerCase();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the remoteRootLogDir class variable extracting
|
||||||
|
* {@link YarnConfiguration#LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT}
|
||||||
|
* from the configuration or {@link YarnConfiguration#NM_REMOTE_APP_LOG_DIR},
|
||||||
|
* if the former is not set.
|
||||||
|
*/
|
||||||
|
private void extractRemoteRootLogDir() {
|
||||||
|
String remoteDirStr = String.format(
|
||||||
|
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
|
||||||
|
fileControllerName);
|
||||||
|
String remoteDir = conf.get(remoteDirStr);
|
||||||
|
if (remoteDir == null || remoteDir.isEmpty()) {
|
||||||
|
remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
|
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
|
||||||
|
}
|
||||||
|
remoteRootLogDir = new Path(remoteDir);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify and create the remote log directory.
|
* Verify and create the remote log directory.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -145,26 +145,6 @@ public class LogAggregationIndexedFileController
|
||||||
+ " use LogAggregationIndexedFileController when the FileSystem "
|
+ " use LogAggregationIndexedFileController when the FileSystem "
|
||||||
+ "support append operations.");
|
+ "support append operations.");
|
||||||
}
|
}
|
||||||
String remoteDirStr = String.format(
|
|
||||||
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
|
|
||||||
this.fileControllerName);
|
|
||||||
String remoteDir = conf.get(remoteDirStr);
|
|
||||||
if (remoteDir == null || remoteDir.isEmpty()) {
|
|
||||||
remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
|
||||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
|
|
||||||
}
|
|
||||||
this.remoteRootLogDir = new Path(remoteDir);
|
|
||||||
String suffix = String.format(
|
|
||||||
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
|
|
||||||
this.fileControllerName);
|
|
||||||
this.remoteRootLogDirSuffix = conf.get(suffix);
|
|
||||||
if (this.remoteRootLogDirSuffix == null
|
|
||||||
|| this.remoteRootLogDirSuffix.isEmpty()) {
|
|
||||||
this.remoteRootLogDirSuffix = conf.get(
|
|
||||||
YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
|
|
||||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)
|
|
||||||
+ "-ifile";
|
|
||||||
}
|
|
||||||
String compressName = conf.get(
|
String compressName = conf.get(
|
||||||
YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
|
YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
|
||||||
YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
|
YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
|
||||||
|
|
|
@ -41,7 +41,6 @@ import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
||||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
||||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
||||||
|
@ -77,12 +76,7 @@ public class LogAggregationTFileController
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initInternal(Configuration conf) {
|
public void initInternal(Configuration conf) {
|
||||||
this.remoteRootLogDir = new Path(
|
// do nothing
|
||||||
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
|
||||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
|
||||||
this.remoteRootLogDirSuffix =
|
|
||||||
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
|
|
||||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -18,17 +18,26 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.logaggregation.filecontroller;
|
package org.apache.hadoop.yarn.logaggregation.filecontroller;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
|
||||||
|
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT;
|
||||||
|
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileWriter;
|
import java.io.FileWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
import java.util.LinkedList;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
|
@ -38,105 +47,200 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
|
||||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
||||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
|
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
|
||||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
|
import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController;
|
||||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
|
|
||||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
|
||||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
|
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
|
||||||
import org.apache.hadoop.yarn.webapp.View.ViewContext;
|
import org.apache.hadoop.yarn.webapp.View.ViewContext;
|
||||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
|
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test LogAggregationFileControllerFactory.
|
* Test LogAggregationFileControllerFactory.
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public class TestLogAggregationFileControllerFactory {
|
public class TestLogAggregationFileControllerFactory extends Configured {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
|
TestLogAggregationFileControllerFactory.class);
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
private static final String REMOTE_LOG_ROOT = "target/app-logs/";
|
||||||
public void testLogAggregationFileControllerFactory() throws Exception {
|
private static final String REMOTE_DEFAULT_DIR = "default/";
|
||||||
ApplicationId appId = ApplicationId.newInstance(
|
private static final String APP_OWNER = "test";
|
||||||
|
|
||||||
|
private static final String WRONG_ROOT_LOG_DIR_MSG =
|
||||||
|
"Wrong remote root log directory found.";
|
||||||
|
private static final String WRONG_ROOT_LOG_DIR_SUFFIX_MSG =
|
||||||
|
"Wrong remote root log directory suffix found.";
|
||||||
|
|
||||||
|
private static final List<Class<? extends LogAggregationFileController>>
|
||||||
|
ALL_FILE_CONTROLLERS = Arrays.asList(
|
||||||
|
TestLogAggregationFileController.class,
|
||||||
|
LogAggregationIndexedFileController.class,
|
||||||
|
LogAggregationTFileController.class);
|
||||||
|
private static final List<String> ALL_FILE_CONTROLLER_NAMES =
|
||||||
|
Arrays.asList("TestLogAggregationFileController", "IFile", "TFile");
|
||||||
|
|
||||||
|
private ApplicationId appId = ApplicationId.newInstance(
|
||||||
System.currentTimeMillis(), 1);
|
System.currentTimeMillis(), 1);
|
||||||
String appOwner = "test";
|
|
||||||
String remoteLogRootDir = "target/app-logs/";
|
@Before
|
||||||
|
public void setup() throws IOException {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
|
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_LOG_ROOT +
|
||||||
|
REMOTE_DEFAULT_DIR);
|
||||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "log");
|
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "log");
|
||||||
FileSystem fs = FileSystem.get(conf);
|
setConf(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyFileControllerInstance(
|
||||||
|
LogAggregationFileControllerFactory factory,
|
||||||
|
Class<? extends LogAggregationFileController> className)
|
||||||
|
throws IOException {
|
||||||
|
List<LogAggregationFileController> fileControllers =
|
||||||
|
factory.getConfiguredLogAggregationFileControllerList();
|
||||||
|
FileSystem fs = FileSystem.get(getConf());
|
||||||
|
Path logPath = fileControllers.get(0).getRemoteAppLogDir(appId, APP_OWNER);
|
||||||
|
LOG.debug("Checking " + logPath);
|
||||||
|
|
||||||
LogAggregationFileControllerFactory factory =
|
|
||||||
new LogAggregationFileControllerFactory(conf);
|
|
||||||
LinkedList<LogAggregationFileController> list = factory
|
|
||||||
.getConfiguredLogAggregationFileControllerList();
|
|
||||||
assertTrue(list.size() == 1);
|
|
||||||
assertTrue(list.getFirst() instanceof LogAggregationTFileController);
|
|
||||||
assertTrue(factory.getFileControllerForWrite()
|
|
||||||
instanceof LogAggregationTFileController);
|
|
||||||
Path logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
|
|
||||||
try {
|
try {
|
||||||
if (fs.exists(logPath)) {
|
if (fs.exists(logPath)) {
|
||||||
fs.delete(logPath, true);
|
fs.delete(logPath, true);
|
||||||
}
|
}
|
||||||
assertTrue(fs.mkdirs(logPath));
|
assertTrue(fs.mkdirs(logPath));
|
||||||
Writer writer =
|
try (Writer writer =
|
||||||
new FileWriter(new File(logPath.toString(), "testLog"));
|
new FileWriter(new File(logPath.toString(), "testLog"))) {
|
||||||
writer.write("test");
|
writer.write("test");
|
||||||
writer.close();
|
}
|
||||||
assertTrue(factory.getFileControllerForRead(appId, appOwner)
|
assertTrue("The used LogAggregationFileController is not instance of "
|
||||||
instanceof LogAggregationTFileController);
|
+ className.getSimpleName(), className.isInstance(
|
||||||
|
factory.getFileControllerForRead(appId, APP_OWNER)));
|
||||||
} finally {
|
} finally {
|
||||||
fs.delete(logPath, true);
|
fs.delete(logPath, true);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDefaultLogAggregationFileControllerFactory()
|
||||||
|
throws IOException {
|
||||||
|
LogAggregationFileControllerFactory factory =
|
||||||
|
new LogAggregationFileControllerFactory(getConf());
|
||||||
|
List<LogAggregationFileController> list = factory
|
||||||
|
.getConfiguredLogAggregationFileControllerList();
|
||||||
|
|
||||||
|
assertEquals("Only one LogAggregationFileController is expected!", 1,
|
||||||
|
list.size());
|
||||||
|
assertTrue("TFile format is expected to be the first " +
|
||||||
|
"LogAggregationFileController!", list.get(0) instanceof
|
||||||
|
LogAggregationTFileController);
|
||||||
|
assertTrue("TFile format is expected to be used for writing!",
|
||||||
|
factory.getFileControllerForWrite() instanceof
|
||||||
|
LogAggregationTFileController);
|
||||||
|
|
||||||
|
verifyFileControllerInstance(factory, LogAggregationTFileController.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = Exception.class)
|
||||||
|
public void testLogAggregationFileControllerFactoryClassNotSet() {
|
||||||
|
Configuration conf = getConf();
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
|
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
|
||||||
"TestLogAggregationFileController");
|
"TestLogAggregationFileController");
|
||||||
// Did not set class for TestLogAggregationFileController,
|
|
||||||
// should get the exception.
|
|
||||||
try {
|
|
||||||
factory =
|
|
||||||
new LogAggregationFileControllerFactory(conf);
|
new LogAggregationFileControllerFactory(conf);
|
||||||
fail();
|
fail("TestLogAggregationFileController's class was not set, " +
|
||||||
} catch (Exception ex) {
|
"but the factory creation did not fail.");
|
||||||
// should get exception
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void enableFileControllers(
|
||||||
|
List<Class<? extends LogAggregationFileController>> fileControllers,
|
||||||
|
List<String> fileControllerNames) {
|
||||||
|
Configuration conf = getConf();
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
|
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
|
||||||
"TestLogAggregationFileController,TFile");
|
StringUtils.join(fileControllerNames, ","));
|
||||||
conf.setClass(
|
for (int i = 0; i < fileControllers.size(); i++) {
|
||||||
"yarn.log-aggregation.file-controller.TestLogAggregationFileController"
|
Class<? extends LogAggregationFileController> fileController =
|
||||||
+ ".class", TestLogAggregationFileController.class,
|
fileControllers.get(i);
|
||||||
LogAggregationFileController.class);
|
String controllerName = fileControllerNames.get(i);
|
||||||
|
|
||||||
conf.set(
|
conf.setClass(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT,
|
||||||
"yarn.log-aggregation.TestLogAggregationFileController"
|
controllerName), fileController, LogAggregationFileController.class);
|
||||||
+ ".remote-app-log-dir", remoteLogRootDir);
|
conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
|
||||||
conf.set(
|
controllerName), REMOTE_LOG_ROOT + controllerName + "/");
|
||||||
"yarn.log-aggregation.TestLogAggregationFileController"
|
conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
|
||||||
+ ".remote-app-log-dir-suffix", "testLog");
|
controllerName), controllerName);
|
||||||
|
|
||||||
factory = new LogAggregationFileControllerFactory(conf);
|
|
||||||
list = factory.getConfiguredLogAggregationFileControllerList();
|
|
||||||
assertTrue(list.size() == 2);
|
|
||||||
assertTrue(list.getFirst() instanceof TestLogAggregationFileController);
|
|
||||||
assertTrue(list.getLast() instanceof LogAggregationTFileController);
|
|
||||||
assertTrue(factory.getFileControllerForWrite()
|
|
||||||
instanceof TestLogAggregationFileController);
|
|
||||||
|
|
||||||
logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
|
|
||||||
try {
|
|
||||||
if (fs.exists(logPath)) {
|
|
||||||
fs.delete(logPath, true);
|
|
||||||
}
|
}
|
||||||
assertTrue(fs.mkdirs(logPath));
|
|
||||||
Writer writer =
|
|
||||||
new FileWriter(new File(logPath.toString(), "testLog"));
|
|
||||||
writer.write("test");
|
|
||||||
writer.close();
|
|
||||||
assertTrue(factory.getFileControllerForRead(appId, appOwner)
|
|
||||||
instanceof TestLogAggregationFileController);
|
|
||||||
} finally {
|
|
||||||
fs.delete(logPath, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLogAggregationFileControllerFactory() throws Exception {
|
||||||
|
enableFileControllers(ALL_FILE_CONTROLLERS, ALL_FILE_CONTROLLER_NAMES);
|
||||||
|
LogAggregationFileControllerFactory factory =
|
||||||
|
new LogAggregationFileControllerFactory(getConf());
|
||||||
|
List<LogAggregationFileController> list =
|
||||||
|
factory.getConfiguredLogAggregationFileControllerList();
|
||||||
|
|
||||||
|
assertEquals("The expected number of LogAggregationFileController " +
|
||||||
|
"is not 3!", 3, list.size());
|
||||||
|
assertTrue("Test format is expected to be the first " +
|
||||||
|
"LogAggregationFileController!", list.get(0) instanceof
|
||||||
|
TestLogAggregationFileController);
|
||||||
|
assertTrue("IFile format is expected to be the second " +
|
||||||
|
"LogAggregationFileController!", list.get(1) instanceof
|
||||||
|
LogAggregationIndexedFileController);
|
||||||
|
assertTrue("TFile format is expected to be the first " +
|
||||||
|
"LogAggregationFileController!", list.get(2) instanceof
|
||||||
|
LogAggregationTFileController);
|
||||||
|
assertTrue("Test format is expected to be used for writing!",
|
||||||
|
factory.getFileControllerForWrite() instanceof
|
||||||
|
TestLogAggregationFileController);
|
||||||
|
|
||||||
|
verifyFileControllerInstance(factory,
|
||||||
|
TestLogAggregationFileController.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClassConfUsed() {
|
||||||
|
enableFileControllers(Collections.singletonList(
|
||||||
|
LogAggregationTFileController.class),
|
||||||
|
Collections.singletonList("TFile"));
|
||||||
|
LogAggregationFileControllerFactory factory =
|
||||||
|
new LogAggregationFileControllerFactory(getConf());
|
||||||
|
LogAggregationFileController fc = factory.getFileControllerForWrite();
|
||||||
|
|
||||||
|
assertEquals(WRONG_ROOT_LOG_DIR_MSG, "target/app-logs/TFile",
|
||||||
|
fc.getRemoteRootLogDir().toString());
|
||||||
|
assertEquals(WRONG_ROOT_LOG_DIR_SUFFIX_MSG, "TFile",
|
||||||
|
fc.getRemoteRootLogDirSuffix());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodemanagerConfigurationIsUsed() {
|
||||||
|
Configuration conf = getConf();
|
||||||
|
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
|
||||||
|
LogAggregationFileControllerFactory factory =
|
||||||
|
new LogAggregationFileControllerFactory(conf);
|
||||||
|
LogAggregationFileController fc = factory.getFileControllerForWrite();
|
||||||
|
|
||||||
|
assertEquals(WRONG_ROOT_LOG_DIR_MSG, "target/app-logs/default",
|
||||||
|
fc.getRemoteRootLogDir().toString());
|
||||||
|
assertEquals(WRONG_ROOT_LOG_DIR_SUFFIX_MSG, "log-tfile",
|
||||||
|
fc.getRemoteRootLogDirSuffix());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDefaultConfUsed() {
|
||||||
|
Configuration conf = getConf();
|
||||||
|
conf.unset(YarnConfiguration.NM_REMOTE_APP_LOG_DIR);
|
||||||
|
conf.unset(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX);
|
||||||
|
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
|
||||||
|
|
||||||
|
LogAggregationFileControllerFactory factory =
|
||||||
|
new LogAggregationFileControllerFactory(getConf());
|
||||||
|
LogAggregationFileController fc = factory.getFileControllerForWrite();
|
||||||
|
|
||||||
|
assertEquals(WRONG_ROOT_LOG_DIR_MSG, "/tmp/logs",
|
||||||
|
fc.getRemoteRootLogDir().toString());
|
||||||
|
assertEquals(WRONG_ROOT_LOG_DIR_SUFFIX_MSG, "logs-tfile",
|
||||||
|
fc.getRemoteRootLogDirSuffix());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestLogAggregationFileController
|
private static class TestLogAggregationFileController
|
||||||
|
@ -144,14 +248,7 @@ public class TestLogAggregationFileControllerFactory {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initInternal(Configuration conf) {
|
public void initInternal(Configuration conf) {
|
||||||
String remoteDirStr = String.format(
|
// Do Nothing
|
||||||
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
|
|
||||||
this.fileControllerName);
|
|
||||||
this.remoteRootLogDir = new Path(conf.get(remoteDirStr));
|
|
||||||
String suffix = String.format(
|
|
||||||
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
|
|
||||||
this.fileControllerName);
|
|
||||||
this.remoteRootLogDirSuffix = conf.get(suffix);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue