YARN-7072. Add a new log aggregation file format controller (xgong)

This commit is contained in:
Xuan 2017-09-10 23:18:31 -07:00
parent 23dc6ef8b0
commit 02ae3f8202
19 changed files with 1799 additions and 86 deletions

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
* BoundedRangeFileInputStream on top of the same FSDataInputStream and they
* would not interfere with each other.
*/
class BoundedRangeFileInputStream extends InputStream {
public class BoundedRangeFileInputStream extends InputStream {
private FSDataInputStream in;
private long pos;

View File

@ -43,7 +43,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
/**
* Compression related stuff.
*/
final class Compression {
public final class Compression {
static final Logger LOG = LoggerFactory.getLogger(Compression.class);
/**
@ -75,7 +75,7 @@ final class Compression {
/**
* Compression algorithms.
*/
static enum Algorithm {
public static enum Algorithm {
LZO(TFile.COMPRESSION_LZO) {
private transient boolean checked = false;
private static final String defaultClazz =
@ -348,7 +348,7 @@ final class Compression {
}
}
static Algorithm getCompressionAlgorithmByName(String compressName) {
public static Algorithm getCompressionAlgorithmByName(String compressName) {
Algorithm[] algos = Algorithm.class.getEnumConstants();
for (Algorithm a : algos) {

View File

@ -25,7 +25,7 @@ import java.io.OutputStream;
* A simplified BufferedOutputStream with borrowed buffer, and allow users to
* see how much data have been buffered.
*/
class SimpleBufferedOutputStream extends FilterOutputStream {
public class SimpleBufferedOutputStream extends FilterOutputStream {
protected byte buf[]; // the borrowed buffer
protected int count = 0; // bytes used in buffer.

View File

@ -62,9 +62,10 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
import org.codehaus.jettison.json.JSONArray;
@ -411,10 +412,10 @@ public class LogsCLI extends Configured implements Tool {
return false;
}
private List<Pair<PerContainerLogFileInfo, String>> getContainerLogFiles(
private List<Pair<ContainerLogFileInfo, String>> getContainerLogFiles(
Configuration conf, String containerIdStr, String nodeHttpAddress)
throws IOException {
List<Pair<PerContainerLogFileInfo, String>> logFileInfos
List<Pair<ContainerLogFileInfo, String>> logFileInfos
= new ArrayList<>();
Client webServiceClient = Client.create();
try {
@ -454,12 +455,12 @@ public class LogsCLI extends Configured implements Tool {
if (ob instanceof JSONArray) {
JSONArray obArray = (JSONArray)ob;
for (int j = 0; j < obArray.length(); j++) {
logFileInfos.add(new Pair<PerContainerLogFileInfo, String>(
logFileInfos.add(new Pair<ContainerLogFileInfo, String>(
generatePerContainerLogFileInfoFromJSON(
obArray.getJSONObject(j)), aggregateType));
}
} else if (ob instanceof JSONObject) {
logFileInfos.add(new Pair<PerContainerLogFileInfo, String>(
logFileInfos.add(new Pair<ContainerLogFileInfo, String>(
generatePerContainerLogFileInfoFromJSON(
(JSONObject)ob), aggregateType));
}
@ -478,7 +479,7 @@ public class LogsCLI extends Configured implements Tool {
return logFileInfos;
}
private PerContainerLogFileInfo generatePerContainerLogFileInfoFromJSON(
private ContainerLogFileInfo generatePerContainerLogFileInfoFromJSON(
JSONObject meta) throws JSONException {
String fileName = meta.has("fileName") ?
meta.getString("fileName") : "N/A";
@ -486,7 +487,7 @@ public class LogsCLI extends Configured implements Tool {
meta.getString("fileSize") : "N/A";
String lastModificationTime = meta.has("lastModifiedTime") ?
meta.getString("lastModifiedTime") : "N/A";
return new PerContainerLogFileInfo(fileName, fileSize,
return new ContainerLogFileInfo(fileName, fileSize,
lastModificationTime);
}
@ -507,7 +508,7 @@ public class LogsCLI extends Configured implements Tool {
return -1;
}
String nodeId = request.getNodeId();
PrintStream out = logCliHelper.createPrintStream(localDir, nodeId,
PrintStream out = LogToolUtils.createPrintStream(localDir, nodeId,
containerIdStr);
try {
Set<String> matchedFiles = getMatchedContainerLogFiles(request,
@ -1236,9 +1237,9 @@ public class LogsCLI extends Configured implements Tool {
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
"LogFile", "LogLength", "LastModificationTime", "LogAggregationType");
outStream.println(StringUtils.repeat("=", containerString.length() * 2));
List<Pair<PerContainerLogFileInfo, String>> infos = getContainerLogFiles(
List<Pair<ContainerLogFileInfo, String>> infos = getContainerLogFiles(
getConf(), containerId, nodeHttpAddress);
for (Pair<PerContainerLogFileInfo, String> info : infos) {
for (Pair<ContainerLogFileInfo, String> info : infos) {
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
info.getKey().getFileName(), info.getKey().getFileSize(),
info.getKey().getLastModifiedTime(), info.getValue());
@ -1250,11 +1251,11 @@ public class LogsCLI extends Configured implements Tool {
boolean useRegex) throws IOException {
// fetch all the log files for the container
// filter the log files based on the given -log_files pattern
List<Pair<PerContainerLogFileInfo, String>> allLogFileInfos=
List<Pair<ContainerLogFileInfo, String>> allLogFileInfos=
getContainerLogFiles(getConf(), request.getContainerId(),
request.getNodeHttpAddress());
List<String> fileNames = new ArrayList<String>();
for (Pair<PerContainerLogFileInfo, String> fileInfo : allLogFileInfos) {
for (Pair<ContainerLogFileInfo, String> fileInfo : allLogFileInfos) {
fileNames.add(fileInfo.getKey().getFileName());
}
return getMatchedLogFiles(request, fileNames,

View File

@ -19,7 +19,7 @@
package org.apache.hadoop.yarn.logaggregation;
/**
* PerContainerLogFileInfo represents the meta data for a container log file,
* ContainerLogFileInfo represents the meta data for a container log file,
* which includes:
* <ul>
* <li>The filename of the container log.</li>
@ -28,15 +28,15 @@ package org.apache.hadoop.yarn.logaggregation;
* </ul>
*
*/
public class PerContainerLogFileInfo {
public class ContainerLogFileInfo {
private String fileName;
private String fileSize;
private String lastModifiedTime;
//JAXB needs this
public PerContainerLogFileInfo() {}
public ContainerLogFileInfo() {}
public PerContainerLogFileInfo(String fileName, String fileSize,
public ContainerLogFileInfo(String fileName, String fileSize,
String lastModifiedTime) {
this.setFileName(fileName);
this.setFileSize(fileSize);
@ -83,10 +83,10 @@ public class PerContainerLogFileInfo {
if (otherObj == this) {
return true;
}
if (!(otherObj instanceof PerContainerLogFileInfo)) {
if (!(otherObj instanceof ContainerLogFileInfo)) {
return false;
}
PerContainerLogFileInfo other = (PerContainerLogFileInfo)otherObj;
ContainerLogFileInfo other = (ContainerLogFileInfo)otherObj;
return other.fileName.equals(fileName) && other.fileSize.equals(fileSize)
&& other.lastModifiedTime.equals(lastModifiedTime);
}

View File

@ -26,14 +26,14 @@ import java.util.List;
* <ul>
* <li>The Container Id.</li>
* <li>The NodeManager Id.</li>
* <li>A list of {@link PerContainerLogFileInfo}.</li>
* <li>A list of {@link ContainerLogFileInfo}.</li>
* </ul>
*
*/
public class ContainerLogMeta {
private String containerId;
private String nodeId;
private List<PerContainerLogFileInfo> logMeta;
private List<ContainerLogFileInfo> logMeta;
public ContainerLogMeta(String containerId, String nodeId) {
this.containerId = containerId;
@ -51,11 +51,11 @@ public class ContainerLogMeta {
public void addLogMeta(String fileName, String fileSize,
String lastModificationTime) {
logMeta.add(new PerContainerLogFileInfo(fileName, fileSize,
logMeta.add(new ContainerLogFileInfo(fileName, fileSize,
lastModificationTime));
}
public List<PerContainerLogFileInfo> getContainerLogMeta() {
public List<ContainerLogFileInfo> getContainerLogMeta() {
return this.logMeta;
}
}

View File

@ -30,6 +30,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Private
public class LogAggregationUtils {
@ -195,6 +198,30 @@ public class LogAggregationUtils {
return nodeFiles;
}
/**
* Get all available log files under remote app log directory.
* @param conf the configuration
* @param appId the applicationId
* @param appOwner the application owner
* @param remoteRootLogDir the remote root log directory
* @param suffix the log directory suffix
* @return the list of available log files
* @throws IOException if there is no log file available
*/
public static List<FileStatus> getRemoteNodeFileList(
Configuration conf, ApplicationId appId, String appOwner,
org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
throws IOException {
Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner,
remoteRootLogDir, suffix);
List<FileStatus> nodeFiles = new ArrayList<>();
Path qualifiedLogDir =
FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
nodeFiles.addAll(Arrays.asList(FileContext.getFileContext(
qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir)));
return nodeFiles;
}
/**
* Get all available log files under remote app log directory.
* @param conf the configuration

View File

@ -22,8 +22,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -229,7 +227,7 @@ public class LogCLIHelpers implements Configurable {
out.printf(PER_LOG_FILE_INFO_PATTERN, "LogFile", "LogLength",
"LastModificationTime", "LogAggregationType");
out.println(StringUtils.repeat("=", containerString.length() * 2));
for (PerContainerLogFileInfo logMeta : containerLogMeta
for (ContainerLogFileInfo logMeta : containerLogMeta
.getContainerLogMeta()) {
out.printf(PER_LOG_FILE_INFO_PATTERN, logMeta.getFileName(),
logMeta.getFileSize(), logMeta.getLastModifiedTime(), "AGGREGATED");
@ -345,20 +343,6 @@ public class LogCLIHelpers implements Configurable {
+ ". Error message found: " + errorMessage);
}
@Private
public PrintStream createPrintStream(String localDir, String nodeId,
String containerId) throws IOException {
PrintStream out = System.out;
if(localDir != null && !localDir.isEmpty()) {
Path nodePath = new Path(localDir, LogAggregationUtils
.getNodeString(nodeId));
Files.createDirectories(Paths.get(nodePath.toString()));
Path containerLogPath = new Path(nodePath, containerId);
out = new PrintStream(containerLogPath.toString(), "UTF-8");
}
return out;
}
public void closePrintStream(PrintStream out) {
if (out != System.out) {
IOUtils.closeQuietly(out);
@ -379,7 +363,7 @@ public class LogCLIHelpers implements Configurable {
return logTypes;
}
for (ContainerLogMeta logMeta: containersLogMeta) {
for (PerContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) {
for (ContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) {
logTypes.add(fileInfo.getFileName());
}
}

View File

@ -21,11 +21,15 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
/**
* This class contains several utility function which could be used in different
@ -158,4 +162,26 @@ public final class LogToolUtils {
}
}
/**
* Create the container log file under given (local directory/nodeId) and
* return the PrintStream object.
* @param localDir the Local Dir
* @param nodeId the NodeId
* @param containerId the ContainerId
* @return the printStream object
* @throws IOException if an I/O error occurs
*/
public static PrintStream createPrintStream(String localDir, String nodeId,
String containerId) throws IOException {
PrintStream out = System.out;
if(localDir != null && !localDir.isEmpty()) {
Path nodePath = new Path(localDir, LogAggregationUtils
.getNodeString(nodeId));
Files.createDirectories(Paths.get(nodePath.toString()));
Path containerLogPath = new Path(nodePath, containerId);
out = new PrintStream(containerLogPath.toString(), "UTF-8");
}
return out;
}
}

View File

@ -25,9 +25,6 @@ import com.google.common.collect.Sets;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@ -37,6 +34,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@ -91,6 +89,12 @@ public abstract class LogAggregationFileController {
protected static final FsPermission APP_DIR_PERMISSIONS = FsPermission
.createImmutable((short) 0770);
/**
* Umask for the log file.
*/
protected static final FsPermission APP_LOG_FILE_UMASK = FsPermission
.createImmutable((short) (0640 ^ 0777));
// This is temporary solution. The configuration will be deleted once
// we find a more scalable method to only write a single log file per LRS.
private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
@ -98,6 +102,11 @@ public abstract class LogAggregationFileController {
private static final int
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
// This is temporary solution. The configuration will be deleted once we have
// the FileSystem API to check whether append operation is supported or not.
public static final String LOG_AGGREGATION_FS_SUPPORT_APPEND
= YarnConfiguration.YARN_PREFIX+ "log-aggregation.fs-support-append";
protected Configuration conf;
protected Path remoteRootLogDir;
protected String remoteRootLogDirSuffix;
@ -178,19 +187,6 @@ public abstract class LogAggregationFileController {
public abstract void postWrite(LogAggregationFileControllerContext record)
throws Exception;
protected PrintStream createPrintStream(String localDir, String nodeId,
String containerId) throws IOException {
PrintStream out = System.out;
if(localDir != null && !localDir.isEmpty()) {
Path nodePath = new Path(localDir, LogAggregationUtils
.getNodeString(nodeId));
Files.createDirectories(Paths.get(nodePath.toString()));
Path containerLogPath = new Path(nodePath, containerId);
out = new PrintStream(containerLogPath.toString(), "UTF-8");
}
return out;
}
protected void closePrintStream(OutputStream out) {
if (out != System.out) {
IOUtils.cleanupWithLogger(LOG, out);
@ -481,4 +477,21 @@ public abstract class LogAggregationFileController {
LOG.error("Failed to clean old logs", e);
}
}
/**
* Create the aggregated log suffix. The LogAggregationFileController
* should call this to get the suffix and append the suffix to the end
* of each log. This would keep the aggregated log format consistent.
*
* @param fileName the File Name
* @return the aggregated log suffix String
*/
protected String aggregatedLogSuffix(String fileName) {
StringBuilder sb = new StringBuilder();
String endOfFile = "End of LogType:" + fileName;
sb.append("\n" + endOfFile + "\n");
sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ "\n\n");
return sb.toString();
}
}

View File

@ -0,0 +1,275 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
import com.google.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.file.tfile.BoundedRangeFileInputStream;
import org.apache.hadoop.io.file.tfile.Compression;
import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationHtmlBlock;
import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedFileLogMeta;
import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedLogsMeta;
import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedPerAggregationLogMeta;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE;
/**
* The Aggregated Logs Block implementation for Indexed File.
*/
@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
private final LogAggregationIndexedFileController fileController;
private final Configuration conf;
@Inject
public IndexedFileAggregatedLogsBlock(ViewContext ctx,
Configuration conf,
LogAggregationIndexedFileController fileController) {
super(ctx);
this.conf = conf;
this.fileController = fileController;
}
@Override
protected void render(Block html) {
BlockParameters params = verifyAndParseParameters(html);
if (params == null) {
return;
}
ApplicationId appId = params.getAppId();
ContainerId containerId = params.getContainerId();
NodeId nodeId = params.getNodeId();
String appOwner = params.getAppOwner();
String logEntity = params.getLogEntity();
long start = params.getStartIndex();
long end = params.getEndIndex();
List<FileStatus> nodeFiles = null;
try {
nodeFiles = LogAggregationUtils
.getRemoteNodeFileList(conf, appId, appOwner,
this.fileController.getRemoteRootLogDir(),
this.fileController.getRemoteRootLogDirSuffix());
} catch(Exception ex) {
html.h1("Unable to locate any logs for container "
+ containerId.toString());
LOG.error(ex.getMessage());
return;
}
Map<String, FileStatus> checkSumFiles;
try {
checkSumFiles = fileController.filterFiles(nodeFiles,
LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
} catch (IOException ex) {
LOG.error("Error getting logs for " + logEntity, ex);
html.h1("Error getting logs for " + logEntity);
return;
}
List<FileStatus> fileToRead;
try {
fileToRead = fileController.getNodeLogFileToRead(nodeFiles,
nodeId.toString(), appId);
} catch (IOException ex) {
LOG.error("Error getting logs for " + logEntity, ex);
html.h1("Error getting logs for " + logEntity);
return;
}
boolean foundLog = false;
String desiredLogType = $(CONTAINER_LOG_TYPE);
try {
for (FileStatus thisNodeFile : fileToRead) {
FileStatus checkSum = fileController.getAllChecksumFiles(
checkSumFiles, thisNodeFile.getPath().getName());
long endIndex = -1;
if (checkSum != null) {
endIndex = fileController.loadIndexedLogsCheckSum(
checkSum.getPath());
}
IndexedLogsMeta indexedLogsMeta = null;
try {
indexedLogsMeta = fileController.loadIndexedLogsMeta(
thisNodeFile.getPath(), endIndex);
} catch (Exception ex) {
// DO NOTHING
LOG.warn("Can not load log meta from the log file:"
+ thisNodeFile.getPath());
continue;
}
if (indexedLogsMeta == null) {
continue;
}
Map<ApplicationAccessType, String> appAcls = indexedLogsMeta.getAcls();
String user = indexedLogsMeta.getUser();
String remoteUser = request().getRemoteUser();
if (!checkAcls(conf, appId, user, appAcls, remoteUser)) {
html.h1()._("User [" + remoteUser
+ "] is not authorized to view the logs for " + logEntity
+ " in log file [" + thisNodeFile.getPath().getName() + "]")
._();
LOG.error("User [" + remoteUser
+ "] is not authorized to view the logs for " + logEntity);
continue;
}
String compressAlgo = indexedLogsMeta.getCompressName();
List<IndexedFileLogMeta> candidates = new ArrayList<>();
for (IndexedPerAggregationLogMeta logMeta
: indexedLogsMeta.getLogMetas()) {
for (Entry<String, List<IndexedFileLogMeta>> meta
: logMeta.getLogMetas().entrySet()) {
for (IndexedFileLogMeta log : meta.getValue()) {
if (!log.getContainerId().equals(containerId.toString())) {
continue;
}
if (desiredLogType != null && !desiredLogType.isEmpty()
&& !desiredLogType.equals(log.getFileName())) {
continue;
}
candidates.add(log);
}
}
}
if (candidates.isEmpty()) {
continue;
}
Algorithm compressName = Compression.getCompressionAlgorithmByName(
compressAlgo);
Decompressor decompressor = compressName.getDecompressor();
FileContext fileContext = FileContext.getFileContext(
thisNodeFile.getPath().toUri(), conf);
FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
int bufferSize = 65536;
for (IndexedFileLogMeta candidate : candidates) {
byte[] cbuf = new byte[bufferSize];
InputStream in = null;
try {
in = compressName.createDecompressionStream(
new BoundedRangeFileInputStream(fsin,
candidate.getStartIndex(),
candidate.getFileCompressedSize()),
decompressor,
LogAggregationIndexedFileController.getFSInputBufferSize(
conf));
long logLength = candidate.getFileSize();
html.pre()._("\n\n")._();
html.p()._("Log Type: " + candidate.getFileName())._();
html.p()._("Log Upload Time: " + Times.format(
candidate.getLastModificatedTime()))._();
html.p()._("Log Length: " + Long.toString(
logLength))._();
long startIndex = start < 0
? logLength + start : start;
startIndex = startIndex < 0 ? 0 : startIndex;
startIndex = startIndex > logLength ? logLength : startIndex;
long endLogIndex = end < 0
? logLength + end : end;
endLogIndex = endLogIndex < 0 ? 0 : endLogIndex;
endLogIndex = endLogIndex > logLength ? logLength : endLogIndex;
endLogIndex = endLogIndex < startIndex ?
startIndex : endLogIndex;
long toRead = endLogIndex - startIndex;
if (toRead < logLength) {
html.p()._("Showing " + toRead + " bytes of " + logLength
+ " total. Click ").a(url("logs", $(NM_NODENAME),
$(CONTAINER_ID), $(ENTITY_STRING), $(APP_OWNER),
candidate.getFileName(), "?start=0"), "here").
_(" for the full log.")._();
}
long totalSkipped = 0;
while (totalSkipped < start) {
long ret = in.skip(start - totalSkipped);
if (ret == 0) {
//Read one byte
int nextByte = in.read();
// Check if we have reached EOF
if (nextByte == -1) {
throw new IOException("Premature EOF from container log");
}
ret = 1;
}
totalSkipped += ret;
}
int len = 0;
int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
PRE<Hamlet> pre = html.pre();
while (toRead > 0
&& (len = in.read(cbuf, 0, currentToRead)) > 0) {
pre._(new String(cbuf, 0, len, Charset.forName("UTF-8")));
toRead = toRead - len;
currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
}
pre._();
foundLog = true;
} catch (Exception ex) {
LOG.error("Error getting logs for " + logEntity, ex);
continue;
} finally {
IOUtils.closeQuietly(in);
}
}
}
if (!foundLog) {
if (desiredLogType.isEmpty()) {
html.h1("No logs available for container " + containerId.toString());
} else {
html.h1("Unable to locate '" + desiredLogType
+ "' log for container " + containerId.toString());
}
}
} catch (RuntimeException e) {
throw e;
} catch (Exception ex) {
html.h1()._("Error getting logs for " + logEntity)._();
LOG.error("Error getting logs for " + logEntity, ex);
}
}
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Public
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -27,7 +27,6 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.util.Pair;
@ -192,7 +191,7 @@ public class LogAggregationTFileController
while (valueStream != null) {
if (getAllContainers || (key.toString().equals(containerIdStr))) {
if (createPrintStream) {
os = createPrintStream(
os = LogToolUtils.createPrintStream(
logRequest.getOutputLocalDir(),
thisNodeFile.getPath().getName(), key.toString());
}
@ -209,12 +208,7 @@ public class LogAggregationTFileController
Times.format(thisNodeFile.getModificationTime()),
valueStream, os, buf,
ContainerLogAggregationType.AGGREGATED);
StringBuilder sb = new StringBuilder();
String endOfFile = "End of LogType:" + fileType;
sb.append("\n" + endOfFile + "\n");
sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ "\n\n");
byte[] b = sb.toString().getBytes(
byte[] b = aggregatedLogSuffix(fileType).getBytes(
Charset.forName("UTF-8"));
os.write(b, 0, b.length);
findLogs = true;

View File

@ -0,0 +1,316 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Function test for {@link LogAggregationIndexFileController}.
*
*/
public class TestLogAggregationIndexFileController {
private final String rootLocalLogDir = "target/LocalLogs";
private final Path rootLocalLogDirPath = new Path(rootLocalLogDir);
private final String remoteLogDir = "target/remote-app";
private static final FsPermission LOG_FILE_UMASK = FsPermission
.createImmutable((short) (0777));
private static final UserGroupInformation USER_UGI = UserGroupInformation
.createRemoteUser("testUser");
private FileSystem fs;
private Configuration conf;
private ApplicationId appId;
private ContainerId containerId;
private NodeId nodeId;
private ByteArrayOutputStream sysOutStream;
private PrintStream sysOut;
private ByteArrayOutputStream sysErrStream;
private PrintStream sysErr;
@Before
public void setUp() throws IOException {
appId = ApplicationId.newInstance(123456, 1);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
appId, 1);
containerId = ContainerId.newContainerId(attemptId, 1);
nodeId = NodeId.newInstance("localhost", 9999);
conf = new Configuration();
conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir",
remoteLogDir);
conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir-suffix",
"logs");
conf.set(YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, "gz");
fs = FileSystem.get(conf);
sysOutStream = new ByteArrayOutputStream();
sysOut = new PrintStream(sysOutStream);
System.setOut(sysOut);
sysErrStream = new ByteArrayOutputStream();
sysErr = new PrintStream(sysErrStream);
System.setErr(sysErr);
}
@After
public void teardown() throws Exception {
fs.delete(rootLocalLogDirPath, true);
fs.delete(new Path(remoteLogDir), true);
}
@Test(timeout = 15000)
public void testLogAggregationIndexFileFormat() throws Exception {
if (fs.exists(rootLocalLogDirPath)) {
fs.delete(rootLocalLogDirPath, true);
}
assertTrue(fs.mkdirs(rootLocalLogDirPath));
Path appLogsDir = new Path(rootLocalLogDirPath, appId.toString());
if (fs.exists(appLogsDir)) {
fs.delete(appLogsDir, true);
}
assertTrue(fs.mkdirs(appLogsDir));
List<String> logTypes = new ArrayList<String>();
logTypes.add("syslog");
logTypes.add("stdout");
logTypes.add("stderr");
Set<File> files = new HashSet<>();
LogKey key1 = new LogKey(containerId.toString());
for(String logType : logTypes) {
File file = createAndWriteLocalLogFile(containerId, appLogsDir,
logType);
files.add(file);
}
LogValue value = mock(LogValue.class);
when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files);
LogAggregationIndexedFileController fileFormat
= new LogAggregationIndexedFileController();
fileFormat.initialize(conf, "Indexed");
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
Path appDir = fileFormat.getRemoteAppLogDir(appId,
USER_UGI.getShortUserName());
if (fs.exists(appDir)) {
fs.delete(appDir, true);
}
assertTrue(fs.mkdirs(appDir));
Path logPath = fileFormat.getRemoteNodeLogFileForApp(
appId, USER_UGI.getShortUserName(), nodeId);
LogAggregationFileControllerContext context =
new LogAggregationFileControllerContext(
logPath, logPath, true, 1000, appId, appAcls, nodeId, USER_UGI);
// initialize the writer
fileFormat.initializeWriter(context);
fileFormat.write(key1, value);
LogAggregationFileControllerContext record = mock(
LogAggregationFileControllerContext.class);
fileFormat.postWrite(record);
fileFormat.closeWriter();
ContainerLogsRequest logRequest = new ContainerLogsRequest();
logRequest.setAppId(appId);
logRequest.setNodeId(nodeId.toString());
logRequest.setAppOwner(USER_UGI.getShortUserName());
logRequest.setContainerId(containerId.toString());
logRequest.setBytes(Long.MAX_VALUE);
List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta(
logRequest);
Assert.assertTrue(meta.size() == 1);
List<String> fileNames = new ArrayList<>();
for (ContainerLogMeta log : meta) {
Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
Assert.assertTrue(log.getContainerLogMeta().size() == 3);
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
fileNames.add(file.getFileName());
}
}
fileNames.removeAll(logTypes);
Assert.assertTrue(fileNames.isEmpty());
boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
Assert.assertTrue(foundLogs);
for (String logType : logTypes) {
Assert.assertTrue(sysOutStream.toString().contains(logMessage(
containerId, logType)));
}
sysOutStream.reset();
// create a checksum file
Path checksumFile = new Path(fileFormat.getRemoteAppLogDir(
appId, USER_UGI.getShortUserName()),
LogAggregationUtils.getNodeString(nodeId)
+ LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
FSDataOutputStream fInput = null;
try {
fInput = FileSystem.create(fs, checksumFile, LOG_FILE_UMASK);
fInput.writeLong(0);
} finally {
IOUtils.closeQuietly(fInput);
}
meta = fileFormat.readAggregatedLogsMeta(
logRequest);
Assert.assertTrue(meta.size() == 0);
foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
Assert.assertFalse(foundLogs);
sysOutStream.reset();
fs.delete(checksumFile, false);
Assert.assertFalse(fs.exists(checksumFile));
List<String> newLogTypes = new ArrayList<>(logTypes);
files.clear();
newLogTypes.add("test1");
files.add(createAndWriteLocalLogFile(containerId, appLogsDir,
"test1"));
newLogTypes.add("test2");
files.add(createAndWriteLocalLogFile(containerId, appLogsDir,
"test2"));
LogValue value2 = mock(LogValue.class);
when(value2.getPendingLogFilesToUploadForThisContainer())
.thenReturn(files);
// initialize the writer
fileFormat.initializeWriter(context);
fileFormat.write(key1, value2);
fileFormat.closeWriter();
// We did not call postWriter which we would keep the checksum file.
// We can only get the logs/logmeta from the first write.
fileFormat.readAggregatedLogsMeta(
logRequest);
Assert.assertEquals(meta.size(), meta.size(), 1);
for (ContainerLogMeta log : meta) {
Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
Assert.assertTrue(log.getContainerLogMeta().size() == 3);
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
fileNames.add(file.getFileName());
}
}
fileNames.removeAll(logTypes);
Assert.assertTrue(fileNames.isEmpty());
foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
Assert.assertTrue(foundLogs);
for (String logType : logTypes) {
Assert.assertTrue(sysOutStream.toString().contains(logMessage(
containerId, logType)));
}
Assert.assertFalse(sysOutStream.toString().contains(logMessage(
containerId, "test1")));
Assert.assertFalse(sysOutStream.toString().contains(logMessage(
containerId, "test2")));
sysOutStream.reset();
// Call postWrite and we should get all logs/logmetas for both
// first write and second write
fileFormat.initializeWriter(context);
fileFormat.write(key1, value2);
fileFormat.postWrite(record);
fileFormat.closeWriter();
fileFormat.readAggregatedLogsMeta(
logRequest);
Assert.assertEquals(meta.size(), meta.size(), 2);
for (ContainerLogMeta log : meta) {
Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
fileNames.add(file.getFileName());
}
}
fileNames.removeAll(newLogTypes);
Assert.assertTrue(fileNames.isEmpty());
foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
Assert.assertTrue(foundLogs);
for (String logType : newLogTypes) {
Assert.assertTrue(sysOutStream.toString().contains(logMessage(
containerId, logType)));
}
sysOutStream.reset();
}
private File createAndWriteLocalLogFile(ContainerId containerId,
Path localLogDir, String logType) throws IOException {
File file = new File(localLogDir.toString(), logType);
if (file.exists()) {
file.delete();
}
file.createNewFile();
Writer writer = null;
try {
writer = new FileWriter(file);
writer.write(logMessage(containerId, logType));
writer.close();
return file;
} finally {
IOUtils.closeQuietly(writer);
}
}
private String logMessage(ContainerId containerId, String logType) {
StringBuilder sb = new StringBuilder();
sb.append("Hello " + containerId + " in " + logType + "!");
return sb.toString();
}
}

View File

@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore;
@ -847,7 +847,7 @@ public class TestAHSWebServices extends JerseyTestBase {
for (ContainerLogsInfo logInfo : responseText) {
if(logInfo.getLogType().equals(
ContainerLogAggregationType.AGGREGATED.toString())) {
List<PerContainerLogFileInfo> logMeta = logInfo
List<ContainerLogFileInfo> logMeta = logInfo
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertEquals(logMeta.get(0).getFileName(), fileName);
@ -875,7 +875,7 @@ public class TestAHSWebServices extends JerseyTestBase {
for (ContainerLogsInfo logInfo : responseText) {
if(logInfo.getLogType().equals(
ContainerLogAggregationType.AGGREGATED.toString())) {
List<PerContainerLogFileInfo> logMeta = logInfo
List<ContainerLogFileInfo> logMeta = logInfo
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertEquals(logMeta.get(0).getFileName(), fileName);
@ -913,7 +913,7 @@ public class TestAHSWebServices extends JerseyTestBase {
assertTrue(responseText.size() == 1);
assertEquals(responseText.get(0).getLogType(),
ContainerLogAggregationType.AGGREGATED.toString());
List<PerContainerLogFileInfo> logMeta = responseText.get(0)
List<ContainerLogFileInfo> logMeta = responseText.get(0)
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertEquals(logMeta.get(0).getFileName(), fileName);

View File

@ -27,14 +27,14 @@ import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
/**
* {@code ContainerLogsInfo} includes the log meta-data of containers.
* <p>
* The container log meta-data includes details such as:
* <ul>
* <li>A list of {@link PerContainerLogFileInfo}.</li>
* <li>A list of {@link ContainerLogFileInfo}.</li>
* <li>The container Id.</li>
* <li>The NodeManager Id.</li>
* <li>The logType: could be local or aggregated</li>
@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
public class ContainerLogsInfo {
@XmlElement(name = "containerLogInfo")
protected List<PerContainerLogFileInfo> containerLogsInfo;
protected List<ContainerLogFileInfo> containerLogsInfo;
@XmlElement(name = "logAggregationType")
protected String logType;
@ -62,14 +62,14 @@ public class ContainerLogsInfo {
public ContainerLogsInfo(ContainerLogMeta logMeta,
ContainerLogAggregationType logType) throws YarnException {
this.containerLogsInfo = new ArrayList<PerContainerLogFileInfo>(
this.containerLogsInfo = new ArrayList<ContainerLogFileInfo>(
logMeta.getContainerLogMeta());
this.logType = logType.toString();
this.containerId = logMeta.getContainerId();
this.nodeId = logMeta.getNodeId();
}
public List<PerContainerLogFileInfo> getContainerLogsInfo() {
public List<ContainerLogFileInfo> getContainerLogsInfo() {
return this.containerLogsInfo;
}

View File

@ -27,7 +27,7 @@ import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsUtils;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
@ -55,10 +55,10 @@ public class NMContainerLogsInfo extends ContainerLogsInfo {
containerId, remoteUser, nmContext);
}
private static List<PerContainerLogFileInfo> getContainerLogsInfo(
private static List<ContainerLogFileInfo> getContainerLogsInfo(
ContainerId id, String remoteUser, Context nmContext)
throws YarnException {
List<PerContainerLogFileInfo> logFiles = new ArrayList<>();
List<ContainerLogFileInfo> logFiles = new ArrayList<>();
List<File> logDirs = ContainerLogsUtils.getContainerLogDirs(
id, remoteUser, nmContext);
for (File containerLogsDir : logDirs) {
@ -66,7 +66,7 @@ public class NMContainerLogsInfo extends ContainerLogsInfo {
if (logs != null) {
for (File log : logs) {
if (log.isFile()) {
PerContainerLogFileInfo logMeta = new PerContainerLogFileInfo(
ContainerLogFileInfo logMeta = new ContainerLogFileInfo(
log.getName(), Long.toString(log.length()),
Times.format(log.lastModified()));
logFiles.add(logMeta);

View File

@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@ -529,7 +529,7 @@ public class TestNMWebServices extends JerseyTestBase {
assertTrue(responseList.size() == 1);
assertEquals(responseList.get(0).getLogType(),
ContainerLogAggregationType.LOCAL.toString());
List<PerContainerLogFileInfo> logMeta = responseList.get(0)
List<ContainerLogFileInfo> logMeta = responseList.get(0)
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertEquals(logMeta.get(0).getFileName(), filename);
@ -556,13 +556,13 @@ public class TestNMWebServices extends JerseyTestBase {
for (ContainerLogsInfo logInfo : responseList) {
if(logInfo.getLogType().equals(
ContainerLogAggregationType.AGGREGATED.toString())) {
List<PerContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
List<ContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
assertTrue(meta.size() == 1);
assertEquals(meta.get(0).getFileName(), aggregatedLogFile);
} else {
assertEquals(logInfo.getLogType(),
ContainerLogAggregationType.LOCAL.toString());
List<PerContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
List<ContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
assertTrue(meta.size() == 1);
assertEquals(meta.get(0).getFileName(), filename);
}