YARN-7259. Add size-based rolling policy to LogAggregationIndexedFileController. (xgong via wangda)

Change-Id: Ifaf82c0aee6b73b9b6ebf103aa72e131e3942f31
(cherry picked from commit 280080fad0)
This commit is contained in:
Wangda Tan 2017-10-02 15:30:22 -07:00 committed by Xuan
parent aef3e4b6af
commit c2f751cb0f
3 changed files with 340 additions and 138 deletions

View File

@ -101,10 +101,9 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
return;
}
Map<String, FileStatus> checkSumFiles;
Map<String, Long> checkSumFiles;
try {
checkSumFiles = fileController.filterFiles(nodeFiles,
LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
checkSumFiles = fileController.parseCheckSumFiles(nodeFiles);
} catch (IOException ex) {
LOG.error("Error getting logs for " + logEntity, ex);
html.h1("Error getting logs for " + logEntity);
@ -125,12 +124,11 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
String desiredLogType = $(CONTAINER_LOG_TYPE);
try {
for (FileStatus thisNodeFile : fileToRead) {
FileStatus checkSum = fileController.getAllChecksumFiles(
checkSumFiles, thisNodeFile.getPath().getName());
Long checkSumIndex = checkSumFiles.get(
thisNodeFile.getPath().getName());
long endIndex = -1;
if (checkSum != null) {
endIndex = fileController.loadIndexedLogsCheckSum(
checkSum.getPath());
if (checkSumIndex != null) {
endIndex = checkSumIndex.longValue();
}
IndexedLogsMeta indexedLogsMeta = null;
try {

View File

@ -29,6 +29,8 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@ -41,7 +43,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang.SerializationUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -54,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
@ -77,6 +79,8 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.View.ViewContext;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
@ -102,7 +106,8 @@ public class LogAggregationIndexedFileController
"indexedFile.fs.op.num-retries";
private static final String FS_RETRY_INTERVAL_MS_ATTR =
"indexedFile.fs.retry-interval-ms";
private static final int UUID_LENGTH = 36;
private static final String LOG_ROLL_OVER_MAX_FILE_SIZE_GB =
"indexedFile.log.roll-over.max-file-size-gb";
@VisibleForTesting
public static final String CHECK_SUM_FILE_SUFFIX = "-checksum";
@ -121,7 +126,10 @@ public class LogAggregationIndexedFileController
private Path remoteLogCheckSumFile;
private FileContext fc;
private UserGroupInformation ugi;
private String uuid = null;
private byte[] uuid = null;
private final int UUID_LENGTH = 32;
private long logRollOverMaxFileSize;
private Clock sysClock;
public LogAggregationIndexedFileController() {}
@ -164,6 +172,8 @@ public class LogAggregationIndexedFileController
compressName);
this.fsNumRetries = conf.getInt(FS_NUM_RETRIES_ATTR, 3);
this.fsRetryInterval = conf.getLong(FS_RETRY_INTERVAL_MS_ATTR, 1000L);
this.logRollOverMaxFileSize = getRollOverLogMaxSize(conf);
this.sysClock = getSystemClock();
}
@Override
@ -173,11 +183,12 @@ public class LogAggregationIndexedFileController
final UserGroupInformation userUgi = context.getUserUgi();
final Map<ApplicationAccessType, String> appAcls = context.getAppAcls();
final String nodeId = context.getNodeId().toString();
final ApplicationId appId = context.getAppId();
final Path remoteLogFile = context.getRemoteNodeLogFileForApp();
this.ugi = userUgi;
logAggregationSuccessfullyInThisCyCle = false;
logsMetaInThisCycle = new IndexedPerAggregationLogMeta();
logAggregationTimeInThisCycle = System.currentTimeMillis();
logAggregationTimeInThisCycle = this.sysClock.getTime();
logsMetaInThisCycle.setUploadTimeStamp(logAggregationTimeInThisCycle);
logsMetaInThisCycle.setRemoteNodeFile(remoteLogFile.getName());
try {
@ -187,57 +198,6 @@ public class LogAggregationIndexedFileController
fc = FileContext.getFileContext(
remoteRootLogDir.toUri(), conf);
fc.setUMask(APP_LOG_FILE_UMASK);
boolean fileExist = fc.util().exists(remoteLogFile);
if (fileExist && context.isLogAggregationInRolling()) {
fsDataOStream = fc.create(remoteLogFile,
EnumSet.of(CreateFlag.APPEND),
new Options.CreateOpts[] {});
if (uuid == null) {
FSDataInputStream fsDataInputStream = null;
try {
fsDataInputStream = fc.open(remoteLogFile);
byte[] b = new byte[UUID_LENGTH];
int actual = fsDataInputStream.read(b);
if (actual != UUID_LENGTH) {
// Get an error when parse the UUID from existed log file.
// Simply OverWrite the existed log file and re-create the
// UUID.
fsDataOStream = fc.create(remoteLogFile,
EnumSet.of(CreateFlag.OVERWRITE),
new Options.CreateOpts[] {});
uuid = UUID.randomUUID().toString();
fsDataOStream.write(uuid.getBytes(Charset.forName("UTF-8")));
fsDataOStream.flush();
} else {
uuid = new String(b, Charset.forName("UTF-8"));
}
} finally {
IOUtils.cleanupWithLogger(LOG, fsDataInputStream);
}
}
// if the remote log file exists, but we do not have any
// indexedLogsMeta. We need to re-load indexedLogsMeta from
// the existing remote log file. If the re-load fails, we simply
// re-create a new indexedLogsMeta object. And will re-load
// the indexedLogsMeta from checksum file later.
if (indexedLogsMeta == null) {
try {
indexedLogsMeta = loadIndexedLogsMeta(remoteLogFile);
} catch (IOException ex) {
// DO NOTHING
}
}
} else {
fsDataOStream = fc.create(remoteLogFile,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
new Options.CreateOpts[] {});
if (uuid == null) {
uuid = UUID.randomUUID().toString();
}
byte[] b = uuid.getBytes(Charset.forName("UTF-8"));
fsDataOStream.write(b);
fsDataOStream.flush();
}
if (indexedLogsMeta == null) {
indexedLogsMeta = new IndexedLogsMeta();
indexedLogsMeta.setVersion(VERSION);
@ -249,44 +209,24 @@ public class LogAggregationIndexedFileController
YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
indexedLogsMeta.setCompressName(compressName);
}
final long currentAggregatedLogFileLength = fc
.getFileStatus(remoteLogFile).getLen();
// only check the check-sum file when we are in append mode
Path aggregatedLogFile = null;
if (context.isLogAggregationInRolling()) {
// check whether the checksum file exists to figure out
// whether the previous log aggregation process is successful
// and the aggregated log file is corrupted or not.
remoteLogCheckSumFile = new Path(remoteLogFile.getParent(),
(remoteLogFile.getName() + CHECK_SUM_FILE_SUFFIX));
boolean exist = fc.util().exists(remoteLogCheckSumFile);
if (!exist) {
FSDataOutputStream checksumFileOutputStream = null;
try {
checksumFileOutputStream = fc.create(remoteLogCheckSumFile,
aggregatedLogFile = initializeWriterInRolling(
remoteLogFile, appId, nodeId);
} else {
aggregatedLogFile = remoteLogFile;
fsDataOStream = fc.create(remoteLogFile,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
new Options.CreateOpts[] {});
checksumFileOutputStream.writeLong(
currentAggregatedLogFileLength);
} finally {
IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream);
}
} else {
FSDataInputStream checksumFileInputStream = null;
try {
checksumFileInputStream = fc.open(remoteLogCheckSumFile);
long endIndex = checksumFileInputStream.readLong();
IndexedLogsMeta recoveredLogsMeta = loadIndexedLogsMeta(
remoteLogFile, endIndex);
if (recoveredLogsMeta == null) {
indexedLogsMeta.getLogMetas().clear();
} else {
indexedLogsMeta = recoveredLogsMeta;
}
} finally {
IOUtils.cleanupWithLogger(LOG, checksumFileInputStream);
}
if (uuid == null) {
uuid = createUUID(appId);
}
fsDataOStream.write(uuid);
fsDataOStream.flush();
}
long aggregatedLogFileLength = fc.getFileStatus(
aggregatedLogFile).getLen();
// append a simple character("\n") to move the writer cursor, so
// we could get the correct position when we call
// fsOutputStream.getStartPos()
@ -294,11 +234,11 @@ public class LogAggregationIndexedFileController
fsDataOStream.write(dummyBytes);
fsDataOStream.flush();
if (fsDataOStream.getPos() >= (currentAggregatedLogFileLength
if (fsDataOStream.getPos() >= (aggregatedLogFileLength
+ dummyBytes.length)) {
currentOffSet = 0;
} else {
currentOffSet = currentAggregatedLogFileLength;
currentOffSet = aggregatedLogFileLength;
}
return null;
}
@ -308,6 +248,104 @@ public class LogAggregationIndexedFileController
}
}
private Path initializeWriterInRolling(final Path remoteLogFile,
final ApplicationId appId, final String nodeId) throws Exception {
Path aggregatedLogFile = null;
// check uuid
// if we can not find uuid, we would load the uuid
// from previous aggregated log files, and at the same
// time, we would delete any aggregated log files which
// has invalid uuid.
if (uuid == null) {
uuid = loadUUIDFromLogFile(fc, remoteLogFile.getParent(),
appId, nodeId);
}
Path currentRemoteLogFile = getCurrentRemoteLogFile(
fc, remoteLogFile.getParent(), nodeId);
// check checksum file
boolean overwriteCheckSum = true;
remoteLogCheckSumFile = new Path(remoteLogFile.getParent(),
(remoteLogFile.getName() + CHECK_SUM_FILE_SUFFIX));
if(fc.util().exists(remoteLogCheckSumFile)) {
// if the checksum file exists, we should reset cached
// indexedLogsMeta.
indexedLogsMeta.getLogMetas().clear();
if (currentRemoteLogFile != null) {
FSDataInputStream checksumFileInputStream = null;
try {
checksumFileInputStream = fc.open(remoteLogCheckSumFile);
int nameLength = checksumFileInputStream.readInt();
byte[] b = new byte[nameLength];
int actualLength = checksumFileInputStream.read(b);
if (actualLength == nameLength) {
String recoveredLogFile = new String(
b, Charset.forName("UTF-8"));
if (recoveredLogFile.equals(
currentRemoteLogFile.getName())) {
overwriteCheckSum = false;
long endIndex = checksumFileInputStream.readLong();
IndexedLogsMeta recoveredLogsMeta = null;
try {
truncateFileWithRetries(fc, currentRemoteLogFile,
endIndex);
recoveredLogsMeta = loadIndexedLogsMeta(
currentRemoteLogFile);
} catch (Exception ex) {
recoveredLogsMeta = loadIndexedLogsMeta(
currentRemoteLogFile, endIndex);
}
if (recoveredLogsMeta != null) {
indexedLogsMeta = recoveredLogsMeta;
}
}
}
} finally {
IOUtils.cleanupWithLogger(LOG, checksumFileInputStream);
}
}
}
// check whether we need roll over old logs
if (currentRemoteLogFile == null || isRollover(
fc, currentRemoteLogFile)) {
indexedLogsMeta.getLogMetas().clear();
overwriteCheckSum = true;
aggregatedLogFile = new Path(remoteLogFile.getParent(),
remoteLogFile.getName() + "_" + sysClock.getTime());
fsDataOStream = fc.create(aggregatedLogFile,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
new Options.CreateOpts[] {});
// writes the uuid
fsDataOStream.write(uuid);
fsDataOStream.flush();
} else {
aggregatedLogFile = currentRemoteLogFile;
fsDataOStream = fc.create(currentRemoteLogFile,
EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
new Options.CreateOpts[] {});
}
// recreate checksum file if needed before aggregate the logs
if (overwriteCheckSum) {
final long currentAggregatedLogFileLength = fc
.getFileStatus(aggregatedLogFile).getLen();
FSDataOutputStream checksumFileOutputStream = null;
try {
checksumFileOutputStream = fc.create(remoteLogCheckSumFile,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
new Options.CreateOpts[] {});
String fileName = aggregatedLogFile.getName();
checksumFileOutputStream.writeInt(fileName.length());
checksumFileOutputStream.write(fileName.getBytes(
Charset.forName("UTF-8")));
checksumFileOutputStream.writeLong(
currentAggregatedLogFileLength);
checksumFileOutputStream.flush();
} finally {
IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream);
}
}
return aggregatedLogFile;
}
@Override
public void closeWriter() {
IOUtils.cleanupWithLogger(LOG, this.fsDataOStream);
@ -390,8 +428,7 @@ public class LogAggregationIndexedFileController
this.fsDataOStream.write(b);
int length = b.length;
this.fsDataOStream.writeInt(length);
byte[] separator = this.uuid.getBytes(Charset.forName("UTF-8"));
this.fsDataOStream.write(separator);
this.fsDataOStream.write(uuid);
if (logAggregationSuccessfullyInThisCyCle &&
record.isLogAggregationInRolling()) {
deleteFileWithRetries(fc, ugi, remoteLogCheckSumFile);
@ -410,6 +447,30 @@ public class LogAggregationIndexedFileController
}.runWithRetries();
}
private void deleteFileWithRetries(final FileContext fileContext,
final Path deletePath) throws Exception {
new FSAction<Void>() {
@Override
public Void run() throws Exception {
if (fileContext.util().exists(deletePath)) {
fileContext.delete(deletePath, false);
}
return null;
}
}.runWithRetries();
}
private void truncateFileWithRetries(final FileContext fileContext,
final Path truncatePath, final long newLength) throws Exception {
new FSAction<Void>() {
@Override
public Void run() throws Exception {
fileContext.truncate(truncatePath, newLength);
return null;
}
}.runWithRetries();
}
private Object deleteFileWithPrivilege(final FileContext fileContext,
final UserGroupInformation userUgi, final Path fileToDelete)
throws Exception {
@ -449,18 +510,16 @@ public class LogAggregationIndexedFileController
throw new IOException("There is no available log fils for "
+ "application:" + appId);
}
Map<String, FileStatus> checkSumFiles = filterFiles(
nodeFiles, CHECK_SUM_FILE_SUFFIX);
Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles);
List<FileStatus> fileToRead = getNodeLogFileToRead(
nodeFiles, nodeIdStr, appId);
byte[] buf = new byte[65535];
for (FileStatus thisNodeFile : fileToRead) {
String nodeName = thisNodeFile.getPath().getName();
FileStatus checkSum = getAllChecksumFiles(checkSumFiles,
thisNodeFile.getPath().getName());
Long checkSumIndex = checkSumFiles.get(nodeName);
long endIndex = -1;
if (checkSum != null) {
endIndex = loadIndexedLogsCheckSum(checkSum.getPath());
if (checkSumIndex != null) {
endIndex = checkSumIndex.longValue();
}
IndexedLogsMeta indexedLogsMeta = null;
try {
@ -565,17 +624,16 @@ public class LogAggregationIndexedFileController
throw new IOException("There is no available log fils for "
+ "application:" + appId);
}
Map<String, FileStatus> checkSumFiles = filterFiles(
nodeFiles, CHECK_SUM_FILE_SUFFIX);
Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles);
List<FileStatus> fileToRead = getNodeLogFileToRead(
nodeFiles, nodeIdStr, appId);
for(FileStatus thisNodeFile : fileToRead) {
try {
FileStatus checkSum = getAllChecksumFiles(checkSumFiles,
Long checkSumIndex = checkSumFiles.get(
thisNodeFile.getPath().getName());
long endIndex = -1;
if (checkSum != null) {
endIndex = loadIndexedLogsCheckSum(checkSum.getPath());
if (checkSumIndex != null) {
endIndex = checkSumIndex.longValue();
}
IndexedLogsMeta current = loadIndexedLogsMeta(
thisNodeFile.getPath(), endIndex);
@ -627,21 +685,46 @@ public class LogAggregationIndexedFileController
}
@Private
public Map<String, FileStatus> filterFiles(
List<FileStatus> fileList, final String suffix) throws IOException {
Map<String, FileStatus> checkSumFiles = new HashMap<>();
public Map<String, Long> parseCheckSumFiles(
List<FileStatus> fileList) throws IOException {
Map<String, Long> checkSumFiles = new HashMap<>();
Set<FileStatus> status = new HashSet<FileStatus>(fileList);
Iterable<FileStatus> mask =
Iterables.filter(status, new Predicate<FileStatus>() {
@Override
public boolean apply(FileStatus next) {
return next.getPath().getName().endsWith(
suffix);
CHECK_SUM_FILE_SUFFIX);
}
});
status = Sets.newHashSet(mask);
FileContext fc = null;
for (FileStatus file : status) {
checkSumFiles.put(file.getPath().getName(), file);
FSDataInputStream checksumFileInputStream = null;
try {
if (fc == null) {
fc = FileContext.getFileContext(file.getPath().toUri(), conf);
}
String nodeName = null;
long index = 0L;
checksumFileInputStream = fc.open(file.getPath());
int nameLength = checksumFileInputStream.readInt();
byte[] b = new byte[nameLength];
int actualLength = checksumFileInputStream.read(b);
if (actualLength == nameLength) {
nodeName = new String(b, Charset.forName("UTF-8"));
index = checksumFileInputStream.readLong();
} else {
continue;
}
if (nodeName != null && !nodeName.isEmpty()) {
checkSumFiles.put(nodeName, Long.valueOf(index));
}
} catch (IOException ex) {
continue;
} finally {
IOUtils.cleanupWithLogger(LOG, checksumFileInputStream);
}
}
return checkSumFiles;
}
@ -755,20 +838,6 @@ public class LogAggregationIndexedFileController
return loadIndexedLogsMeta(remoteLogPath, -1);
}
@Private
public long loadIndexedLogsCheckSum(Path remoteLogCheckSumPath)
throws IOException {
FileContext fileContext =
FileContext.getFileContext(remoteLogCheckSumPath.toUri(), conf);
FSDataInputStream fsDataIStream = null;
try {
fsDataIStream = fileContext.open(remoteLogCheckSumPath);
return fsDataIStream.readLong();
} finally {
IOUtils.cleanupWithLogger(LOG, fsDataIStream);
}
}
/**
* This IndexedLogsMeta includes all the meta information
* for the aggregated log file.
@ -1034,6 +1103,13 @@ public class LogAggregationIndexedFileController
return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024);
}
@Private
@VisibleForTesting
public long getRollOverLogMaxSize(Configuration conf) {
return 1024L * 1024 * 1024 * conf.getInt(
LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10);
}
private abstract class FSAction<T> {
abstract T run() throws Exception;
@ -1054,4 +1130,77 @@ public class LogAggregationIndexedFileController
}
}
}
private Path getCurrentRemoteLogFile(final FileContext fc,
final Path parent, final String nodeId) throws IOException {
RemoteIterator<FileStatus> files = fc.listStatus(parent);
long maxTime = 0L;
Path returnPath = null;
while(files.hasNext()) {
FileStatus candidate = files.next();
String fileName = candidate.getPath().getName();
if (fileName.contains(LogAggregationUtils.getNodeString(nodeId))
&& !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX) &&
!fileName.endsWith(CHECK_SUM_FILE_SUFFIX)) {
if (candidate.getModificationTime() > maxTime) {
maxTime = candidate.getModificationTime();
returnPath = candidate.getPath();
}
}
}
return returnPath;
}
private byte[] loadUUIDFromLogFile(final FileContext fc,
final Path parent, final ApplicationId appId, final String nodeId)
throws Exception {
byte[] id = null;
RemoteIterator<FileStatus> files = fc.listStatus(parent);
FSDataInputStream fsDataInputStream = null;
byte[] uuid = createUUID(appId);
while(files.hasNext()) {
try {
Path checkPath = files.next().getPath();
if (checkPath.getName().contains(LogAggregationUtils
.getNodeString(nodeId)) && !checkPath.getName()
.endsWith(CHECK_SUM_FILE_SUFFIX)) {
fsDataInputStream = fc.open(checkPath);
byte[] b = new byte[uuid.length];
int actual = fsDataInputStream.read(b);
if (actual != uuid.length || Arrays.equals(b, uuid)) {
deleteFileWithRetries(fc, checkPath);
} else if (id == null){
id = uuid;
}
}
} finally {
IOUtils.cleanupWithLogger(LOG, fsDataInputStream);
}
}
return id == null ? uuid : id;
}
@Private
@VisibleForTesting
public boolean isRollover(final FileContext fc,
final Path candidate) throws IOException {
FileStatus fs = fc.getFileStatus(candidate);
return fs.getLen() >= this.logRollOverMaxFileSize;
}
@Private
@VisibleForTesting
public Clock getSystemClock() {
return SystemClock.getInstance();
}
private byte[] createUUID(ApplicationId appId) throws IOException {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
return digest.digest(appId.toString().getBytes(
Charset.forName("UTF-8")));
} catch (NoSuchAlgorithmException ex) {
throw new IOException(ex);
}
}
}

View File

@ -21,13 +21,13 @@ package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Writer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -37,6 +37,8 @@ import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@ -54,6 +56,8 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -143,8 +147,27 @@ public class TestLogAggregationIndexFileController {
LogValue value = mock(LogValue.class);
when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files);
final ControlledClock clock = new ControlledClock();
clock.setTime(System.currentTimeMillis());
LogAggregationIndexedFileController fileFormat
= new LogAggregationIndexedFileController();
= new LogAggregationIndexedFileController() {
private int rollOverCheck = 0;
@Override
public Clock getSystemClock() {
return clock;
}
@Override
public boolean isRollover(final FileContext fc,
final Path candidate) throws IOException {
rollOverCheck++;
if (rollOverCheck >= 3) {
return true;
}
return false;
}
};
fileFormat.initialize(conf, "Indexed");
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
@ -203,7 +226,11 @@ public class TestLogAggregationIndexFileController {
+ LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
FSDataOutputStream fInput = null;
try {
String nodeName = logPath.getName() + "_" + clock.getTime();
fInput = FileSystem.create(fs, checksumFile, LOG_FILE_UMASK);
fInput.writeInt(nodeName.length());
fInput.write(nodeName.getBytes(
Charset.forName("UTF-8")));
fInput.writeLong(0);
} finally {
IOUtils.closeQuietly(fInput);
@ -236,9 +263,9 @@ public class TestLogAggregationIndexFileController {
// We did not call postWriter which we would keep the checksum file.
// We can only get the logs/logmeta from the first write.
fileFormat.readAggregatedLogsMeta(
meta = fileFormat.readAggregatedLogsMeta(
logRequest);
Assert.assertEquals(meta.size(), meta.size(), 1);
Assert.assertEquals(meta.size(), 1);
for (ContainerLogMeta log : meta) {
Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
@ -267,9 +294,37 @@ public class TestLogAggregationIndexFileController {
fileFormat.write(key1, value2);
fileFormat.postWrite(context);
fileFormat.closeWriter();
fileFormat.readAggregatedLogsMeta(
meta = fileFormat.readAggregatedLogsMeta(
logRequest);
Assert.assertEquals(meta.size(), meta.size(), 2);
Assert.assertEquals(meta.size(), 2);
for (ContainerLogMeta log : meta) {
Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
fileNames.add(file.getFileName());
}
}
fileNames.removeAll(newLogTypes);
Assert.assertTrue(fileNames.isEmpty());
foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
Assert.assertTrue(foundLogs);
for (String logType : newLogTypes) {
Assert.assertTrue(sysOutStream.toString().contains(logMessage(
containerId, logType)));
}
sysOutStream.reset();
// start to roll over old logs
clock.setTime(System.currentTimeMillis());
fileFormat.initializeWriter(context);
fileFormat.write(key1, value2);
fileFormat.postWrite(context);
fileFormat.closeWriter();
FileStatus[] status = fs.listStatus(logPath.getParent());
Assert.assertTrue(status.length == 2);
meta = fileFormat.readAggregatedLogsMeta(
logRequest);
Assert.assertEquals(meta.size(), 3);
for (ContainerLogMeta log : meta) {
Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));