diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java index 5fbd508c58d..5a000d3e85b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BoundedRangeFileInputStream.java @@ -28,7 +28,7 @@ * BoundedRangeFileInputStream on top of the same FSDataInputStream and they * would not interfere with each other. */ -class BoundedRangeFileInputStream extends InputStream { +public class BoundedRangeFileInputStream extends InputStream { private FSDataInputStream in; private long pos; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java index 2298dc001ec..260ee4b589a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Compression.java @@ -43,7 +43,7 @@ /** * Compression related stuff. */ -final class Compression { +public final class Compression { static final Logger LOG = LoggerFactory.getLogger(Compression.class); /** @@ -75,7 +75,7 @@ public void flush() throws IOException { /** * Compression algorithms. */ - static enum Algorithm { + public static enum Algorithm { LZO(TFile.COMPRESSION_LZO) { private transient boolean checked = false; private static final String defaultClazz = @@ -348,7 +348,7 @@ public String getName() { } } - static Algorithm getCompressionAlgorithmByName(String compressName) { + public static Algorithm getCompressionAlgorithmByName(String compressName) { Algorithm[] algos = Algorithm.class.getEnumConstants(); for (Algorithm a : algos) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java index a26a02d5769..0a194a3ce60 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/SimpleBufferedOutputStream.java @@ -25,7 +25,7 @@ * A simplified BufferedOutputStream with borrowed buffer, and allow users to * see how much data have been buffered. */ -class SimpleBufferedOutputStream extends FilterOutputStream { +public class SimpleBufferedOutputStream extends FilterOutputStream { protected byte buf[]; // the borrowed buffer protected int count = 0; // bytes used in buffer. 
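These three tfile helpers are widened from package-private to public so that the new indexed-file log aggregation controller in hadoop-yarn-common (added later in this patch) can reuse them. Below is a minimal sketch of the read pattern this enables, mirroring how IndexedFileAggregatedLogsBlock decompresses one container's slice of a shared aggregated file; the path, offset, length, and codec name here are hypothetical:

    import java.io.InputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.Decompressor;
    import org.apache.hadoop.io.file.tfile.BoundedRangeFileInputStream;
    import org.apache.hadoop.io.file.tfile.Compression;
    import org.apache.hadoop.io.file.tfile.Compression.Algorithm;

    public class ReadCompressedLogSlice {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path aggregatedLog = new Path("/tmp/aggregated.ifile"); // hypothetical
        long startIndex = 1024L;     // byte offset of one container's log
        long compressedSize = 4096L; // its compressed length
        FSDataInputStream fsin =
            FileContext.getFileContext(conf).open(aggregatedLog);
        // tfile codec names: "gz", "lzo", "none"
        Algorithm algo = Compression.getCompressionAlgorithmByName("gz");
        Decompressor decompressor = algo.getDecompressor();
        // Bound the shared stream to the slice, then decompress only that
        // range; other BoundedRangeFileInputStreams over the same
        // FSDataInputStream do not interfere with each other.
        try (InputStream in = algo.createDecompressionStream(
            new BoundedRangeFileInputStream(fsin, startIndex, compressedSize),
            decompressor, 256 * 1024)) {
          byte[] buf = new byte[65535];
          int len;
          while ((len = in.read(buf)) > 0) {
            System.out.write(buf, 0, len);
          }
        } finally {
          algo.returnDecompressor(decompressor);
        }
      }
    }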
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index c5e03083f5b..f2d9837dcf9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -62,9 +62,10 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers; -import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo; +import org.apache.hadoop.yarn.logaggregation.LogToolUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils; import org.codehaus.jettison.json.JSONArray; @@ -411,10 +412,10 @@ private boolean fetchAllLogFiles(String[] logFiles, String[] logFilesRegex) { return false; } - private List> getContainerLogFiles( + private List> getContainerLogFiles( Configuration conf, String containerIdStr, String nodeHttpAddress) throws IOException { - List> logFileInfos + List> logFileInfos = new ArrayList<>(); Client webServiceClient = Client.create(); try { @@ -454,12 +455,12 @@ private List> getContainerLogFiles( if (ob instanceof JSONArray) { JSONArray obArray = (JSONArray)ob; for (int j = 0; j < obArray.length(); j++) { - logFileInfos.add(new Pair( + logFileInfos.add(new Pair( generatePerContainerLogFileInfoFromJSON( obArray.getJSONObject(j)), aggregateType)); } } else if (ob instanceof JSONObject) { - logFileInfos.add(new Pair( + logFileInfos.add(new Pair( generatePerContainerLogFileInfoFromJSON( (JSONObject)ob), aggregateType)); } @@ -478,7 +479,7 @@ private List> getContainerLogFiles( return logFileInfos; } - private PerContainerLogFileInfo generatePerContainerLogFileInfoFromJSON( + private ContainerLogFileInfo generatePerContainerLogFileInfoFromJSON( JSONObject meta) throws JSONException { String fileName = meta.has("fileName") ? meta.getString("fileName") : "N/A"; @@ -486,7 +487,7 @@ private PerContainerLogFileInfo generatePerContainerLogFileInfoFromJSON( meta.getString("fileSize") : "N/A"; String lastModificationTime = meta.has("lastModifiedTime") ? 
meta.getString("lastModifiedTime") : "N/A"; - return new PerContainerLogFileInfo(fileName, fileSize, + return new ContainerLogFileInfo(fileName, fileSize, lastModificationTime); } @@ -507,7 +508,7 @@ public int printContainerLogsFromRunningApplication(Configuration conf, return -1; } String nodeId = request.getNodeId(); - PrintStream out = logCliHelper.createPrintStream(localDir, nodeId, + PrintStream out = LogToolUtils.createPrintStream(localDir, nodeId, containerIdStr); try { Set matchedFiles = getMatchedContainerLogFiles(request, @@ -1236,9 +1237,9 @@ private void outputContainerLogMeta(String containerId, String nodeId, outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN, "LogFile", "LogLength", "LastModificationTime", "LogAggregationType"); outStream.println(StringUtils.repeat("=", containerString.length() * 2)); - List> infos = getContainerLogFiles( + List> infos = getContainerLogFiles( getConf(), containerId, nodeHttpAddress); - for (Pair info : infos) { + for (Pair info : infos) { outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN, info.getKey().getFileName(), info.getKey().getFileSize(), info.getKey().getLastModifiedTime(), info.getValue()); @@ -1250,11 +1251,11 @@ public Set getMatchedContainerLogFiles(ContainerLogsRequest request, boolean useRegex) throws IOException { // fetch all the log files for the container // filter the log files based on the given -log_files pattern - List> allLogFileInfos= + List> allLogFileInfos= getContainerLogFiles(getConf(), request.getContainerId(), request.getNodeHttpAddress()); List fileNames = new ArrayList(); - for (Pair fileInfo : allLogFileInfos) { + for (Pair fileInfo : allLogFileInfos) { fileNames.add(fileInfo.getKey().getFileName()); } return getMatchedLogFiles(request, fileNames, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/PerContainerLogFileInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogFileInfo.java similarity index 87% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/PerContainerLogFileInfo.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogFileInfo.java index 867815fd6ca..b461ebbdf24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/PerContainerLogFileInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogFileInfo.java @@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.logaggregation; /** - * PerContainerLogFileInfo represents the meta data for a container log file, + * ContainerLogFileInfo represents the meta data for a container log file, * which includes: *
 * <ul>
 *   <li>The filename of the container log.</li>
@@ -28,15 +28,15 @@
 * </ul>
* */ -public class PerContainerLogFileInfo { +public class ContainerLogFileInfo { private String fileName; private String fileSize; private String lastModifiedTime; //JAXB needs this - public PerContainerLogFileInfo() {} + public ContainerLogFileInfo() {} - public PerContainerLogFileInfo(String fileName, String fileSize, + public ContainerLogFileInfo(String fileName, String fileSize, String lastModifiedTime) { this.setFileName(fileName); this.setFileSize(fileSize); @@ -83,10 +83,10 @@ public boolean equals(Object otherObj) { if (otherObj == this) { return true; } - if (!(otherObj instanceof PerContainerLogFileInfo)) { + if (!(otherObj instanceof ContainerLogFileInfo)) { return false; } - PerContainerLogFileInfo other = (PerContainerLogFileInfo)otherObj; + ContainerLogFileInfo other = (ContainerLogFileInfo)otherObj; return other.fileName.equals(fileName) && other.fileSize.equals(fileSize) && other.lastModifiedTime.equals(lastModifiedTime); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogMeta.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogMeta.java index 26a620e8c99..4c6b0de16fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogMeta.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogMeta.java @@ -26,14 +26,14 @@ *
 * <ul>
 *   <li>The Container Id.</li>
 *   <li>The NodeManager Id.</li>
- *   <li>A list of {@link PerContainerLogFileInfo}.</li>
+ *   <li>A list of {@link ContainerLogFileInfo}.</li>
 * </ul>
* */ public class ContainerLogMeta { private String containerId; private String nodeId; - private List logMeta; + private List logMeta; public ContainerLogMeta(String containerId, String nodeId) { this.containerId = containerId; @@ -51,11 +51,11 @@ public String getContainerId() { public void addLogMeta(String fileName, String fileSize, String lastModificationTime) { - logMeta.add(new PerContainerLogFileInfo(fileName, fileSize, + logMeta.add(new ContainerLogFileInfo(fileName, fileSize, lastModificationTime)); } - public List getContainerLogMeta() { + public List getContainerLogMeta() { return this.logMeta; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index 6d04c29406a..edf2cf3fbcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -30,6 +30,9 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; @Private public class LogAggregationUtils { @@ -195,6 +198,30 @@ public static RemoteIterator getRemoteNodeFileDir( return nodeFiles; } + /** + * Get all available log files under remote app log directory. + * @param conf the configuration + * @param appId the applicationId + * @param appOwner the application owner + * @param remoteRootLogDir the remote root log directory + * @param suffix the log directory suffix + * @return the list of available log files + * @throws IOException if there is no log file available + */ + public static List getRemoteNodeFileList( + Configuration conf, ApplicationId appId, String appOwner, + org.apache.hadoop.fs.Path remoteRootLogDir, String suffix) + throws IOException { + Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner, + remoteRootLogDir, suffix); + List nodeFiles = new ArrayList<>(); + Path qualifiedLogDir = + FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); + nodeFiles.addAll(Arrays.asList(FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir))); + return nodeFiles; + } + /** * Get all available log files under remote app log directory. 
* @param conf the configuration diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index 0068eaeeb9a..97b78ec7c21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -22,8 +22,6 @@ import java.io.IOException; import java.io.PrintStream; import java.nio.file.AccessDeniedException; -import java.nio.file.Files; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -229,7 +227,7 @@ public int printAContainerLogMetadata(ContainerLogsRequest options, out.printf(PER_LOG_FILE_INFO_PATTERN, "LogFile", "LogLength", "LastModificationTime", "LogAggregationType"); out.println(StringUtils.repeat("=", containerString.length() * 2)); - for (PerContainerLogFileInfo logMeta : containerLogMeta + for (ContainerLogFileInfo logMeta : containerLogMeta .getContainerLogMeta()) { out.printf(PER_LOG_FILE_INFO_PATTERN, logMeta.getFileName(), logMeta.getFileSize(), logMeta.getLastModifiedTime(), "AGGREGATED"); @@ -345,20 +343,6 @@ private static void logDirNoAccessPermission(String remoteAppLogDir, + ". Error message found: " + errorMessage); } - @Private - public PrintStream createPrintStream(String localDir, String nodeId, - String containerId) throws IOException { - PrintStream out = System.out; - if(localDir != null && !localDir.isEmpty()) { - Path nodePath = new Path(localDir, LogAggregationUtils - .getNodeString(nodeId)); - Files.createDirectories(Paths.get(nodePath.toString())); - Path containerLogPath = new Path(nodePath, containerId); - out = new PrintStream(containerLogPath.toString(), "UTF-8"); - } - return out; - } - public void closePrintStream(PrintStream out) { if (out != System.out) { IOUtils.closeQuietly(out); @@ -379,7 +363,7 @@ public Set listContainerLogs(ContainerLogsRequest options) return logTypes; } for (ContainerLogMeta logMeta: containersLogMeta) { - for (PerContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) { + for (ContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) { logTypes.add(fileInfo.getFileName()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java index 0416566a32c..82bb2ecbac1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java @@ -21,11 +21,15 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.PrintStream; import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Paths; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.Path; /** * This class contains several utility function which could be used in different @@ -158,4 +162,26 @@ public static void 
outputContainerLogThroughZeroCopy(String containerId, } } + + /** + * Create the container log file under given (local directory/nodeId) and + * return the PrintStream object. + * @param localDir the Local Dir + * @param nodeId the NodeId + * @param containerId the ContainerId + * @return the printStream object + * @throws IOException if an I/O error occurs + */ + public static PrintStream createPrintStream(String localDir, String nodeId, + String containerId) throws IOException { + PrintStream out = System.out; + if(localDir != null && !localDir.isEmpty()) { + Path nodePath = new Path(localDir, LogAggregationUtils + .getNodeString(nodeId)); + Files.createDirectories(Paths.get(nodePath.toString())); + Path containerLogPath = new Path(nodePath, containerId); + out = new PrintStream(containerLogPath.toString(), "UTF-8"); + } + return out; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index 39f3dc339f5..5df900b9f64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -25,9 +25,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; -import java.io.PrintStream; -import java.nio.file.Files; -import java.nio.file.Paths; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; @@ -37,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -91,6 +89,12 @@ public abstract class LogAggregationFileController { protected static final FsPermission APP_DIR_PERMISSIONS = FsPermission .createImmutable((short) 0770); + /** + * Umask for the log file. + */ + protected static final FsPermission APP_LOG_FILE_UMASK = FsPermission + .createImmutable((short) (0640 ^ 0777)); + // This is temporary solution. The configuration will be deleted once // we find a more scalable method to only write a single log file per LRS. private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP @@ -98,6 +102,11 @@ public abstract class LogAggregationFileController { private static final int DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30; + // This is temporary solution. The configuration will be deleted once we have + // the FileSystem API to check whether append operation is supported or not. 
+ public static final String LOG_AGGREGATION_FS_SUPPORT_APPEND + = YarnConfiguration.YARN_PREFIX+ "log-aggregation.fs-support-append"; + protected Configuration conf; protected Path remoteRootLogDir; protected String remoteRootLogDirSuffix; @@ -178,19 +187,6 @@ public abstract void write(LogKey logKey, LogValue logValue) public abstract void postWrite(LogAggregationFileControllerContext record) throws Exception; - protected PrintStream createPrintStream(String localDir, String nodeId, - String containerId) throws IOException { - PrintStream out = System.out; - if(localDir != null && !localDir.isEmpty()) { - Path nodePath = new Path(localDir, LogAggregationUtils - .getNodeString(nodeId)); - Files.createDirectories(Paths.get(nodePath.toString())); - Path containerLogPath = new Path(nodePath, containerId); - out = new PrintStream(containerLogPath.toString(), "UTF-8"); - } - return out; - } - protected void closePrintStream(OutputStream out) { if (out != System.out) { IOUtils.cleanupWithLogger(LOG, out); @@ -481,4 +477,21 @@ public Object run() throws Exception { LOG.error("Failed to clean old logs", e); } } + + /** + * Create the aggregated log suffix. The LogAggregationFileController + * should call this to get the suffix and append the suffix to the end + * of each log. This would keep the aggregated log format consistent. + * + * @param fileName the File Name + * @return the aggregated log suffix String + */ + protected String aggregatedLogSuffix(String fileName) { + StringBuilder sb = new StringBuilder(); + String endOfFile = "End of LogType:" + fileName; + sb.append("\n" + endOfFile + "\n"); + sb.append(StringUtils.repeat("*", endOfFile.length() + 50) + + "\n\n"); + return sb.toString(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java new file mode 100644 index 00000000000..7f4441d8c6d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile; + +import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; + +import com.google.inject.Inject; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.file.tfile.BoundedRangeFileInputStream; +import org.apache.hadoop.io.file.tfile.Compression; +import org.apache.hadoop.io.file.tfile.Compression.Algorithm; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationHtmlBlock; +import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedFileLogMeta; +import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedLogsMeta; +import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedPerAggregationLogMeta; +import org.apache.hadoop.yarn.util.Times; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE; + +/** + * The Aggregated Logs Block implementation for Indexed File. 
+ */ +@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"}) +public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock { + + private final LogAggregationIndexedFileController fileController; + private final Configuration conf; + + @Inject + public IndexedFileAggregatedLogsBlock(ViewContext ctx, + Configuration conf, + LogAggregationIndexedFileController fileController) { + super(ctx); + this.conf = conf; + this.fileController = fileController; + } + + @Override + protected void render(Block html) { + BlockParameters params = verifyAndParseParameters(html); + if (params == null) { + return; + } + + ApplicationId appId = params.getAppId(); + ContainerId containerId = params.getContainerId(); + NodeId nodeId = params.getNodeId(); + String appOwner = params.getAppOwner(); + String logEntity = params.getLogEntity(); + long start = params.getStartIndex(); + long end = params.getEndIndex(); + + List nodeFiles = null; + try { + nodeFiles = LogAggregationUtils + .getRemoteNodeFileList(conf, appId, appOwner, + this.fileController.getRemoteRootLogDir(), + this.fileController.getRemoteRootLogDirSuffix()); + } catch(Exception ex) { + html.h1("Unable to locate any logs for container " + + containerId.toString()); + LOG.error(ex.getMessage()); + return; + } + + Map checkSumFiles; + try { + checkSumFiles = fileController.filterFiles(nodeFiles, + LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX); + } catch (IOException ex) { + LOG.error("Error getting logs for " + logEntity, ex); + html.h1("Error getting logs for " + logEntity); + return; + } + + List fileToRead; + try { + fileToRead = fileController.getNodeLogFileToRead(nodeFiles, + nodeId.toString(), appId); + } catch (IOException ex) { + LOG.error("Error getting logs for " + logEntity, ex); + html.h1("Error getting logs for " + logEntity); + return; + } + + boolean foundLog = false; + String desiredLogType = $(CONTAINER_LOG_TYPE); + try { + for (FileStatus thisNodeFile : fileToRead) { + FileStatus checkSum = fileController.getAllChecksumFiles( + checkSumFiles, thisNodeFile.getPath().getName()); + long endIndex = -1; + if (checkSum != null) { + endIndex = fileController.loadIndexedLogsCheckSum( + checkSum.getPath()); + } + IndexedLogsMeta indexedLogsMeta = null; + try { + indexedLogsMeta = fileController.loadIndexedLogsMeta( + thisNodeFile.getPath(), endIndex); + } catch (Exception ex) { + // DO NOTHING + LOG.warn("Can not load log meta from the log file:" + + thisNodeFile.getPath()); + continue; + } + if (indexedLogsMeta == null) { + continue; + } + Map appAcls = indexedLogsMeta.getAcls(); + String user = indexedLogsMeta.getUser(); + String remoteUser = request().getRemoteUser(); + if (!checkAcls(conf, appId, user, appAcls, remoteUser)) { + html.h1()._("User [" + remoteUser + + "] is not authorized to view the logs for " + logEntity + + " in log file [" + thisNodeFile.getPath().getName() + "]") + ._(); + LOG.error("User [" + remoteUser + + "] is not authorized to view the logs for " + logEntity); + continue; + } + String compressAlgo = indexedLogsMeta.getCompressName(); + List candidates = new ArrayList<>(); + for (IndexedPerAggregationLogMeta logMeta + : indexedLogsMeta.getLogMetas()) { + for (Entry> meta + : logMeta.getLogMetas().entrySet()) { + for (IndexedFileLogMeta log : meta.getValue()) { + if (!log.getContainerId().equals(containerId.toString())) { + continue; + } + if (desiredLogType != null && !desiredLogType.isEmpty() + && !desiredLogType.equals(log.getFileName())) { + continue; + } + candidates.add(log); + } + } 
+ } + if (candidates.isEmpty()) { + continue; + } + + Algorithm compressName = Compression.getCompressionAlgorithmByName( + compressAlgo); + Decompressor decompressor = compressName.getDecompressor(); + FileContext fileContext = FileContext.getFileContext( + thisNodeFile.getPath().toUri(), conf); + FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath()); + int bufferSize = 65536; + for (IndexedFileLogMeta candidate : candidates) { + byte[] cbuf = new byte[bufferSize]; + InputStream in = null; + try { + in = compressName.createDecompressionStream( + new BoundedRangeFileInputStream(fsin, + candidate.getStartIndex(), + candidate.getFileCompressedSize()), + decompressor, + LogAggregationIndexedFileController.getFSInputBufferSize( + conf)); + long logLength = candidate.getFileSize(); + html.pre()._("\n\n")._(); + html.p()._("Log Type: " + candidate.getFileName())._(); + html.p()._("Log Upload Time: " + Times.format( + candidate.getLastModificatedTime()))._(); + html.p()._("Log Length: " + Long.toString( + logLength))._(); + long startIndex = start < 0 + ? logLength + start : start; + startIndex = startIndex < 0 ? 0 : startIndex; + startIndex = startIndex > logLength ? logLength : startIndex; + long endLogIndex = end < 0 + ? logLength + end : end; + endLogIndex = endLogIndex < 0 ? 0 : endLogIndex; + endLogIndex = endLogIndex > logLength ? logLength : endLogIndex; + endLogIndex = endLogIndex < startIndex ? + startIndex : endLogIndex; + long toRead = endLogIndex - startIndex; + if (toRead < logLength) { + html.p()._("Showing " + toRead + " bytes of " + logLength + + " total. Click ").a(url("logs", $(NM_NODENAME), + $(CONTAINER_ID), $(ENTITY_STRING), $(APP_OWNER), + candidate.getFileName(), "?start=0"), "here"). + _(" for the full log.")._(); + } + long totalSkipped = 0; + while (totalSkipped < start) { + long ret = in.skip(start - totalSkipped); + if (ret == 0) { + //Read one byte + int nextByte = in.read(); + // Check if we have reached EOF + if (nextByte == -1) { + throw new IOException("Premature EOF from container log"); + } + ret = 1; + } + totalSkipped += ret; + } + int len = 0; + int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; + PRE pre = html.pre(); + + while (toRead > 0 + && (len = in.read(cbuf, 0, currentToRead)) > 0) { + pre._(new String(cbuf, 0, len, Charset.forName("UTF-8"))); + toRead = toRead - len; + currentToRead = toRead > bufferSize ? 
bufferSize : (int) toRead; + } + + pre._(); + foundLog = true; + } catch (Exception ex) { + LOG.error("Error getting logs for " + logEntity, ex); + continue; + } finally { + IOUtils.closeQuietly(in); + } + } + } + if (!foundLog) { + if (desiredLogType.isEmpty()) { + html.h1("No logs available for container " + containerId.toString()); + } else { + html.h1("Unable to locate '" + desiredLogType + + "' log for container " + containerId.toString()); + } + } + } catch (RuntimeException e) { + throw e; + } catch (Exception ex) { + html.h1()._("Error getting logs for " + logEntity)._(); + LOG.error("Error getting logs for " + logEntity, ex); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java new file mode 100644 index 00000000000..6cb206288c3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -0,0 +1,1056 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; +import org.apache.commons.lang.SerializationUtils; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.HarFs; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SecureIOUtils; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.file.tfile.BoundedRangeFileInputStream; +import org.apache.hadoop.io.file.tfile.Compression; +import org.apache.hadoop.io.file.tfile.SimpleBufferedOutputStream; +import org.apache.hadoop.io.file.tfile.Compression.Algorithm; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.LogToolUtils; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; +import org.apache.hadoop.yarn.util.Times; +import org.apache.hadoop.yarn.webapp.View.ViewContext; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Indexed Log Aggregation File Format implementation. 
+ * + */ +@Private +@Unstable +public class LogAggregationIndexedFileController + extends LogAggregationFileController { + + private static final Logger LOG = LoggerFactory.getLogger( + LogAggregationIndexedFileController.class); + private static final String FS_OUTPUT_BUF_SIZE_ATTR = + "indexedFile.fs.output.buffer.size"; + private static final String FS_INPUT_BUF_SIZE_ATTR = + "indexedFile.fs.input.buffer.size"; + private static final String FS_NUM_RETRIES_ATTR = + "indexedFile.fs.op.num-retries"; + private static final String FS_RETRY_INTERVAL_MS_ATTR = + "indexedFile.fs.retry-interval-ms"; + private static final int UUID_LENGTH = 36; + + @VisibleForTesting + public static final String CHECK_SUM_FILE_SUFFIX = "-checksum"; + + private int fsNumRetries = 3; + private long fsRetryInterval = 1000L; + private static final int VERSION = 1; + private IndexedLogsMeta indexedLogsMeta = null; + private IndexedPerAggregationLogMeta logsMetaInThisCycle; + private long logAggregationTimeInThisCycle; + private FSDataOutputStream fsDataOStream; + private Algorithm compressAlgo; + private CachedIndexedLogsMeta cachedIndexedLogsMeta = null; + private boolean logAggregationSuccessfullyInThisCyCle = false; + private long currentOffSet = 0; + private Path remoteLogCheckSumFile; + private FileContext fc; + private UserGroupInformation ugi; + private String uuid = null; + + public LogAggregationIndexedFileController() {} + + @Override + public void initInternal(Configuration conf) { + // Currently, we need the underlying File System to support append + // operation. Will remove this check after we finish + // LogAggregationIndexedFileController for non-append mode. + boolean append = conf.getBoolean(LOG_AGGREGATION_FS_SUPPORT_APPEND, true); + if (!append) { + throw new YarnRuntimeException("The configuration:" + + LOG_AGGREGATION_FS_SUPPORT_APPEND + " is set as False. 
We can only" + + " use LogAggregationIndexedFileController when the FileSystem " + + "support append operations."); + } + String remoteDirStr = String.format( + YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, + this.fileControllerName); + String remoteDir = conf.get(remoteDirStr); + if (remoteDir == null || remoteDir.isEmpty()) { + remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR); + } + this.remoteRootLogDir = new Path(remoteDir); + String suffix = String.format( + YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT, + this.fileControllerName); + this.remoteRootLogDirSuffix = conf.get(suffix); + if (this.remoteRootLogDirSuffix == null + || this.remoteRootLogDirSuffix.isEmpty()) { + this.remoteRootLogDirSuffix = conf.get( + YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX) + + "-ifile"; + } + String compressName = conf.get( + YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, + YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE); + this.compressAlgo = Compression.getCompressionAlgorithmByName( + compressName); + this.fsNumRetries = conf.getInt(FS_NUM_RETRIES_ATTR, 3); + this.fsRetryInterval = conf.getLong(FS_RETRY_INTERVAL_MS_ATTR, 1000L); + } + + @Override + public void initializeWriter( + final LogAggregationFileControllerContext context) + throws IOException { + final UserGroupInformation userUgi = context.getUserUgi(); + final Map appAcls = context.getAppAcls(); + final String nodeId = context.getNodeId().toString(); + final Path remoteLogFile = context.getRemoteNodeLogFileForApp(); + this.ugi = userUgi; + logAggregationSuccessfullyInThisCyCle = false; + logsMetaInThisCycle = new IndexedPerAggregationLogMeta(); + logAggregationTimeInThisCycle = System.currentTimeMillis(); + logsMetaInThisCycle.setUploadTimeStamp(logAggregationTimeInThisCycle); + logsMetaInThisCycle.setRemoteNodeFile(remoteLogFile.getName()); + try { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + fc = FileContext.getFileContext( + remoteRootLogDir.toUri(), conf); + fc.setUMask(APP_LOG_FILE_UMASK); + boolean fileExist = fc.util().exists(remoteLogFile); + if (fileExist && context.isLogAggregationInRolling()) { + fsDataOStream = fc.create(remoteLogFile, + EnumSet.of(CreateFlag.APPEND), + new Options.CreateOpts[] {}); + if (uuid == null) { + FSDataInputStream fsDataInputStream = null; + try { + fsDataInputStream = fc.open(remoteLogFile); + byte[] b = new byte[UUID_LENGTH]; + int actual = fsDataInputStream.read(b); + if (actual != UUID_LENGTH) { + // Get an error when parse the UUID from existed log file. + // Simply OverWrite the existed log file and re-create the + // UUID. + fsDataOStream = fc.create(remoteLogFile, + EnumSet.of(CreateFlag.OVERWRITE), + new Options.CreateOpts[] {}); + uuid = UUID.randomUUID().toString(); + fsDataOStream.write(uuid.getBytes(Charset.forName("UTF-8"))); + fsDataOStream.flush(); + } else { + uuid = new String(b, Charset.forName("UTF-8")); + } + } finally { + IOUtils.cleanupWithLogger(LOG, fsDataInputStream); + } + } + // if the remote log file exists, but we do not have any + // indexedLogsMeta. We need to re-load indexedLogsMeta from + // the existing remote log file. If the re-load fails, we simply + // re-create a new indexedLogsMeta object. And will re-load + // the indexedLogsMeta from checksum file later. 
+ if (indexedLogsMeta == null) { + try { + indexedLogsMeta = loadIndexedLogsMeta(remoteLogFile); + } catch (IOException ex) { + // DO NOTHING + } + } + } else { + fsDataOStream = fc.create(remoteLogFile, + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), + new Options.CreateOpts[] {}); + if (uuid == null) { + uuid = UUID.randomUUID().toString(); + } + byte[] b = uuid.getBytes(Charset.forName("UTF-8")); + fsDataOStream.write(b); + fsDataOStream.flush(); + } + if (indexedLogsMeta == null) { + indexedLogsMeta = new IndexedLogsMeta(); + indexedLogsMeta.setVersion(VERSION); + indexedLogsMeta.setUser(userUgi.getShortUserName()); + indexedLogsMeta.setAcls(appAcls); + indexedLogsMeta.setNodeId(nodeId); + String compressName = conf.get( + YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, + YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE); + indexedLogsMeta.setCompressName(compressName); + } + final long currentAggregatedLogFileLength = fc + .getFileStatus(remoteLogFile).getLen(); + // only check the check-sum file when we are in append mode + if (context.isLogAggregationInRolling()) { + // check whether the checksum file exists to figure out + // whether the previous log aggregation process is successful + // and the aggregated log file is corrupted or not. + remoteLogCheckSumFile = new Path(remoteLogFile.getParent(), + (remoteLogFile.getName() + CHECK_SUM_FILE_SUFFIX)); + boolean exist = fc.util().exists(remoteLogCheckSumFile); + if (!exist) { + FSDataOutputStream checksumFileOutputStream = null; + try { + checksumFileOutputStream = fc.create(remoteLogCheckSumFile, + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), + new Options.CreateOpts[] {}); + checksumFileOutputStream.writeLong( + currentAggregatedLogFileLength); + } finally { + IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream); + } + } else { + FSDataInputStream checksumFileInputStream = null; + try { + checksumFileInputStream = fc.open(remoteLogCheckSumFile); + long endIndex = checksumFileInputStream.readLong(); + IndexedLogsMeta recoveredLogsMeta = loadIndexedLogsMeta( + remoteLogFile, endIndex); + if (recoveredLogsMeta == null) { + indexedLogsMeta.getLogMetas().clear(); + } else { + indexedLogsMeta = recoveredLogsMeta; + } + } finally { + IOUtils.cleanupWithLogger(LOG, checksumFileInputStream); + } + } + } + // append a simple character("\n") to move the writer cursor, so + // we could get the correct position when we call + // fsOutputStream.getStartPos() + final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8")); + fsDataOStream.write(dummyBytes); + fsDataOStream.flush(); + + if (fsDataOStream.getPos() >= (currentAggregatedLogFileLength + + dummyBytes.length)) { + currentOffSet = 0; + } else { + currentOffSet = currentAggregatedLogFileLength; + } + return null; + } + }); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public void closeWriter() { + IOUtils.cleanupWithLogger(LOG, this.fsDataOStream); + } + + @Override + public void write(LogKey logKey, LogValue logValue) throws IOException { + String containerId = logKey.toString(); + Set pendingUploadFiles = logValue + .getPendingLogFilesToUploadForThisContainer(); + List metas = new ArrayList<>(); + for (File logFile : pendingUploadFiles) { + FileInputStream in = null; + try { + in = SecureIOUtils.openForRead(logFile, logValue.getUser(), null); + } catch (IOException e) { + logErrorMessage(logFile, e); + IOUtils.cleanupWithLogger(LOG, in); + continue; + } + final long fileLength = logFile.length(); + IndexedFileOutputStreamState 
outputStreamState = null; + try { + outputStreamState = new IndexedFileOutputStreamState( + this.compressAlgo, this.fsDataOStream, conf, this.currentOffSet); + byte[] buf = new byte[65535]; + int len = 0; + long bytesLeft = fileLength; + while ((len = in.read(buf)) != -1) { + //If buffer contents within fileLength, write + if (len < bytesLeft) { + outputStreamState.getOutputStream().write(buf, 0, len); + bytesLeft-=len; + } else { + //else only write contents within fileLength, then exit early + outputStreamState.getOutputStream().write(buf, 0, + (int)bytesLeft); + break; + } + } + long newLength = logFile.length(); + if(fileLength < newLength) { + LOG.warn("Aggregated logs truncated by approximately "+ + (newLength-fileLength) +" bytes."); + } + logAggregationSuccessfullyInThisCyCle = true; + } catch (IOException e) { + String message = logErrorMessage(logFile, e); + if (outputStreamState != null && + outputStreamState.getOutputStream() != null) { + outputStreamState.getOutputStream().write( + message.getBytes(Charset.forName("UTF-8"))); + } + } finally { + IOUtils.cleanupWithLogger(LOG, in); + } + + IndexedFileLogMeta meta = new IndexedFileLogMeta(); + meta.setContainerId(containerId.toString()); + meta.setFileName(logFile.getName()); + if (outputStreamState != null) { + outputStreamState.finish(); + meta.setFileCompressedSize(outputStreamState.getCompressedSize()); + meta.setStartIndex(outputStreamState.getStartPos()); + meta.setFileSize(fileLength); + } + meta.setLastModificatedTime(logFile.lastModified()); + metas.add(meta); + } + logsMetaInThisCycle.addContainerLogMeta(containerId, metas); + } + + @Override + public void postWrite(LogAggregationFileControllerContext record) + throws Exception { + // always aggregate the previous logsMeta, and append them together + // at the end of the file + indexedLogsMeta.addLogMeta(logsMetaInThisCycle); + byte[] b = SerializationUtils.serialize(indexedLogsMeta); + this.fsDataOStream.write(b); + int length = b.length; + this.fsDataOStream.writeInt(length); + byte[] separator = this.uuid.getBytes(Charset.forName("UTF-8")); + this.fsDataOStream.write(separator); + if (logAggregationSuccessfullyInThisCyCle) { + deleteFileWithRetries(fc, ugi, remoteLogCheckSumFile); + } + } + + private void deleteFileWithRetries(final FileContext fileContext, + final UserGroupInformation userUgi, + final Path deletePath) throws Exception { + new FSAction() { + @Override + public Void run() throws Exception { + deleteFileWithPrivilege(fileContext, userUgi, deletePath); + return null; + } + }.runWithRetries(); + } + + private Object deleteFileWithPrivilege(final FileContext fileContext, + final UserGroupInformation userUgi, final Path fileToDelete) + throws Exception { + return userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + if (fileContext.util().exists(fileToDelete)) { + fileContext.delete(fileToDelete, false); + } + return null; + } + }); + } + + @Override + public boolean readAggregatedLogs(ContainerLogsRequest logRequest, + OutputStream os) throws IOException { + boolean findLogs = false; + boolean createPrintStream = (os == null); + ApplicationId appId = logRequest.getAppId(); + String nodeId = logRequest.getNodeId(); + String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? 
null + : LogAggregationUtils.getNodeString(nodeId); + List logTypes = new ArrayList<>(); + if (logRequest.getLogTypes() != null && !logRequest + .getLogTypes().isEmpty()) { + logTypes.addAll(logRequest.getLogTypes()); + } + String containerIdStr = logRequest.getContainerId(); + boolean getAllContainers = (containerIdStr == null + || containerIdStr.isEmpty()); + long size = logRequest.getBytes(); + List nodeFiles = LogAggregationUtils + .getRemoteNodeFileList(conf, appId, logRequest.getAppOwner(), + this.remoteRootLogDir, this.remoteRootLogDirSuffix); + if (nodeFiles.isEmpty()) { + throw new IOException("There is no available log fils for " + + "application:" + appId); + } + Map checkSumFiles = filterFiles( + nodeFiles, CHECK_SUM_FILE_SUFFIX); + List fileToRead = getNodeLogFileToRead( + nodeFiles, nodeIdStr, appId); + byte[] buf = new byte[65535]; + for (FileStatus thisNodeFile : fileToRead) { + String nodeName = thisNodeFile.getPath().getName(); + FileStatus checkSum = getAllChecksumFiles(checkSumFiles, + thisNodeFile.getPath().getName()); + long endIndex = -1; + if (checkSum != null) { + endIndex = loadIndexedLogsCheckSum(checkSum.getPath()); + } + IndexedLogsMeta indexedLogsMeta = null; + try { + indexedLogsMeta = loadIndexedLogsMeta(thisNodeFile.getPath(), + endIndex); + } catch (Exception ex) { + // DO NOTHING + LOG.warn("Can not load log meta from the log file:" + + thisNodeFile.getPath()); + continue; + } + if (indexedLogsMeta == null) { + continue; + } + String compressAlgo = indexedLogsMeta.getCompressName(); + List candidates = new ArrayList<>(); + for (IndexedPerAggregationLogMeta logMeta + : indexedLogsMeta.getLogMetas()) { + for (Entry> meta + : logMeta.getLogMetas().entrySet()) { + for (IndexedFileLogMeta log : meta.getValue()) { + if (!getAllContainers && !log.getContainerId() + .equals(containerIdStr)) { + continue; + } + if (logTypes != null && !logTypes.isEmpty() && + !logTypes.contains(log.getFileName())) { + continue; + } + candidates.add(log); + } + } + } + if (candidates.isEmpty()) { + continue; + } + + Algorithm compressName = Compression.getCompressionAlgorithmByName( + compressAlgo); + Decompressor decompressor = compressName.getDecompressor(); + FileContext fileContext = FileContext.getFileContext( + thisNodeFile.getPath().toUri(), conf); + FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath()); + String currentContainer = ""; + for (IndexedFileLogMeta candidate : candidates) { + if (!candidate.getContainerId().equals(currentContainer)) { + if (createPrintStream) { + closePrintStream(os); + os = LogToolUtils.createPrintStream( + logRequest.getOutputLocalDir(), + thisNodeFile.getPath().getName(), + candidate.getContainerId()); + currentContainer = candidate.getContainerId(); + } + } + InputStream in = null; + try { + in = compressName.createDecompressionStream( + new BoundedRangeFileInputStream(fsin, + candidate.getStartIndex(), + candidate.getFileCompressedSize()), + decompressor, getFSInputBufferSize(conf)); + LogToolUtils.outputContainerLog(candidate.getContainerId(), + nodeName, candidate.getFileName(), candidate.getFileSize(), size, + Times.format(candidate.getLastModificatedTime()), + in, os, buf, ContainerLogAggregationType.AGGREGATED); + byte[] b = aggregatedLogSuffix(candidate.getFileName()) + .getBytes(Charset.forName("UTF-8")); + os.write(b, 0, b.length); + findLogs = true; + } catch (IOException e) { + System.err.println(e.getMessage()); + compressName.returnDecompressor(decompressor); + continue; + } finally { + os.flush(); + 
IOUtils.cleanupWithLogger(LOG, in); + } + } + } + return findLogs; + } + + // TODO: fix me if the remote file system does not support append operation. + @Override + public List readAggregatedLogsMeta( + ContainerLogsRequest logRequest) throws IOException { + List listOfLogsMeta = new ArrayList<>(); + List containersLogMeta = new ArrayList<>(); + String containerIdStr = logRequest.getContainerId(); + String nodeId = logRequest.getNodeId(); + ApplicationId appId = logRequest.getAppId(); + String appOwner = logRequest.getAppOwner(); + boolean getAllContainers = (containerIdStr == null || + containerIdStr.isEmpty()); + String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null + : LogAggregationUtils.getNodeString(nodeId); + List nodeFiles = LogAggregationUtils + .getRemoteNodeFileList(conf, appId, appOwner, this.remoteRootLogDir, + this.remoteRootLogDirSuffix); + if (nodeFiles.isEmpty()) { + throw new IOException("There is no available log fils for " + + "application:" + appId); + } + Map checkSumFiles = filterFiles( + nodeFiles, CHECK_SUM_FILE_SUFFIX); + List fileToRead = getNodeLogFileToRead( + nodeFiles, nodeIdStr, appId); + for(FileStatus thisNodeFile : fileToRead) { + try { + FileStatus checkSum = getAllChecksumFiles(checkSumFiles, + thisNodeFile.getPath().getName()); + long endIndex = -1; + if (checkSum != null) { + endIndex = loadIndexedLogsCheckSum(checkSum.getPath()); + } + IndexedLogsMeta current = loadIndexedLogsMeta( + thisNodeFile.getPath(), endIndex); + if (current != null) { + listOfLogsMeta.add(current); + } + } catch (IOException ex) { + // DO NOTHING + LOG.warn("Can not get log meta from the log file:" + + thisNodeFile.getPath()); + } + } + for (IndexedLogsMeta indexedLogMeta : listOfLogsMeta) { + String curNodeId = indexedLogMeta.getNodeId(); + for (IndexedPerAggregationLogMeta logMeta : + indexedLogMeta.getLogMetas()) { + if (getAllContainers) { + for (Entry> log : logMeta + .getLogMetas().entrySet()) { + ContainerLogMeta meta = new ContainerLogMeta( + log.getKey().toString(), curNodeId); + for (IndexedFileLogMeta aMeta : log.getValue()) { + meta.addLogMeta(aMeta.getFileName(), Long.toString( + aMeta.getFileSize()), + Times.format(aMeta.getLastModificatedTime())); + } + containersLogMeta.add(meta); + } + } else if (logMeta.getContainerLogMeta(containerIdStr) != null) { + ContainerLogMeta meta = new ContainerLogMeta(containerIdStr, + curNodeId); + for (IndexedFileLogMeta log : + logMeta.getContainerLogMeta(containerIdStr)) { + meta.addLogMeta(log.getFileName(), Long.toString( + log.getFileSize()), + Times.format(log.getLastModificatedTime())); + } + containersLogMeta.add(meta); + } + } + } + Collections.sort(containersLogMeta, new Comparator() { + @Override + public int compare(ContainerLogMeta o1, ContainerLogMeta o2) { + return o1.getContainerId().compareTo(o2.getContainerId()); + } + }); + return containersLogMeta; + } + + @Private + public Map filterFiles( + List fileList, final String suffix) throws IOException { + Map checkSumFiles = new HashMap<>(); + Set status = new HashSet(fileList); + Iterable mask = + Iterables.filter(status, new Predicate() { + @Override + public boolean apply(FileStatus next) { + return next.getPath().getName().endsWith( + suffix); + } + }); + status = Sets.newHashSet(mask); + for (FileStatus file : status) { + checkSumFiles.put(file.getPath().getName(), file); + } + return checkSumFiles; + } + + @Private + public List getNodeLogFileToRead( + List nodeFiles, String nodeId, ApplicationId appId) + throws IOException { + List listOfFiles = 
new ArrayList<>(); + List files = new ArrayList<>(nodeFiles); + for (FileStatus file : files) { + String nodeName = file.getPath().getName(); + if ((nodeId == null || nodeId.isEmpty() + || nodeName.contains(LogAggregationUtils + .getNodeString(nodeId))) && !nodeName.endsWith( + LogAggregationUtils.TMP_FILE_SUFFIX) && + !nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) { + if (nodeName.equals(appId + ".har")) { + Path p = new Path("har:///" + file.getPath().toUri().getRawPath()); + files = Arrays.asList(HarFs.get(p.toUri(), conf).listStatus(p)); + continue; + } + listOfFiles.add(file); + } + } + return listOfFiles; + } + + @Private + public FileStatus getAllChecksumFiles(Map fileMap, + String fileName) { + for (Entry file : fileMap.entrySet()) { + if (file.getKey().startsWith(fileName) && file.getKey() + .endsWith(CHECK_SUM_FILE_SUFFIX)) { + return file.getValue(); + } + } + return null; + } + + @Override + public void renderAggregatedLogsBlock(Block html, ViewContext context) { + IndexedFileAggregatedLogsBlock block = new IndexedFileAggregatedLogsBlock( + context, this.conf, this); + block.render(html); + } + + @Override + public String getApplicationOwner(Path aggregatedLogPath) + throws IOException { + if (this.cachedIndexedLogsMeta == null + || !this.cachedIndexedLogsMeta.getRemoteLogPath() + .equals(aggregatedLogPath)) { + this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta( + loadIndexedLogsMeta(aggregatedLogPath), aggregatedLogPath); + } + return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getUser(); + } + + @Override + public Map getApplicationAcls( + Path aggregatedLogPath) throws IOException { + if (this.cachedIndexedLogsMeta == null + || !this.cachedIndexedLogsMeta.getRemoteLogPath() + .equals(aggregatedLogPath)) { + this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta( + loadIndexedLogsMeta(aggregatedLogPath), aggregatedLogPath); + } + return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getAcls(); + } + + @Override + public Path getRemoteAppLogDir(ApplicationId appId, String user) + throws IOException { + return LogAggregationUtils.getRemoteAppLogDir(conf, appId, user, + this.remoteRootLogDir, this.remoteRootLogDirSuffix); + } + + @Private + public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end) + throws IOException { + FileContext fileContext = + FileContext.getFileContext(remoteLogPath.toUri(), conf); + FSDataInputStream fsDataIStream = null; + try { + fsDataIStream = fileContext.open(remoteLogPath); + if (end == 0) { + return null; + } + long fileLength = end < 0 ? 
fileContext.getFileStatus( + remoteLogPath).getLen() : end; + fsDataIStream.seek(fileLength - Integer.SIZE/ Byte.SIZE - UUID_LENGTH); + int offset = fsDataIStream.readInt(); + byte[] array = new byte[offset]; + fsDataIStream.seek( + fileLength - offset - Integer.SIZE/ Byte.SIZE - UUID_LENGTH); + int actual = fsDataIStream.read(array); + if (actual != offset) { + throw new IOException("Error on loading log meta from " + + remoteLogPath); + } + return (IndexedLogsMeta)SerializationUtils + .deserialize(array); + } finally { + IOUtils.cleanupWithLogger(LOG, fsDataIStream); + } + } + + private IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath) + throws IOException { + return loadIndexedLogsMeta(remoteLogPath, -1); + } + + @Private + public long loadIndexedLogsCheckSum(Path remoteLogCheckSumPath) + throws IOException { + FileContext fileContext = + FileContext.getFileContext(remoteLogCheckSumPath.toUri(), conf); + FSDataInputStream fsDataIStream = null; + try { + fsDataIStream = fileContext.open(remoteLogCheckSumPath); + return fsDataIStream.readLong(); + } finally { + IOUtils.cleanupWithLogger(LOG, fsDataIStream); + } + } + + /** + * This IndexedLogsMeta includes all the meta information + * for the aggregated log file. + */ + @Private + @VisibleForTesting + public static class IndexedLogsMeta implements Serializable { + + private static final long serialVersionUID = 5439875373L; + private int version; + private String user; + private String compressName; + private Map acls; + private String nodeId; + private List logMetas = new ArrayList<>(); + + public int getVersion() { + return this.version; + } + + public void setVersion(int version) { + this.version = version; + } + + public String getUser() { + return this.user; + } + + public void setUser(String user) { + this.user = user; + } + + public Map getAcls() { + return this.acls; + } + + public void setAcls(Map acls) { + this.acls = acls; + } + + public String getCompressName() { + return compressName; + } + + public void setCompressName(String compressName) { + this.compressName = compressName; + } + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public void addLogMeta(IndexedPerAggregationLogMeta logMeta) { + logMetas.add(logMeta); + } + + public List getLogMetas() { + return logMetas; + } + } + + /** + * This IndexedPerAggregationLogMeta includes the meta information + * for all files which would be aggregated in one + * Log aggregation cycle. 
+ */ + public static class IndexedPerAggregationLogMeta implements Serializable { + private static final long serialVersionUID = 3929298383L; + private String remoteNodeLogFileName; + private Map<String, List<IndexedFileLogMeta>> logMetas = new HashMap<>(); + private long uploadTimeStamp; + + public String getRemoteNodeFile() { + return remoteNodeLogFileName; + } + public void setRemoteNodeFile(String remoteNodeLogFileName) { + this.remoteNodeLogFileName = remoteNodeLogFileName; + } + + public void addContainerLogMeta(String containerId, + List<IndexedFileLogMeta> logMeta) { + logMetas.put(containerId, logMeta); + } + + public List<IndexedFileLogMeta> getContainerLogMeta(String containerId) { + return logMetas.get(containerId); + } + + public Map<String, List<IndexedFileLogMeta>> getLogMetas() { + return logMetas; + } + + public long getUploadTimeStamp() { + return uploadTimeStamp; + } + + public void setUploadTimeStamp(long uploadTimeStamp) { + this.uploadTimeStamp = uploadTimeStamp; + } + } + + /** + * This IndexedFileLogMeta includes the meta information + * for a single file which would be aggregated in one + * Log aggregation cycle. + * + */ + @Private + @VisibleForTesting + public static class IndexedFileLogMeta implements Serializable { + private static final long serialVersionUID = 1L; + private String containerId; + private String fileName; + private long fileSize; + private long fileCompressedSize; + private long lastModificatedTime; + private long startIndex; + + public String getFileName() { + return fileName; + } + public void setFileName(String fileName) { + this.fileName = fileName; + } + + public long getFileSize() { + return fileSize; + } + public void setFileSize(long fileSize) { + this.fileSize = fileSize; + } + + public long getFileCompressedSize() { + return fileCompressedSize; + } + public void setFileCompressedSize(long fileCompressedSize) { + this.fileCompressedSize = fileCompressedSize; + } + + public long getLastModificatedTime() { + return lastModificatedTime; + } + public void setLastModificatedTime(long lastModificatedTime) { + this.lastModificatedTime = lastModificatedTime; + } + + public long getStartIndex() { + return startIndex; + } + public void setStartIndex(long startIndex) { + this.startIndex = startIndex; + } + + public String getContainerId() { + return containerId; + } + public void setContainerId(String containerId) { + this.containerId = containerId; + } + } + + private static String logErrorMessage(File logFile, Exception e) { + String message = "Error aggregating log file. Log file : " + + logFile.getAbsolutePath() + ". 
" + e.getMessage(); + LOG.error(message, e); + return message; + } + + private static class IndexedFileOutputStreamState { + private final Algorithm compressAlgo; + private Compressor compressor; + private final FSDataOutputStream fsOut; + private long posStart; + private final SimpleBufferedOutputStream fsBufferedOutput; + private OutputStream out; + private long offset; + + IndexedFileOutputStreamState(Algorithm compressionName, + FSDataOutputStream fsOut, Configuration conf, long offset) + throws IOException { + this.compressAlgo = compressionName; + this.fsOut = fsOut; + this.offset = offset; + this.posStart = fsOut.getPos(); + + BytesWritable fsOutputBuffer = new BytesWritable(); + fsOutputBuffer.setCapacity(LogAggregationIndexedFileController + .getFSOutputBufferSize(conf)); + + this.fsBufferedOutput = new SimpleBufferedOutputStream(this.fsOut, + fsOutputBuffer.getBytes()); + + this.compressor = compressAlgo.getCompressor(); + + try { + this.out = compressAlgo.createCompressionStream( + fsBufferedOutput, compressor, 0); + } catch (IOException e) { + compressAlgo.returnCompressor(compressor); + throw e; + } + } + + OutputStream getOutputStream() { + return out; + } + + long getCurrentPos() throws IOException { + return fsOut.getPos() + fsBufferedOutput.size(); + } + + long getStartPos() { + return posStart + offset; + } + + long getCompressedSize() throws IOException { + long ret = getCurrentPos() - posStart; + return ret; + } + + void finish() throws IOException { + try { + if (out != null) { + out.flush(); + out = null; + } + } finally { + compressAlgo.returnCompressor(compressor); + compressor = null; + } + } + } + + private static class CachedIndexedLogsMeta { + private final Path remoteLogPath; + private final IndexedLogsMeta indexedLogsMeta; + CachedIndexedLogsMeta(IndexedLogsMeta indexedLogsMeta, + Path remoteLogPath) { + this.indexedLogsMeta = indexedLogsMeta; + this.remoteLogPath = remoteLogPath; + } + + public Path getRemoteLogPath() { + return this.remoteLogPath; + } + + public IndexedLogsMeta getCachedIndexedLogsMeta() { + return this.indexedLogsMeta; + } + } + + @Private + public static int getFSOutputBufferSize(Configuration conf) { + return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024); + } + + @Private + public static int getFSInputBufferSize(Configuration conf) { + return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024); + } + + private abstract class FSAction { + abstract T run() throws Exception; + + T runWithRetries() throws Exception { + int retry = 0; + while (true) { + try { + return run(); + } catch (IOException e) { + LOG.info("Exception while executing an FS operation.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out FS retries. Giving up!"); + throw e; + } + LOG.info("Retrying operation on FS. Retry no. " + retry); + Thread.sleep(fsRetryInterval); + } + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java new file mode 100644 index 00000000000..08ddecef5db --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile; +import org.apache.hadoop.classification.InterfaceAudience; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java index aaed538fc01..989b3266c60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java @@ -27,7 +27,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.math3.util.Pair; @@ -192,7 +191,7 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest, while (valueStream != null) { if (getAllContainers || (key.toString().equals(containerIdStr))) { if (createPrintStream) { - os = createPrintStream( + os = LogToolUtils.createPrintStream( logRequest.getOutputLocalDir(), thisNodeFile.getPath().getName(), key.toString()); } @@ -209,12 +208,7 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest, Times.format(thisNodeFile.getModificationTime()), valueStream, os, buf, ContainerLogAggregationType.AGGREGATED); - StringBuilder sb = new StringBuilder(); - String endOfFile = "End of LogType:" + fileType; - sb.append("\n" + endOfFile + "\n"); - sb.append(StringUtils.repeat("*", endOfFile.length() + 50) - + "\n\n"); - byte[] b = sb.toString().getBytes( + byte[] b = aggregatedLogSuffix(fileType).getBytes( Charset.forName("UTF-8")); os.write(b, 0, b.length); findLogs = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java new file mode 100644 index 00000000000..5f6171035c3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintStream; +import java.io.Writer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Function test for {@link LogAggregationIndexedFileController}. 
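+ * It exercises the write/read round trip, checksum file handling, and + * per-container log meta listing for the indexed format.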
+ * + */ +public class TestLogAggregationIndexFileController { + + private final String rootLocalLogDir = "target/LocalLogs"; + private final Path rootLocalLogDirPath = new Path(rootLocalLogDir); + private final String remoteLogDir = "target/remote-app"; + private static final FsPermission LOG_FILE_UMASK = FsPermission + .createImmutable((short) (0777)); + private static final UserGroupInformation USER_UGI = UserGroupInformation + .createRemoteUser("testUser"); + private FileSystem fs; + private Configuration conf; + private ApplicationId appId; + private ContainerId containerId; + private NodeId nodeId; + + private ByteArrayOutputStream sysOutStream; + private PrintStream sysOut; + + private ByteArrayOutputStream sysErrStream; + private PrintStream sysErr; + + @Before + public void setUp() throws IOException { + appId = ApplicationId.newInstance(123456, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( + appId, 1); + containerId = ContainerId.newContainerId(attemptId, 1); + nodeId = NodeId.newInstance("localhost", 9999); + conf = new Configuration(); + conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir", + remoteLogDir); + conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir-suffix", + "logs"); + conf.set(YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, "gz"); + fs = FileSystem.get(conf); + sysOutStream = new ByteArrayOutputStream(); + sysOut = new PrintStream(sysOutStream); + System.setOut(sysOut); + + sysErrStream = new ByteArrayOutputStream(); + sysErr = new PrintStream(sysErrStream); + System.setErr(sysErr); + } + + @After + public void teardown() throws Exception { + fs.delete(rootLocalLogDirPath, true); + fs.delete(new Path(remoteLogDir), true); + } + + @Test(timeout = 15000) + public void testLogAggregationIndexFileFormat() throws Exception { + if (fs.exists(rootLocalLogDirPath)) { + fs.delete(rootLocalLogDirPath, true); + } + assertTrue(fs.mkdirs(rootLocalLogDirPath)); + + Path appLogsDir = new Path(rootLocalLogDirPath, appId.toString()); + if (fs.exists(appLogsDir)) { + fs.delete(appLogsDir, true); + } + assertTrue(fs.mkdirs(appLogsDir)); + + List logTypes = new ArrayList(); + logTypes.add("syslog"); + logTypes.add("stdout"); + logTypes.add("stderr"); + + Set files = new HashSet<>(); + + LogKey key1 = new LogKey(containerId.toString()); + + for(String logType : logTypes) { + File file = createAndWriteLocalLogFile(containerId, appLogsDir, + logType); + files.add(file); + } + LogValue value = mock(LogValue.class); + when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files); + + LogAggregationIndexedFileController fileFormat + = new LogAggregationIndexedFileController(); + fileFormat.initialize(conf, "Indexed"); + + Map appAcls = new HashMap<>(); + Path appDir = fileFormat.getRemoteAppLogDir(appId, + USER_UGI.getShortUserName()); + if (fs.exists(appDir)) { + fs.delete(appDir, true); + } + assertTrue(fs.mkdirs(appDir)); + + Path logPath = fileFormat.getRemoteNodeLogFileForApp( + appId, USER_UGI.getShortUserName(), nodeId); + LogAggregationFileControllerContext context = + new LogAggregationFileControllerContext( + logPath, logPath, true, 1000, appId, appAcls, nodeId, USER_UGI); + // initialize the writer + fileFormat.initializeWriter(context); + + fileFormat.write(key1, value); + LogAggregationFileControllerContext record = mock( + LogAggregationFileControllerContext.class); + fileFormat.postWrite(record); + fileFormat.closeWriter(); + + ContainerLogsRequest logRequest = new ContainerLogsRequest(); + logRequest.setAppId(appId); + 
logRequest.setNodeId(nodeId.toString()); + logRequest.setAppOwner(USER_UGI.getShortUserName()); + logRequest.setContainerId(containerId.toString()); + logRequest.setBytes(Long.MAX_VALUE); + List meta = fileFormat.readAggregatedLogsMeta( + logRequest); + Assert.assertTrue(meta.size() == 1); + List fileNames = new ArrayList<>(); + for (ContainerLogMeta log : meta) { + Assert.assertTrue(log.getContainerId().equals(containerId.toString())); + Assert.assertTrue(log.getNodeId().equals(nodeId.toString())); + Assert.assertTrue(log.getContainerLogMeta().size() == 3); + for (ContainerLogFileInfo file : log.getContainerLogMeta()) { + fileNames.add(file.getFileName()); + } + } + fileNames.removeAll(logTypes); + Assert.assertTrue(fileNames.isEmpty()); + + boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); + Assert.assertTrue(foundLogs); + for (String logType : logTypes) { + Assert.assertTrue(sysOutStream.toString().contains(logMessage( + containerId, logType))); + } + sysOutStream.reset(); + + // create a checksum file + Path checksumFile = new Path(fileFormat.getRemoteAppLogDir( + appId, USER_UGI.getShortUserName()), + LogAggregationUtils.getNodeString(nodeId) + + LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX); + FSDataOutputStream fInput = null; + try { + fInput = FileSystem.create(fs, checksumFile, LOG_FILE_UMASK); + fInput.writeLong(0); + } finally { + IOUtils.closeQuietly(fInput); + } + meta = fileFormat.readAggregatedLogsMeta( + logRequest); + Assert.assertTrue(meta.size() == 0); + foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); + Assert.assertFalse(foundLogs); + sysOutStream.reset(); + fs.delete(checksumFile, false); + Assert.assertFalse(fs.exists(checksumFile)); + + List newLogTypes = new ArrayList<>(logTypes); + files.clear(); + newLogTypes.add("test1"); + files.add(createAndWriteLocalLogFile(containerId, appLogsDir, + "test1")); + newLogTypes.add("test2"); + files.add(createAndWriteLocalLogFile(containerId, appLogsDir, + "test2")); + LogValue value2 = mock(LogValue.class); + when(value2.getPendingLogFilesToUploadForThisContainer()) + .thenReturn(files); + + // initialize the writer + fileFormat.initializeWriter(context); + fileFormat.write(key1, value2); + fileFormat.closeWriter(); + + // We did not call postWrite, so the checksum file is kept and + // we can only get the logs/log meta from the first write. 
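+ // The reader honors that recorded end offset: the second write's + // test1 and test2 files must not show up in the assertions below.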
+ meta = fileFormat.readAggregatedLogsMeta( + logRequest); + Assert.assertEquals(1, meta.size()); + for (ContainerLogMeta log : meta) { + Assert.assertTrue(log.getContainerId().equals(containerId.toString())); + Assert.assertTrue(log.getNodeId().equals(nodeId.toString())); + Assert.assertTrue(log.getContainerLogMeta().size() == 3); + for (ContainerLogFileInfo file : log.getContainerLogMeta()) { + fileNames.add(file.getFileName()); + } + } + fileNames.removeAll(logTypes); + Assert.assertTrue(fileNames.isEmpty()); + foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); + Assert.assertTrue(foundLogs); + for (String logType : logTypes) { + Assert.assertTrue(sysOutStream.toString().contains(logMessage( + containerId, logType))); + } + Assert.assertFalse(sysOutStream.toString().contains(logMessage( + containerId, "test1"))); + Assert.assertFalse(sysOutStream.toString().contains(logMessage( + containerId, "test2"))); + sysOutStream.reset(); + + // Call postWrite, and we should get all logs/log metas for both + // the first and the second write. + fileFormat.initializeWriter(context); + fileFormat.write(key1, value2); + fileFormat.postWrite(record); + fileFormat.closeWriter(); + meta = fileFormat.readAggregatedLogsMeta( + logRequest); + Assert.assertEquals(2, meta.size()); + for (ContainerLogMeta log : meta) { + Assert.assertTrue(log.getContainerId().equals(containerId.toString())); + Assert.assertTrue(log.getNodeId().equals(nodeId.toString())); + for (ContainerLogFileInfo file : log.getContainerLogMeta()) { + fileNames.add(file.getFileName()); + } + } + fileNames.removeAll(newLogTypes); + Assert.assertTrue(fileNames.isEmpty()); + foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); + Assert.assertTrue(foundLogs); + for (String logType : newLogTypes) { + Assert.assertTrue(sysOutStream.toString().contains(logMessage( + containerId, logType))); + } + sysOutStream.reset(); + } + + private File createAndWriteLocalLogFile(ContainerId containerId, + Path localLogDir, String logType) throws IOException { + File file = new File(localLogDir.toString(), logType); + if (file.exists()) { + file.delete(); + } + file.createNewFile(); + Writer writer = null; + try { + writer = new FileWriter(file); + writer.write(logMessage(containerId, logType)); + writer.close(); + return file; + } finally { + IOUtils.closeQuietly(writer); + } + } + + private String logMessage(ContainerId containerId, String logType) { + StringBuilder sb = new StringBuilder(); + sb.append("Hello " + containerId + " in " + logType + "!"); + return sb.toString(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java index 04c72bf8d01..9ae792ceb39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java @@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import 
org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; -import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo; +import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore; @@ -847,7 +847,7 @@ public void testContainerLogsMetaForRunningApps() throws Exception { for (ContainerLogsInfo logInfo : responseText) { if(logInfo.getLogType().equals( ContainerLogAggregationType.AGGREGATED.toString())) { - List<PerContainerLogFileInfo> logMeta = logInfo + List<ContainerLogFileInfo> logMeta = logInfo .getContainerLogsInfo(); assertTrue(logMeta.size() == 1); assertEquals(logMeta.get(0).getFileName(), fileName); @@ -875,7 +875,7 @@ public void testContainerLogsMetaForRunningApps() throws Exception { for (ContainerLogsInfo logInfo : responseText) { if(logInfo.getLogType().equals( ContainerLogAggregationType.AGGREGATED.toString())) { - List<PerContainerLogFileInfo> logMeta = logInfo + List<ContainerLogFileInfo> logMeta = logInfo .getContainerLogsInfo(); assertTrue(logMeta.size() == 1); assertEquals(logMeta.get(0).getFileName(), fileName); @@ -913,7 +913,7 @@ public void testContainerLogsMetaForFinishedApps() throws Exception { assertTrue(responseText.size() == 1); assertEquals(responseText.get(0).getLogType(), ContainerLogAggregationType.AGGREGATED.toString()); - List<PerContainerLogFileInfo> logMeta = responseText.get(0) + List<ContainerLogFileInfo> logMeta = responseText.get(0) .getContainerLogsInfo(); assertTrue(logMeta.size() == 1); assertEquals(logMeta.get(0).getFileName(), fileName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerLogsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerLogsInfo.java index bc3ab393bc5..1bb0408d944 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerLogsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerLogsInfo.java @@ -27,14 +27,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; -import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo; +import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; /** * {@code ContainerLogsInfo} includes the log meta-data of containers. * <p> * The container log meta-data includes details such as: * <ul> - *   <li>A list of {@link PerContainerLogFileInfo}.</li> + *   <li>A list of {@link ContainerLogFileInfo}.</li> *   <li>The container Id.</li> *   <li>The NodeManager Id.</li> *   <li>The logType: could be local or aggregated</li> * </ul> @@ -46,7 +46,7 @@ public class ContainerLogsInfo { @XmlElement(name = "containerLogInfo") - protected List<PerContainerLogFileInfo> containerLogsInfo; + protected List<ContainerLogFileInfo> containerLogsInfo; @XmlElement(name = "logAggregationType") protected String logType; @@ -62,14 +62,14 @@ public ContainerLogsInfo() {} public ContainerLogsInfo(ContainerLogMeta logMeta, ContainerLogAggregationType logType) throws YarnException { - this.containerLogsInfo = new ArrayList<PerContainerLogFileInfo>( + this.containerLogsInfo = new ArrayList<ContainerLogFileInfo>( logMeta.getContainerLogMeta()); this.logType = logType.toString(); this.containerId = logMeta.getContainerId(); this.nodeId = logMeta.getNodeId(); } - public List<PerContainerLogFileInfo> getContainerLogsInfo() { + public List<ContainerLogFileInfo> getContainerLogsInfo() { return this.containerLogsInfo; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMContainerLogsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMContainerLogsInfo.java index 5415e04eb47..193ec623f20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMContainerLogsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMContainerLogsInfo.java @@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; -import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo; +import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsUtils; import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo; @@ -55,10 +55,10 @@ public NMContainerLogsInfo(final Context nmContext, containerId, remoteUser, nmContext); } - private static List<PerContainerLogFileInfo> getContainerLogsInfo( + private static List<ContainerLogFileInfo> getContainerLogsInfo( ContainerId id, String remoteUser, Context nmContext) throws YarnException { - List<PerContainerLogFileInfo> logFiles = new ArrayList<>(); + List<ContainerLogFileInfo> logFiles = new ArrayList<>(); List<File> logDirs = ContainerLogsUtils.getContainerLogDirs( id, remoteUser, nmContext); for (File containerLogsDir : logDirs) { @@ -66,7 +66,7 @@ private static List getContainerLogsInfo( if (logs != null) { for (File log : logs) { if (log.isFile()) { - PerContainerLogFileInfo logMeta = new PerContainerLogFileInfo( + ContainerLogFileInfo logMeta = new ContainerLogFileInfo( log.getName(), Long.toString(log.length()), Times.format(log.lastModified())); logFiles.add(logMeta); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index c731d3574f4..4586a7b88c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; -import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo; +import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; @@ -529,7 +529,7 @@ private void testContainerLogs(WebResource r, ContainerId containerId) assertTrue(responseList.size() == 1); assertEquals(responseList.get(0).getLogType(), ContainerLogAggregationType.LOCAL.toString()); - List<PerContainerLogFileInfo> logMeta = responseList.get(0) + List<ContainerLogFileInfo> logMeta = responseList.get(0) .getContainerLogsInfo(); assertTrue(logMeta.size() == 1); assertEquals(logMeta.get(0).getFileName(), filename); @@ -556,13 +556,13 @@ private void testContainerLogs(WebResource r, ContainerId containerId) for (ContainerLogsInfo logInfo : responseList) { if(logInfo.getLogType().equals( ContainerLogAggregationType.AGGREGATED.toString())) { - List<PerContainerLogFileInfo> meta = logInfo.getContainerLogsInfo(); + List<ContainerLogFileInfo> meta = logInfo.getContainerLogsInfo(); assertTrue(meta.size() == 1); assertEquals(meta.get(0).getFileName(), aggregatedLogFile); } else { assertEquals(logInfo.getLogType(), ContainerLogAggregationType.LOCAL.toString()); - List<PerContainerLogFileInfo> meta = logInfo.getContainerLogsInfo(); + List<ContainerLogFileInfo> meta = logInfo.getContainerLogsInfo(); assertTrue(meta.size() == 1); assertEquals(meta.get(0).getFileName(), filename); }
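The indexed format keeps its directory at the tail of each aggregated log file: the writer appends the serialized IndexedLogsMeta block, then a 4-byte length, then a trailing UUID, and loadIndexedLogsMeta above seeks backwards over that layout, optionally bounded by the offset stored in the per-node checksum file. The following is a minimal standalone sketch of that read path, not part of the patch; it assumes a UUID_LENGTH that matches the writer's trailing UUID, leaves deserialization of the returned bytes to the caller, and uses an illustrative class name.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;

public final class IndexedMetaTailReader {
  // Assumed to match the writer's trailing UUID length; the real constant
  // lives in LogAggregationIndexedFileController.
  private static final int UUID_LENGTH = 32;

  /**
   * Reads the serialized IndexedLogsMeta block from the tail of an
   * aggregated log file. "end" is the committed offset taken from the
   * checksum file, or a negative value to fall back to the file length.
   */
  public static byte[] readMetaBytes(Configuration conf, Path log, long end)
      throws IOException {
    FileContext fc = FileContext.getFileContext(log.toUri(), conf);
    try (FSDataInputStream in = fc.open(log)) {
      long tail = end < 0 ? fc.getFileStatus(log).getLen() : end;
      // Tail layout: [serialized meta][4-byte meta length][UUID]
      in.seek(tail - Integer.SIZE / Byte.SIZE - UUID_LENGTH);
      int metaLength = in.readInt();
      byte[] buf = new byte[metaLength];
      in.seek(tail - metaLength - Integer.SIZE / Byte.SIZE - UUID_LENGTH);
      in.readFully(buf);
      return buf; // deserialize to IndexedLogsMeta as the patch does
    }
  }
}

This is also why the test above sees no logs after writing a checksum of 0: the recorded end offset bounds the read, and loadIndexedLogsMeta returns null outright when that offset is 0.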