diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java index c4cbfda1327..5439b534b05 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java @@ -101,10 +101,9 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock { return; } - Map checkSumFiles; + Map checkSumFiles; try { - checkSumFiles = fileController.filterFiles(nodeFiles, - LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX); + checkSumFiles = fileController.parseCheckSumFiles(nodeFiles); } catch (IOException ex) { LOG.error("Error getting logs for " + logEntity, ex); html.h1("Error getting logs for " + logEntity); @@ -125,12 +124,11 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock { String desiredLogType = $(CONTAINER_LOG_TYPE); try { for (FileStatus thisNodeFile : fileToRead) { - FileStatus checkSum = fileController.getAllChecksumFiles( - checkSumFiles, thisNodeFile.getPath().getName()); + Long checkSumIndex = checkSumFiles.get( + thisNodeFile.getPath().getName()); long endIndex = -1; - if (checkSum != null) { - endIndex = fileController.loadIndexedLogsCheckSum( - checkSum.getPath()); + if (checkSumIndex != null) { + endIndex = checkSumIndex.longValue(); } IndexedLogsMeta indexedLogsMeta = null; try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index 243945e2c18..800c0a2dd3c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -29,6 +29,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.nio.charset.Charset; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; @@ -41,7 +43,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.UUID; import org.apache.commons.lang.SerializationUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -54,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.HarFs; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SecureIOUtils; @@ -77,6 +79,8 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.webapp.View.ViewContext; import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block; @@ -102,7 +106,8 @@ public class LogAggregationIndexedFileController "indexedFile.fs.op.num-retries"; private static final String FS_RETRY_INTERVAL_MS_ATTR = "indexedFile.fs.retry-interval-ms"; - private static final int UUID_LENGTH = 36; + private static final String LOG_ROLL_OVER_MAX_FILE_SIZE_GB = + "indexedFile.log.roll-over.max-file-size-gb"; @VisibleForTesting public static final String CHECK_SUM_FILE_SUFFIX = "-checksum"; @@ -121,7 +126,10 @@ public class LogAggregationIndexedFileController private Path remoteLogCheckSumFile; private FileContext fc; private UserGroupInformation ugi; - private String uuid = null; + private byte[] uuid = null; + private final int UUID_LENGTH = 32; + private long logRollOverMaxFileSize; + private Clock sysClock; public LogAggregationIndexedFileController() {} @@ -164,6 +172,8 @@ public class LogAggregationIndexedFileController compressName); this.fsNumRetries = conf.getInt(FS_NUM_RETRIES_ATTR, 3); this.fsRetryInterval = conf.getLong(FS_RETRY_INTERVAL_MS_ATTR, 1000L); + this.logRollOverMaxFileSize = getRollOverLogMaxSize(conf); + this.sysClock = getSystemClock(); } @Override @@ -173,11 +183,12 @@ public class LogAggregationIndexedFileController final UserGroupInformation userUgi = context.getUserUgi(); final Map appAcls = context.getAppAcls(); final String nodeId = context.getNodeId().toString(); + final ApplicationId appId = context.getAppId(); final Path remoteLogFile = context.getRemoteNodeLogFileForApp(); this.ugi = userUgi; logAggregationSuccessfullyInThisCyCle = false; logsMetaInThisCycle = new IndexedPerAggregationLogMeta(); - logAggregationTimeInThisCycle = System.currentTimeMillis(); + logAggregationTimeInThisCycle = this.sysClock.getTime(); logsMetaInThisCycle.setUploadTimeStamp(logAggregationTimeInThisCycle); logsMetaInThisCycle.setRemoteNodeFile(remoteLogFile.getName()); try { @@ -187,57 +198,6 @@ public class LogAggregationIndexedFileController fc = FileContext.getFileContext( remoteRootLogDir.toUri(), conf); fc.setUMask(APP_LOG_FILE_UMASK); - boolean fileExist = fc.util().exists(remoteLogFile); - if (fileExist && context.isLogAggregationInRolling()) { - fsDataOStream = fc.create(remoteLogFile, - EnumSet.of(CreateFlag.APPEND), - new Options.CreateOpts[] {}); - if (uuid == null) { - FSDataInputStream fsDataInputStream = null; - try { - fsDataInputStream = fc.open(remoteLogFile); - byte[] b = new byte[UUID_LENGTH]; - int actual = fsDataInputStream.read(b); - if (actual != UUID_LENGTH) { - // Get an error when parse the UUID from existed log file. - // Simply OverWrite the existed log file and re-create the - // UUID. - fsDataOStream = fc.create(remoteLogFile, - EnumSet.of(CreateFlag.OVERWRITE), - new Options.CreateOpts[] {}); - uuid = UUID.randomUUID().toString(); - fsDataOStream.write(uuid.getBytes(Charset.forName("UTF-8"))); - fsDataOStream.flush(); - } else { - uuid = new String(b, Charset.forName("UTF-8")); - } - } finally { - IOUtils.cleanupWithLogger(LOG, fsDataInputStream); - } - } - // if the remote log file exists, but we do not have any - // indexedLogsMeta. We need to re-load indexedLogsMeta from - // the existing remote log file. If the re-load fails, we simply - // re-create a new indexedLogsMeta object. And will re-load - // the indexedLogsMeta from checksum file later. - if (indexedLogsMeta == null) { - try { - indexedLogsMeta = loadIndexedLogsMeta(remoteLogFile); - } catch (IOException ex) { - // DO NOTHING - } - } - } else { - fsDataOStream = fc.create(remoteLogFile, - EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), - new Options.CreateOpts[] {}); - if (uuid == null) { - uuid = UUID.randomUUID().toString(); - } - byte[] b = uuid.getBytes(Charset.forName("UTF-8")); - fsDataOStream.write(b); - fsDataOStream.flush(); - } if (indexedLogsMeta == null) { indexedLogsMeta = new IndexedLogsMeta(); indexedLogsMeta.setVersion(VERSION); @@ -249,44 +209,24 @@ public class LogAggregationIndexedFileController YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE); indexedLogsMeta.setCompressName(compressName); } - final long currentAggregatedLogFileLength = fc - .getFileStatus(remoteLogFile).getLen(); - // only check the check-sum file when we are in append mode + Path aggregatedLogFile = null; if (context.isLogAggregationInRolling()) { - // check whether the checksum file exists to figure out - // whether the previous log aggregation process is successful - // and the aggregated log file is corrupted or not. - remoteLogCheckSumFile = new Path(remoteLogFile.getParent(), - (remoteLogFile.getName() + CHECK_SUM_FILE_SUFFIX)); - boolean exist = fc.util().exists(remoteLogCheckSumFile); - if (!exist) { - FSDataOutputStream checksumFileOutputStream = null; - try { - checksumFileOutputStream = fc.create(remoteLogCheckSumFile, - EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), - new Options.CreateOpts[] {}); - checksumFileOutputStream.writeLong( - currentAggregatedLogFileLength); - } finally { - IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream); - } - } else { - FSDataInputStream checksumFileInputStream = null; - try { - checksumFileInputStream = fc.open(remoteLogCheckSumFile); - long endIndex = checksumFileInputStream.readLong(); - IndexedLogsMeta recoveredLogsMeta = loadIndexedLogsMeta( - remoteLogFile, endIndex); - if (recoveredLogsMeta == null) { - indexedLogsMeta.getLogMetas().clear(); - } else { - indexedLogsMeta = recoveredLogsMeta; - } - } finally { - IOUtils.cleanupWithLogger(LOG, checksumFileInputStream); - } + aggregatedLogFile = initializeWriterInRolling( + remoteLogFile, appId, nodeId); + } else { + aggregatedLogFile = remoteLogFile; + fsDataOStream = fc.create(remoteLogFile, + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), + new Options.CreateOpts[] {}); + if (uuid == null) { + uuid = createUUID(appId); } + fsDataOStream.write(uuid); + fsDataOStream.flush(); } + + long aggregatedLogFileLength = fc.getFileStatus( + aggregatedLogFile).getLen(); // append a simple character("\n") to move the writer cursor, so // we could get the correct position when we call // fsOutputStream.getStartPos() @@ -294,11 +234,11 @@ public class LogAggregationIndexedFileController fsDataOStream.write(dummyBytes); fsDataOStream.flush(); - if (fsDataOStream.getPos() >= (currentAggregatedLogFileLength + if (fsDataOStream.getPos() >= (aggregatedLogFileLength + dummyBytes.length)) { currentOffSet = 0; } else { - currentOffSet = currentAggregatedLogFileLength; + currentOffSet = aggregatedLogFileLength; } return null; } @@ -308,6 +248,104 @@ public class LogAggregationIndexedFileController } } + private Path initializeWriterInRolling(final Path remoteLogFile, + final ApplicationId appId, final String nodeId) throws Exception { + Path aggregatedLogFile = null; + // check uuid + // if we can not find uuid, we would load the uuid + // from previous aggregated log files, and at the same + // time, we would delete any aggregated log files which + // has invalid uuid. + if (uuid == null) { + uuid = loadUUIDFromLogFile(fc, remoteLogFile.getParent(), + appId, nodeId); + } + Path currentRemoteLogFile = getCurrentRemoteLogFile( + fc, remoteLogFile.getParent(), nodeId); + // check checksum file + boolean overwriteCheckSum = true; + remoteLogCheckSumFile = new Path(remoteLogFile.getParent(), + (remoteLogFile.getName() + CHECK_SUM_FILE_SUFFIX)); + if(fc.util().exists(remoteLogCheckSumFile)) { + // if the checksum file exists, we should reset cached + // indexedLogsMeta. + indexedLogsMeta.getLogMetas().clear(); + if (currentRemoteLogFile != null) { + FSDataInputStream checksumFileInputStream = null; + try { + checksumFileInputStream = fc.open(remoteLogCheckSumFile); + int nameLength = checksumFileInputStream.readInt(); + byte[] b = new byte[nameLength]; + int actualLength = checksumFileInputStream.read(b); + if (actualLength == nameLength) { + String recoveredLogFile = new String( + b, Charset.forName("UTF-8")); + if (recoveredLogFile.equals( + currentRemoteLogFile.getName())) { + overwriteCheckSum = false; + long endIndex = checksumFileInputStream.readLong(); + IndexedLogsMeta recoveredLogsMeta = null; + try { + truncateFileWithRetries(fc, currentRemoteLogFile, + endIndex); + recoveredLogsMeta = loadIndexedLogsMeta( + currentRemoteLogFile); + } catch (Exception ex) { + recoveredLogsMeta = loadIndexedLogsMeta( + currentRemoteLogFile, endIndex); + } + if (recoveredLogsMeta != null) { + indexedLogsMeta = recoveredLogsMeta; + } + } + } + } finally { + IOUtils.cleanupWithLogger(LOG, checksumFileInputStream); + } + } + } + // check whether we need roll over old logs + if (currentRemoteLogFile == null || isRollover( + fc, currentRemoteLogFile)) { + indexedLogsMeta.getLogMetas().clear(); + overwriteCheckSum = true; + aggregatedLogFile = new Path(remoteLogFile.getParent(), + remoteLogFile.getName() + "_" + sysClock.getTime()); + fsDataOStream = fc.create(aggregatedLogFile, + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), + new Options.CreateOpts[] {}); + // writes the uuid + fsDataOStream.write(uuid); + fsDataOStream.flush(); + } else { + aggregatedLogFile = currentRemoteLogFile; + fsDataOStream = fc.create(currentRemoteLogFile, + EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), + new Options.CreateOpts[] {}); + } + // recreate checksum file if needed before aggregate the logs + if (overwriteCheckSum) { + final long currentAggregatedLogFileLength = fc + .getFileStatus(aggregatedLogFile).getLen(); + FSDataOutputStream checksumFileOutputStream = null; + try { + checksumFileOutputStream = fc.create(remoteLogCheckSumFile, + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), + new Options.CreateOpts[] {}); + String fileName = aggregatedLogFile.getName(); + checksumFileOutputStream.writeInt(fileName.length()); + checksumFileOutputStream.write(fileName.getBytes( + Charset.forName("UTF-8"))); + checksumFileOutputStream.writeLong( + currentAggregatedLogFileLength); + checksumFileOutputStream.flush(); + } finally { + IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream); + } + } + return aggregatedLogFile; + } + @Override public void closeWriter() { IOUtils.cleanupWithLogger(LOG, this.fsDataOStream); @@ -390,8 +428,7 @@ public class LogAggregationIndexedFileController this.fsDataOStream.write(b); int length = b.length; this.fsDataOStream.writeInt(length); - byte[] separator = this.uuid.getBytes(Charset.forName("UTF-8")); - this.fsDataOStream.write(separator); + this.fsDataOStream.write(uuid); if (logAggregationSuccessfullyInThisCyCle && record.isLogAggregationInRolling()) { deleteFileWithRetries(fc, ugi, remoteLogCheckSumFile); @@ -410,6 +447,30 @@ public class LogAggregationIndexedFileController }.runWithRetries(); } + private void deleteFileWithRetries(final FileContext fileContext, + final Path deletePath) throws Exception { + new FSAction() { + @Override + public Void run() throws Exception { + if (fileContext.util().exists(deletePath)) { + fileContext.delete(deletePath, false); + } + return null; + } + }.runWithRetries(); + } + + private void truncateFileWithRetries(final FileContext fileContext, + final Path truncatePath, final long newLength) throws Exception { + new FSAction() { + @Override + public Void run() throws Exception { + fileContext.truncate(truncatePath, newLength); + return null; + } + }.runWithRetries(); + } + private Object deleteFileWithPrivilege(final FileContext fileContext, final UserGroupInformation userUgi, final Path fileToDelete) throws Exception { @@ -449,18 +510,16 @@ public class LogAggregationIndexedFileController throw new IOException("There is no available log fils for " + "application:" + appId); } - Map checkSumFiles = filterFiles( - nodeFiles, CHECK_SUM_FILE_SUFFIX); + Map checkSumFiles = parseCheckSumFiles(nodeFiles); List fileToRead = getNodeLogFileToRead( nodeFiles, nodeIdStr, appId); byte[] buf = new byte[65535]; for (FileStatus thisNodeFile : fileToRead) { String nodeName = thisNodeFile.getPath().getName(); - FileStatus checkSum = getAllChecksumFiles(checkSumFiles, - thisNodeFile.getPath().getName()); + Long checkSumIndex = checkSumFiles.get(nodeName); long endIndex = -1; - if (checkSum != null) { - endIndex = loadIndexedLogsCheckSum(checkSum.getPath()); + if (checkSumIndex != null) { + endIndex = checkSumIndex.longValue(); } IndexedLogsMeta indexedLogsMeta = null; try { @@ -565,17 +624,16 @@ public class LogAggregationIndexedFileController throw new IOException("There is no available log fils for " + "application:" + appId); } - Map checkSumFiles = filterFiles( - nodeFiles, CHECK_SUM_FILE_SUFFIX); + Map checkSumFiles = parseCheckSumFiles(nodeFiles); List fileToRead = getNodeLogFileToRead( nodeFiles, nodeIdStr, appId); for(FileStatus thisNodeFile : fileToRead) { try { - FileStatus checkSum = getAllChecksumFiles(checkSumFiles, + Long checkSumIndex = checkSumFiles.get( thisNodeFile.getPath().getName()); long endIndex = -1; - if (checkSum != null) { - endIndex = loadIndexedLogsCheckSum(checkSum.getPath()); + if (checkSumIndex != null) { + endIndex = checkSumIndex.longValue(); } IndexedLogsMeta current = loadIndexedLogsMeta( thisNodeFile.getPath(), endIndex); @@ -627,21 +685,46 @@ public class LogAggregationIndexedFileController } @Private - public Map filterFiles( - List fileList, final String suffix) throws IOException { - Map checkSumFiles = new HashMap<>(); + public Map parseCheckSumFiles( + List fileList) throws IOException { + Map checkSumFiles = new HashMap<>(); Set status = new HashSet(fileList); Iterable mask = Iterables.filter(status, new Predicate() { @Override public boolean apply(FileStatus next) { return next.getPath().getName().endsWith( - suffix); + CHECK_SUM_FILE_SUFFIX); } }); status = Sets.newHashSet(mask); + FileContext fc = null; for (FileStatus file : status) { - checkSumFiles.put(file.getPath().getName(), file); + FSDataInputStream checksumFileInputStream = null; + try { + if (fc == null) { + fc = FileContext.getFileContext(file.getPath().toUri(), conf); + } + String nodeName = null; + long index = 0L; + checksumFileInputStream = fc.open(file.getPath()); + int nameLength = checksumFileInputStream.readInt(); + byte[] b = new byte[nameLength]; + int actualLength = checksumFileInputStream.read(b); + if (actualLength == nameLength) { + nodeName = new String(b, Charset.forName("UTF-8")); + index = checksumFileInputStream.readLong(); + } else { + continue; + } + if (nodeName != null && !nodeName.isEmpty()) { + checkSumFiles.put(nodeName, Long.valueOf(index)); + } + } catch (IOException ex) { + continue; + } finally { + IOUtils.cleanupWithLogger(LOG, checksumFileInputStream); + } } return checkSumFiles; } @@ -755,20 +838,6 @@ public class LogAggregationIndexedFileController return loadIndexedLogsMeta(remoteLogPath, -1); } - @Private - public long loadIndexedLogsCheckSum(Path remoteLogCheckSumPath) - throws IOException { - FileContext fileContext = - FileContext.getFileContext(remoteLogCheckSumPath.toUri(), conf); - FSDataInputStream fsDataIStream = null; - try { - fsDataIStream = fileContext.open(remoteLogCheckSumPath); - return fsDataIStream.readLong(); - } finally { - IOUtils.cleanupWithLogger(LOG, fsDataIStream); - } - } - /** * This IndexedLogsMeta includes all the meta information * for the aggregated log file. @@ -1034,6 +1103,13 @@ public class LogAggregationIndexedFileController return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024); } + @Private + @VisibleForTesting + public long getRollOverLogMaxSize(Configuration conf) { + return 1024L * 1024 * 1024 * conf.getInt( + LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10); + } + private abstract class FSAction { abstract T run() throws Exception; @@ -1054,4 +1130,77 @@ public class LogAggregationIndexedFileController } } } + + private Path getCurrentRemoteLogFile(final FileContext fc, + final Path parent, final String nodeId) throws IOException { + RemoteIterator files = fc.listStatus(parent); + long maxTime = 0L; + Path returnPath = null; + while(files.hasNext()) { + FileStatus candidate = files.next(); + String fileName = candidate.getPath().getName(); + if (fileName.contains(LogAggregationUtils.getNodeString(nodeId)) + && !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX) && + !fileName.endsWith(CHECK_SUM_FILE_SUFFIX)) { + if (candidate.getModificationTime() > maxTime) { + maxTime = candidate.getModificationTime(); + returnPath = candidate.getPath(); + } + } + } + return returnPath; + } + + private byte[] loadUUIDFromLogFile(final FileContext fc, + final Path parent, final ApplicationId appId, final String nodeId) + throws Exception { + byte[] id = null; + RemoteIterator files = fc.listStatus(parent); + FSDataInputStream fsDataInputStream = null; + byte[] uuid = createUUID(appId); + while(files.hasNext()) { + try { + Path checkPath = files.next().getPath(); + if (checkPath.getName().contains(LogAggregationUtils + .getNodeString(nodeId)) && !checkPath.getName() + .endsWith(CHECK_SUM_FILE_SUFFIX)) { + fsDataInputStream = fc.open(checkPath); + byte[] b = new byte[uuid.length]; + int actual = fsDataInputStream.read(b); + if (actual != uuid.length || Arrays.equals(b, uuid)) { + deleteFileWithRetries(fc, checkPath); + } else if (id == null){ + id = uuid; + } + } + } finally { + IOUtils.cleanupWithLogger(LOG, fsDataInputStream); + } + } + return id == null ? uuid : id; + } + + @Private + @VisibleForTesting + public boolean isRollover(final FileContext fc, + final Path candidate) throws IOException { + FileStatus fs = fc.getFileStatus(candidate); + return fs.getLen() >= this.logRollOverMaxFileSize; + } + + @Private + @VisibleForTesting + public Clock getSystemClock() { + return SystemClock.getInstance(); + } + + private byte[] createUUID(ApplicationId appId) throws IOException { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + return digest.digest(appId.toString().getBytes( + Charset.forName("UTF-8"))); + } catch (NoSuchAlgorithmException ex) { + throw new IOException(ex); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java index f77ad96d3ef..7d0205bf915 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java @@ -21,13 +21,13 @@ package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; - import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.PrintStream; import java.io.Writer; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -37,6 +37,8 @@ import java.util.Set; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -54,6 +56,8 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.ControlledClock; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -143,8 +147,27 @@ public class TestLogAggregationIndexFileController { LogValue value = mock(LogValue.class); when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files); + final ControlledClock clock = new ControlledClock(); + clock.setTime(System.currentTimeMillis()); LogAggregationIndexedFileController fileFormat - = new LogAggregationIndexedFileController(); + = new LogAggregationIndexedFileController() { + private int rollOverCheck = 0; + @Override + public Clock getSystemClock() { + return clock; + } + + @Override + public boolean isRollover(final FileContext fc, + final Path candidate) throws IOException { + rollOverCheck++; + if (rollOverCheck >= 3) { + return true; + } + return false; + } + }; + fileFormat.initialize(conf, "Indexed"); Map appAcls = new HashMap<>(); @@ -203,7 +226,11 @@ public class TestLogAggregationIndexFileController { + LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX); FSDataOutputStream fInput = null; try { + String nodeName = logPath.getName() + "_" + clock.getTime(); fInput = FileSystem.create(fs, checksumFile, LOG_FILE_UMASK); + fInput.writeInt(nodeName.length()); + fInput.write(nodeName.getBytes( + Charset.forName("UTF-8"))); fInput.writeLong(0); } finally { IOUtils.closeQuietly(fInput); @@ -236,9 +263,9 @@ public class TestLogAggregationIndexFileController { // We did not call postWriter which we would keep the checksum file. // We can only get the logs/logmeta from the first write. - fileFormat.readAggregatedLogsMeta( + meta = fileFormat.readAggregatedLogsMeta( logRequest); - Assert.assertEquals(meta.size(), meta.size(), 1); + Assert.assertEquals(meta.size(), 1); for (ContainerLogMeta log : meta) { Assert.assertTrue(log.getContainerId().equals(containerId.toString())); Assert.assertTrue(log.getNodeId().equals(nodeId.toString())); @@ -267,9 +294,37 @@ public class TestLogAggregationIndexFileController { fileFormat.write(key1, value2); fileFormat.postWrite(context); fileFormat.closeWriter(); - fileFormat.readAggregatedLogsMeta( + meta = fileFormat.readAggregatedLogsMeta( logRequest); - Assert.assertEquals(meta.size(), meta.size(), 2); + Assert.assertEquals(meta.size(), 2); + for (ContainerLogMeta log : meta) { + Assert.assertTrue(log.getContainerId().equals(containerId.toString())); + Assert.assertTrue(log.getNodeId().equals(nodeId.toString())); + for (ContainerLogFileInfo file : log.getContainerLogMeta()) { + fileNames.add(file.getFileName()); + } + } + fileNames.removeAll(newLogTypes); + Assert.assertTrue(fileNames.isEmpty()); + foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); + Assert.assertTrue(foundLogs); + for (String logType : newLogTypes) { + Assert.assertTrue(sysOutStream.toString().contains(logMessage( + containerId, logType))); + } + sysOutStream.reset(); + + // start to roll over old logs + clock.setTime(System.currentTimeMillis()); + fileFormat.initializeWriter(context); + fileFormat.write(key1, value2); + fileFormat.postWrite(context); + fileFormat.closeWriter(); + FileStatus[] status = fs.listStatus(logPath.getParent()); + Assert.assertTrue(status.length == 2); + meta = fileFormat.readAggregatedLogsMeta( + logRequest); + Assert.assertEquals(meta.size(), 3); for (ContainerLogMeta log : meta) { Assert.assertTrue(log.getContainerId().equals(containerId.toString())); Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));