YARN-7072. Add a new log aggregation file format controller. Contributed by Xuan Gong.
Commit 3fddabc2fe (parent 8edc60531f)
File: BoundedRangeFileInputStream.java (org.apache.hadoop.io.file.tfile)

@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
  * BoundedRangeFileInputStream on top of the same FSDataInputStream and they
  * would not interfere with each other.
  */
-class BoundedRangeFileInputStream extends InputStream {
+public class BoundedRangeFileInputStream extends InputStream {
 
   private FSDataInputStream in;
   private long pos;
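Note: widening BoundedRangeFileInputStream to public lets code outside the tfile package read one byte range out of a large aggregated log file without decompressing the rest. A minimal sketch of the intended call pattern (fsin, startOffset and compressedLength would come from the index data; the names are illustrative):

    // Open the aggregated log once, then expose only one container's slice.
    FSDataInputStream fsin = fileContext.open(aggregatedLogPath);
    InputStream slice =
        new BoundedRangeFileInputStream(fsin, startOffset, compressedLength);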
File: Compression.java (org.apache.hadoop.io.file.tfile)

@@ -43,7 +43,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
 /**
  * Compression related stuff.
  */
-final class Compression {
+public final class Compression {
   static final Logger LOG = LoggerFactory.getLogger(Compression.class);
 
   /**
@@ -75,7 +75,7 @@ final class Compression {
   /**
    * Compression algorithms.
    */
-  enum Algorithm {
+  public enum Algorithm {
     LZO(TFile.COMPRESSION_LZO) {
       private transient boolean checked = false;
       private static final String defaultClazz =
@@ -348,7 +348,7 @@ final class Compression {
     }
   }
 
-  static Algorithm getCompressionAlgorithmByName(String compressName) {
+  public static Algorithm getCompressionAlgorithmByName(String compressName) {
     Algorithm[] algos = Algorithm.class.getEnumConstants();
 
     for (Algorithm a : algos) {
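Note: making Compression and its Algorithm enum public is what allows the new indexed-file controller (further down) to resolve the codec recorded in an aggregated log's metadata. A hedged sketch of that usage, assuming a gzip-compressed log and a bounded input slice obtained elsewhere:

    // Look up the codec by its recorded name ("gz", "lzo", ...).
    Algorithm algo = Compression.getCompressionAlgorithmByName("gz");
    Decompressor decompressor = algo.getDecompressor();
    // Wrap the bounded slice of the aggregated file in a decompression stream.
    InputStream in = algo.createDecompressionStream(slice, decompressor, 65536);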
File: SimpleBufferedOutputStream.java (org.apache.hadoop.io.file.tfile)

@@ -25,7 +25,7 @@ import java.io.OutputStream;
  * A simplified BufferedOutputStream with borrowed buffer, and allow users to
  * see how much data have been buffered.
  */
-class SimpleBufferedOutputStream extends FilterOutputStream {
+public class SimpleBufferedOutputStream extends FilterOutputStream {
   protected byte buf[]; // the borrowed buffer
   protected int count = 0; // bytes used in buffer.
File: LogsCLI.java

@@ -62,9 +62,10 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
 import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
-import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
+import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
 import org.codehaus.jettison.json.JSONArray;
@@ -411,10 +412,10 @@ public class LogsCLI extends Configured implements Tool {
     return false;
   }
 
-  private List<Pair<PerContainerLogFileInfo, String>> getContainerLogFiles(
+  private List<Pair<ContainerLogFileInfo, String>> getContainerLogFiles(
       Configuration conf, String containerIdStr, String nodeHttpAddress)
       throws IOException {
-    List<Pair<PerContainerLogFileInfo, String>> logFileInfos
+    List<Pair<ContainerLogFileInfo, String>> logFileInfos
         = new ArrayList<>();
     Client webServiceClient = Client.create();
     try {
@@ -453,12 +454,12 @@ public class LogsCLI extends Configured implements Tool {
         if (ob instanceof JSONArray) {
           JSONArray obArray = (JSONArray)ob;
           for (int j = 0; j < obArray.length(); j++) {
-            logFileInfos.add(new Pair<PerContainerLogFileInfo, String>(
+            logFileInfos.add(new Pair<ContainerLogFileInfo, String>(
                 generatePerContainerLogFileInfoFromJSON(
                     obArray.getJSONObject(j)), aggregateType));
           }
         } else if (ob instanceof JSONObject) {
-          logFileInfos.add(new Pair<PerContainerLogFileInfo, String>(
+          logFileInfos.add(new Pair<ContainerLogFileInfo, String>(
               generatePerContainerLogFileInfoFromJSON(
                   (JSONObject)ob), aggregateType));
         }
@@ -477,7 +478,7 @@ public class LogsCLI extends Configured implements Tool {
     return logFileInfos;
   }
 
-  private PerContainerLogFileInfo generatePerContainerLogFileInfoFromJSON(
+  private ContainerLogFileInfo generatePerContainerLogFileInfoFromJSON(
       JSONObject meta) throws JSONException {
     String fileName = meta.has("fileName") ?
         meta.getString("fileName") : "N/A";
@@ -485,7 +486,7 @@ public class LogsCLI extends Configured implements Tool {
         meta.getString("fileSize") : "N/A";
     String lastModificationTime = meta.has("lastModifiedTime") ?
         meta.getString("lastModifiedTime") : "N/A";
-    return new PerContainerLogFileInfo(fileName, fileSize,
+    return new ContainerLogFileInfo(fileName, fileSize,
         lastModificationTime);
   }
 
@@ -506,7 +507,7 @@ public class LogsCLI extends Configured implements Tool {
       return -1;
     }
     String nodeId = request.getNodeId();
-    PrintStream out = logCliHelper.createPrintStream(localDir, nodeId,
+    PrintStream out = LogToolUtils.createPrintStream(localDir, nodeId,
         containerIdStr);
     try {
       Set<String> matchedFiles = getMatchedContainerLogFiles(request,
@@ -1235,9 +1236,9 @@ public class LogsCLI extends Configured implements Tool {
     outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
         "LogFile", "LogLength", "LastModificationTime", "LogAggregationType");
     outStream.println(StringUtils.repeat("=", containerString.length() * 2));
-    List<Pair<PerContainerLogFileInfo, String>> infos = getContainerLogFiles(
+    List<Pair<ContainerLogFileInfo, String>> infos = getContainerLogFiles(
        getConf(), containerId, nodeHttpAddress);
-    for (Pair<PerContainerLogFileInfo, String> info : infos) {
+    for (Pair<ContainerLogFileInfo, String> info : infos) {
       outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
           info.getKey().getFileName(), info.getKey().getFileSize(),
           info.getKey().getLastModifiedTime(), info.getValue());
@@ -1249,11 +1250,11 @@ public class LogsCLI extends Configured implements Tool {
       boolean useRegex) throws IOException {
     // fetch all the log files for the container
     // filter the log files based on the given -log_files pattern
-    List<Pair<PerContainerLogFileInfo, String>> allLogFileInfos =
+    List<Pair<ContainerLogFileInfo, String>> allLogFileInfos =
        getContainerLogFiles(getConf(), request.getContainerId(),
            request.getNodeHttpAddress());
     List<String> fileNames = new ArrayList<String>();
-    for (Pair<PerContainerLogFileInfo, String> fileInfo : allLogFileInfos) {
+    for (Pair<ContainerLogFileInfo, String> fileInfo : allLogFileInfos) {
       fileNames.add(fileInfo.getKey().getFileName());
     }
     return getMatchedLogFiles(request, fileNames,
File: PerContainerLogFileInfo.java -> ContainerLogFileInfo.java (renamed, org.apache.hadoop.yarn.logaggregation)

@@ -19,7 +19,7 @@
 package org.apache.hadoop.yarn.logaggregation;
 
 /**
- * PerContainerLogFileInfo represents the meta data for a container log file,
+ * ContainerLogFileInfo represents the meta data for a container log file,
  * which includes:
  * <ul>
  * <li>The filename of the container log.</li>
@@ -28,15 +28,15 @@ package org.apache.hadoop.yarn.logaggregation;
  * </ul>
  *
  */
-public class PerContainerLogFileInfo {
+public class ContainerLogFileInfo {
   private String fileName;
   private String fileSize;
   private String lastModifiedTime;
 
   //JAXB needs this
-  public PerContainerLogFileInfo() {}
+  public ContainerLogFileInfo() {}
 
-  public PerContainerLogFileInfo(String fileName, String fileSize,
+  public ContainerLogFileInfo(String fileName, String fileSize,
       String lastModifiedTime) {
     this.setFileName(fileName);
     this.setFileSize(fileSize);
@@ -83,10 +83,10 @@ public class PerContainerLogFileInfo {
     if (otherObj == this) {
       return true;
     }
-    if (!(otherObj instanceof PerContainerLogFileInfo)) {
+    if (!(otherObj instanceof ContainerLogFileInfo)) {
       return false;
     }
-    PerContainerLogFileInfo other = (PerContainerLogFileInfo)otherObj;
+    ContainerLogFileInfo other = (ContainerLogFileInfo)otherObj;
     return other.fileName.equals(fileName) && other.fileSize.equals(fileSize)
         && other.lastModifiedTime.equals(lastModifiedTime);
   }
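Note: the rename from PerContainerLogFileInfo to ContainerLogFileInfo is mechanical; the class keeps its JAXB-friendly no-arg constructor and value-based equals(). A small sketch with made-up values:

    ContainerLogFileInfo a = new ContainerLogFileInfo(
        "syslog", "2048", "Mon Aug 21 17:00:00 +0000 2017");
    ContainerLogFileInfo b = new ContainerLogFileInfo(
        "syslog", "2048", "Mon Aug 21 17:00:00 +0000 2017");
    // equals() compares fileName, fileSize and lastModifiedTime:
    assert a.equals(b);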
File: ContainerLogMeta.java (org.apache.hadoop.yarn.logaggregation)

@@ -26,14 +26,14 @@ import java.util.List;
  * <ul>
  * <li>The Container Id.</li>
  * <li>The NodeManager Id.</li>
- * <li>A list of {@link PerContainerLogFileInfo}.</li>
+ * <li>A list of {@link ContainerLogFileInfo}.</li>
  * </ul>
  *
  */
 public class ContainerLogMeta {
   private String containerId;
   private String nodeId;
-  private List<PerContainerLogFileInfo> logMeta;
+  private List<ContainerLogFileInfo> logMeta;
 
   public ContainerLogMeta(String containerId, String nodeId) {
     this.containerId = containerId;
@@ -51,11 +51,11 @@ public class ContainerLogMeta {
 
   public void addLogMeta(String fileName, String fileSize,
       String lastModificationTime) {
-    logMeta.add(new PerContainerLogFileInfo(fileName, fileSize,
+    logMeta.add(new ContainerLogFileInfo(fileName, fileSize,
         lastModificationTime));
   }
 
-  public List<PerContainerLogFileInfo> getContainerLogMeta() {
+  public List<ContainerLogFileInfo> getContainerLogMeta() {
     return this.logMeta;
   }
 }
File: LogAggregationUtils.java (org.apache.hadoop.yarn.logaggregation)

@@ -30,6 +30,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 @Private
 public class LogAggregationUtils {
@@ -195,6 +198,30 @@ public class LogAggregationUtils {
     return nodeFiles;
   }
 
+  /**
+   * Get all available log files under remote app log directory.
+   * @param conf the configuration
+   * @param appId the applicationId
+   * @param appOwner the application owner
+   * @param remoteRootLogDir the remote root log directory
+   * @param suffix the log directory suffix
+   * @return the list of available log files
+   * @throws IOException if there is no log file available
+   */
+  public static List<FileStatus> getRemoteNodeFileList(
+      Configuration conf, ApplicationId appId, String appOwner,
+      org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
+      throws IOException {
+    Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner,
+        remoteRootLogDir, suffix);
+    List<FileStatus> nodeFiles = new ArrayList<>();
+    Path qualifiedLogDir =
+        FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
+    nodeFiles.addAll(Arrays.asList(FileContext.getFileContext(
+        qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir)));
+    return nodeFiles;
+  }
+
   /**
    * Get all available log files under remote app log directory.
    * @param conf the configuration
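Note: a hedged usage sketch for the new helper. The config keys resolve the same root directory and suffix the NodeManager uploads to; the owner string is illustrative:

    Configuration conf = new YarnConfiguration();
    ApplicationId appId = ApplicationId.newInstance(123456L, 1);
    Path rootLogDir = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
        YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
    String suffix = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
        YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
    List<FileStatus> nodeFiles = LogAggregationUtils.getRemoteNodeFileList(
        conf, appId, "appOwner", rootLogDir, suffix);
    for (FileStatus status : nodeFiles) {
      System.out.println(status.getPath()); // one aggregated file per NM upload
    }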
File: LogCLIHelpers.java (org.apache.hadoop.yarn.logaggregation)

@@ -22,8 +22,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.file.AccessDeniedException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -229,7 +227,7 @@ public class LogCLIHelpers implements Configurable {
     out.printf(PER_LOG_FILE_INFO_PATTERN, "LogFile", "LogLength",
         "LastModificationTime", "LogAggregationType");
     out.println(StringUtils.repeat("=", containerString.length() * 2));
-    for (PerContainerLogFileInfo logMeta : containerLogMeta
+    for (ContainerLogFileInfo logMeta : containerLogMeta
         .getContainerLogMeta()) {
       out.printf(PER_LOG_FILE_INFO_PATTERN, logMeta.getFileName(),
           logMeta.getFileSize(), logMeta.getLastModifiedTime(), "AGGREGATED");
@@ -345,20 +343,6 @@ public class LogCLIHelpers implements Configurable {
         + ". Error message found: " + errorMessage);
   }
 
-  @Private
-  public PrintStream createPrintStream(String localDir, String nodeId,
-      String containerId) throws IOException {
-    PrintStream out = System.out;
-    if(localDir != null && !localDir.isEmpty()) {
-      Path nodePath = new Path(localDir, LogAggregationUtils
-          .getNodeString(nodeId));
-      Files.createDirectories(Paths.get(nodePath.toString()));
-      Path containerLogPath = new Path(nodePath, containerId);
-      out = new PrintStream(containerLogPath.toString(), "UTF-8");
-    }
-    return out;
-  }
-
   public void closePrintStream(PrintStream out) {
     if (out != System.out) {
       IOUtils.closeQuietly(out);
@@ -379,7 +363,7 @@ public class LogCLIHelpers implements Configurable {
       return logTypes;
     }
     for (ContainerLogMeta logMeta: containersLogMeta) {
-      for (PerContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) {
+      for (ContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) {
        logTypes.add(fileInfo.getFileName());
      }
    }
File: LogToolUtils.java (org.apache.hadoop.yarn.logaggregation)

@@ -21,11 +21,15 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.PrintStream;
 import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
 
 /**
  * This class contains several utility function which could be used in different
@@ -158,4 +162,26 @@ public final class LogToolUtils {
     }
   }
 
+
+  /**
+   * Create the container log file under given (local directory/nodeId) and
+   * return the PrintStream object.
+   * @param localDir the Local Dir
+   * @param nodeId the NodeId
+   * @param containerId the ContainerId
+   * @return the printStream object
+   * @throws IOException if an I/O error occurs
+   */
+  public static PrintStream createPrintStream(String localDir, String nodeId,
+      String containerId) throws IOException {
+    PrintStream out = System.out;
+    if(localDir != null && !localDir.isEmpty()) {
+      Path nodePath = new Path(localDir, LogAggregationUtils
+          .getNodeString(nodeId));
+      Files.createDirectories(Paths.get(nodePath.toString()));
+      Path containerLogPath = new Path(nodePath, containerId);
+      out = new PrintStream(containerLogPath.toString(), "UTF-8");
+    }
+    return out;
+  }
 }
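Note: createPrintStream was previously duplicated in LogCLIHelpers and LogAggregationFileController; this patch gives it a single home here. A hedged sketch of the call pattern (the directory and IDs are made up):

    // Writes to stdout when localDir is null/empty; otherwise creates
    // <localDir>/<nodeId>/<containerId> and writes there.
    PrintStream out = LogToolUtils.createPrintStream(
        "/tmp/extracted-logs",                     // hypothetical local dir
        "node-1.example.com_8041",                 // hypothetical NodeManager id
        "container_1234567890123_0001_01_000001"); // hypothetical container id
    try {
      out.println("Log content goes here...");
    } finally {
      if (out != System.out) {
        out.close(); // only close file-backed streams, never System.out
      }
    }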
File: LogAggregationFileController.java (org.apache.hadoop.yarn.logaggregation.filecontroller)

@@ -25,9 +25,6 @@ import com.google.common.collect.Sets;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.PrintStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -37,6 +34,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -91,6 +89,12 @@ public abstract class LogAggregationFileController {
   protected static final FsPermission APP_DIR_PERMISSIONS = FsPermission
       .createImmutable((short) 0770);
 
+  /**
+   * Umask for the log file.
+   */
+  protected static final FsPermission APP_LOG_FILE_UMASK = FsPermission
+      .createImmutable((short) (0640 ^ 0777));
+
   // This is temporary solution. The configuration will be deleted once
   // we find a more scalable method to only write a single log file per LRS.
   private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
@@ -98,6 +102,11 @@ public abstract class LogAggregationFileController {
   private static final int
       DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
 
+  // This is temporary solution. The configuration will be deleted once we have
+  // the FileSystem API to check whether append operation is supported or not.
+  public static final String LOG_AGGREGATION_FS_SUPPORT_APPEND
+      = YarnConfiguration.YARN_PREFIX + "log-aggregation.fs-support-append";
+
   protected Configuration conf;
   protected Path remoteRootLogDir;
   protected String remoteRootLogDirSuffix;
@@ -178,19 +187,6 @@ public abstract class LogAggregationFileController {
   public abstract void postWrite(LogAggregationFileControllerContext record)
       throws Exception;
 
-  protected PrintStream createPrintStream(String localDir, String nodeId,
-      String containerId) throws IOException {
-    PrintStream out = System.out;
-    if(localDir != null && !localDir.isEmpty()) {
-      Path nodePath = new Path(localDir, LogAggregationUtils
-          .getNodeString(nodeId));
-      Files.createDirectories(Paths.get(nodePath.toString()));
-      Path containerLogPath = new Path(nodePath, containerId);
-      out = new PrintStream(containerLogPath.toString(), "UTF-8");
-    }
-    return out;
-  }
-
   protected void closePrintStream(OutputStream out) {
     if (out != System.out) {
       IOUtils.cleanupWithLogger(LOG, out);
@@ -481,4 +477,21 @@ public abstract class LogAggregationFileController {
       LOG.error("Failed to clean old logs", e);
     }
   }
+
+  /**
+   * Create the aggregated log suffix. The LogAggregationFileController
+   * should call this to get the suffix and append the suffix to the end
+   * of each log. This would keep the aggregated log format consistent.
+   *
+   * @param fileName the File Name
+   * @return the aggregated log suffix String
+   */
+  protected String aggregatedLogSuffix(String fileName) {
+    StringBuilder sb = new StringBuilder();
+    String endOfFile = "End of LogType:" + fileName;
+    sb.append("\n" + endOfFile + "\n");
+    sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+        + "\n\n");
+    return sb.toString();
+  }
 }
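Note: a worked example of the shared trailer this introduces. For fileName = "syslog", the marker line "End of LogType:syslog" is 21 characters, so the asterisk rule is 21 + 50 = 71 characters wide:

    String suffix = aggregatedLogSuffix("syslog");
    // suffix is:
    //   "\nEnd of LogType:syslog\n" + ("*" repeated 71 times) + "\n\n"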
File: IndexedFileAggregatedLogsBlock.java (new file, org.apache.hadoop.yarn.logaggregation.filecontroller.ifile)

@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
+
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
+
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.file.tfile.BoundedRangeFileInputStream;
+import org.apache.hadoop.io.file.tfile.Compression;
+import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationHtmlBlock;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedFileLogMeta;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedLogsMeta;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedPerAggregationLogMeta;
+import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE;
+
+/**
+ * The Aggregated Logs Block implementation for Indexed File.
+ */
+@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
+public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
+
+  private final LogAggregationIndexedFileController fileController;
+  private final Configuration conf;
+
+  @Inject
+  public IndexedFileAggregatedLogsBlock(ViewContext ctx,
+      Configuration conf,
+      LogAggregationIndexedFileController fileController) {
+    super(ctx);
+    this.conf = conf;
+    this.fileController = fileController;
+  }
+
+  @Override
+  protected void render(Block html) {
+    BlockParameters params = verifyAndParseParameters(html);
+    if (params == null) {
+      return;
+    }
+
+    ApplicationId appId = params.getAppId();
+    ContainerId containerId = params.getContainerId();
+    NodeId nodeId = params.getNodeId();
+    String appOwner = params.getAppOwner();
+    String logEntity = params.getLogEntity();
+    long start = params.getStartIndex();
+    long end = params.getEndIndex();
+
+    List<FileStatus> nodeFiles = null;
+    try {
+      nodeFiles = LogAggregationUtils
+          .getRemoteNodeFileList(conf, appId, appOwner,
+              this.fileController.getRemoteRootLogDir(),
+              this.fileController.getRemoteRootLogDirSuffix());
+    } catch(Exception ex) {
+      html.h1("Unable to locate any logs for container "
+          + containerId.toString());
+      LOG.error(ex.getMessage());
+      return;
+    }
+
+    Map<String, FileStatus> checkSumFiles;
+    try {
+      checkSumFiles = fileController.filterFiles(nodeFiles,
+          LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
+    } catch (IOException ex) {
+      LOG.error("Error getting logs for " + logEntity, ex);
+      html.h1("Error getting logs for " + logEntity);
+      return;
+    }
+
+    List<FileStatus> fileToRead;
+    try {
+      fileToRead = fileController.getNodeLogFileToRead(nodeFiles,
+          nodeId.toString(), appId);
+    } catch (IOException ex) {
+      LOG.error("Error getting logs for " + logEntity, ex);
+      html.h1("Error getting logs for " + logEntity);
+      return;
+    }
+
+    boolean foundLog = false;
+    String desiredLogType = $(CONTAINER_LOG_TYPE);
+    try {
+      for (FileStatus thisNodeFile : fileToRead) {
+        FileStatus checkSum = fileController.getAllChecksumFiles(
+            checkSumFiles, thisNodeFile.getPath().getName());
+        long endIndex = -1;
+        if (checkSum != null) {
+          endIndex = fileController.loadIndexedLogsCheckSum(
+              checkSum.getPath());
+        }
+        IndexedLogsMeta indexedLogsMeta = null;
+        try {
+          indexedLogsMeta = fileController.loadIndexedLogsMeta(
+              thisNodeFile.getPath(), endIndex);
+        } catch (Exception ex) {
+          // DO NOTHING
+          LOG.warn("Can not load log meta from the log file:"
+              + thisNodeFile.getPath());
+          continue;
+        }
+        if (indexedLogsMeta == null) {
+          continue;
+        }
+        Map<ApplicationAccessType, String> appAcls = indexedLogsMeta.getAcls();
+        String user = indexedLogsMeta.getUser();
+        String remoteUser = request().getRemoteUser();
+        if (!checkAcls(conf, appId, user, appAcls, remoteUser)) {
+          html.h1().__("User [" + remoteUser
+              + "] is not authorized to view the logs for " + logEntity
+              + " in log file [" + thisNodeFile.getPath().getName() + "]")
+              .__();
+          LOG.error("User [" + remoteUser
+              + "] is not authorized to view the logs for " + logEntity);
+          continue;
+        }
+        String compressAlgo = indexedLogsMeta.getCompressName();
+        List<IndexedFileLogMeta> candidates = new ArrayList<>();
+        for (IndexedPerAggregationLogMeta logMeta
+            : indexedLogsMeta.getLogMetas()) {
+          for (Entry<String, List<IndexedFileLogMeta>> meta
+              : logMeta.getLogMetas().entrySet()) {
+            for (IndexedFileLogMeta log : meta.getValue()) {
+              if (!log.getContainerId().equals(containerId.toString())) {
+                continue;
+              }
+              if (desiredLogType != null && !desiredLogType.isEmpty()
+                  && !desiredLogType.equals(log.getFileName())) {
+                continue;
+              }
+              candidates.add(log);
+            }
+          }
+        }
+        if (candidates.isEmpty()) {
+          continue;
+        }
+
+        Algorithm compressName = Compression.getCompressionAlgorithmByName(
+            compressAlgo);
+        Decompressor decompressor = compressName.getDecompressor();
+        FileContext fileContext = FileContext.getFileContext(
+            thisNodeFile.getPath().toUri(), conf);
+        FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
+        int bufferSize = 65536;
+        for (IndexedFileLogMeta candidate : candidates) {
+          byte[] cbuf = new byte[bufferSize];
+          InputStream in = null;
+          try {
+            in = compressName.createDecompressionStream(
+                new BoundedRangeFileInputStream(fsin,
+                    candidate.getStartIndex(),
+                    candidate.getFileCompressedSize()),
+                decompressor,
+                LogAggregationIndexedFileController.getFSInputBufferSize(
+                    conf));
+            long logLength = candidate.getFileSize();
+            html.pre().__("\n\n").__();
+            html.p().__("Log Type: " + candidate.getFileName()).__();
+            html.p().__("Log Upload Time: " + Times.format(
+                candidate.getLastModificatedTime())).__();
+            html.p().__("Log Length: " + Long.toString(
+                logLength)).__();
+            long startIndex = start < 0
+                ? logLength + start : start;
+            startIndex = startIndex < 0 ? 0 : startIndex;
+            startIndex = startIndex > logLength ? logLength : startIndex;
+            long endLogIndex = end < 0
+                ? logLength + end : end;
+            endLogIndex = endLogIndex < 0 ? 0 : endLogIndex;
+            endLogIndex = endLogIndex > logLength ? logLength : endLogIndex;
+            endLogIndex = endLogIndex < startIndex ?
+                startIndex : endLogIndex;
+            long toRead = endLogIndex - startIndex;
+            if (toRead < logLength) {
+              html.p().__("Showing " + toRead + " bytes of " + logLength
+                  + " total. Click ").a(url("logs", $(NM_NODENAME),
+                  $(CONTAINER_ID), $(ENTITY_STRING), $(APP_OWNER),
+                  candidate.getFileName(), "?start=0"), "here").
+                  __(" for the full log.").__();
+            }
+            long totalSkipped = 0;
+            while (totalSkipped < start) {
+              long ret = in.skip(start - totalSkipped);
+              if (ret == 0) {
+                //Read one byte
+                int nextByte = in.read();
+                // Check if we have reached EOF
+                if (nextByte == -1) {
+                  throw new IOException("Premature EOF from container log");
+                }
+                ret = 1;
+              }
+              totalSkipped += ret;
+            }
+            int len = 0;
+            int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
+            PRE<Hamlet> pre = html.pre();
+
+            while (toRead > 0
+                && (len = in.read(cbuf, 0, currentToRead)) > 0) {
+              pre.__(new String(cbuf, 0, len, Charset.forName("UTF-8")));
+              toRead = toRead - len;
+              currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
+            }
+
+            pre.__();
+            foundLog = true;
+          } catch (Exception ex) {
+            LOG.error("Error getting logs for " + logEntity, ex);
+            continue;
+          } finally {
+            IOUtils.closeQuietly(in);
+          }
+        }
+      }
+      if (!foundLog) {
+        if (desiredLogType.isEmpty()) {
+          html.h1("No logs available for container " + containerId.toString());
+        } else {
+          html.h1("Unable to locate '" + desiredLogType
+              + "' log for container " + containerId.toString());
+        }
+      }
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception ex) {
+      html.h1().__("Error getting logs for " + logEntity).__();
+      LOG.error("Error getting logs for " + logEntity, ex);
+    }
+  }
+}
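Note: the start/end handling in render() gives the "?start=" and "?end=" parameters tail semantics: negative values count back from the end of the log, and both bounds are clamped into [0, logLength]. A compact restatement of that clamping as a hedged helper (not part of the patch):

    // Returns {offset, bytesToRead} for a requested [start, end) window.
    static long[] clampRange(long start, long end, long logLength) {
      long startIndex = start < 0 ? logLength + start : start;
      startIndex = Math.max(0, Math.min(startIndex, logLength));
      long endIndex = end < 0 ? logLength + end : end;
      endIndex = Math.max(0, Math.min(endIndex, logLength));
      endIndex = Math.max(startIndex, endIndex); // never read backwards
      return new long[] {startIndex, endIndex - startIndex};
    }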
(One file diff was suppressed by the viewer because it is too large — most likely LogAggregationIndexedFileController.java, the new indexed-format controller referenced throughout this patch and exercised by the test below.)
File: package-info.java (new file, org.apache.hadoop.yarn.logaggregation.filecontroller.ifile)

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
+import org.apache.hadoop.classification.InterfaceAudience;
File: LogAggregationTFileController.java

@@ -27,7 +27,6 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.math3.util.Pair;
@@ -192,7 +191,7 @@ public class LogAggregationTFileController
       while (valueStream != null) {
         if (getAllContainers || (key.toString().equals(containerIdStr))) {
           if (createPrintStream) {
-            os = createPrintStream(
+            os = LogToolUtils.createPrintStream(
                 logRequest.getOutputLocalDir(),
                 thisNodeFile.getPath().getName(), key.toString());
           }
@@ -209,12 +208,7 @@ public class LogAggregationTFileController
               Times.format(thisNodeFile.getModificationTime()),
               valueStream, os, buf,
               ContainerLogAggregationType.AGGREGATED);
-          StringBuilder sb = new StringBuilder();
-          String endOfFile = "End of LogType:" + fileType;
-          sb.append("\n" + endOfFile + "\n");
-          sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
-              + "\n\n");
-          byte[] b = sb.toString().getBytes(
+          byte[] b = aggregatedLogSuffix(fileType).getBytes(
              Charset.forName("UTF-8"));
           os.write(b, 0, b.length);
           findLogs = true;
File: TestLogAggregationIndexFileController.java (new file)

@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Function test for {@link LogAggregationIndexedFileController}.
+ *
+ */
+public class TestLogAggregationIndexFileController {
+
+  private final String rootLocalLogDir = "target/LocalLogs";
+  private final Path rootLocalLogDirPath = new Path(rootLocalLogDir);
+  private final String remoteLogDir = "target/remote-app";
+  private static final FsPermission LOG_FILE_UMASK = FsPermission
+      .createImmutable((short) (0777));
+  private static final UserGroupInformation USER_UGI = UserGroupInformation
+      .createRemoteUser("testUser");
+  private FileSystem fs;
+  private Configuration conf;
+  private ApplicationId appId;
+  private ContainerId containerId;
+  private NodeId nodeId;
+
+  private ByteArrayOutputStream sysOutStream;
+  private PrintStream sysOut;
+
+  private ByteArrayOutputStream sysErrStream;
+  private PrintStream sysErr;
+
+  @Before
+  public void setUp() throws IOException {
+    appId = ApplicationId.newInstance(123456, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
+        appId, 1);
+    containerId = ContainerId.newContainerId(attemptId, 1);
+    nodeId = NodeId.newInstance("localhost", 9999);
+    conf = new Configuration();
+    conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir",
+        remoteLogDir);
+    conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir-suffix",
+        "logs");
+    conf.set(YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, "gz");
+    fs = FileSystem.get(conf);
+    sysOutStream = new ByteArrayOutputStream();
+    sysOut = new PrintStream(sysOutStream);
+    System.setOut(sysOut);
+
+    sysErrStream = new ByteArrayOutputStream();
+    sysErr = new PrintStream(sysErrStream);
+    System.setErr(sysErr);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    fs.delete(rootLocalLogDirPath, true);
+    fs.delete(new Path(remoteLogDir), true);
+  }
+
+  @Test(timeout = 15000)
+  public void testLogAggregationIndexFileFormat() throws Exception {
+    if (fs.exists(rootLocalLogDirPath)) {
+      fs.delete(rootLocalLogDirPath, true);
+    }
+    assertTrue(fs.mkdirs(rootLocalLogDirPath));
+
+    Path appLogsDir = new Path(rootLocalLogDirPath, appId.toString());
+    if (fs.exists(appLogsDir)) {
+      fs.delete(appLogsDir, true);
+    }
+    assertTrue(fs.mkdirs(appLogsDir));
+
+    List<String> logTypes = new ArrayList<String>();
+    logTypes.add("syslog");
+    logTypes.add("stdout");
+    logTypes.add("stderr");
+
+    Set<File> files = new HashSet<>();
+
+    LogKey key1 = new LogKey(containerId.toString());
+
+    for(String logType : logTypes) {
+      File file = createAndWriteLocalLogFile(containerId, appLogsDir,
+          logType);
+      files.add(file);
+    }
+    LogValue value = mock(LogValue.class);
+    when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files);
+
+    LogAggregationIndexedFileController fileFormat
+        = new LogAggregationIndexedFileController();
+    fileFormat.initialize(conf, "Indexed");
+
+    Map<ApplicationAccessType, String> appAcls = new HashMap<>();
+    Path appDir = fileFormat.getRemoteAppLogDir(appId,
+        USER_UGI.getShortUserName());
+    if (fs.exists(appDir)) {
+      fs.delete(appDir, true);
+    }
+    assertTrue(fs.mkdirs(appDir));
+
+    Path logPath = fileFormat.getRemoteNodeLogFileForApp(
+        appId, USER_UGI.getShortUserName(), nodeId);
+    LogAggregationFileControllerContext context =
+        new LogAggregationFileControllerContext(
+            logPath, logPath, true, 1000, appId, appAcls, nodeId, USER_UGI);
+    // initialize the writer
+    fileFormat.initializeWriter(context);
+
+    fileFormat.write(key1, value);
+    LogAggregationFileControllerContext record = mock(
+        LogAggregationFileControllerContext.class);
+    fileFormat.postWrite(record);
+    fileFormat.closeWriter();
+
+    ContainerLogsRequest logRequest = new ContainerLogsRequest();
+    logRequest.setAppId(appId);
+    logRequest.setNodeId(nodeId.toString());
+    logRequest.setAppOwner(USER_UGI.getShortUserName());
+    logRequest.setContainerId(containerId.toString());
+    logRequest.setBytes(Long.MAX_VALUE);
+    List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta(
+        logRequest);
+    Assert.assertTrue(meta.size() == 1);
+    List<String> fileNames = new ArrayList<>();
+    for (ContainerLogMeta log : meta) {
+      Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
+      Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
+      Assert.assertTrue(log.getContainerLogMeta().size() == 3);
+      for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
+        fileNames.add(file.getFileName());
+      }
+    }
+    fileNames.removeAll(logTypes);
+    Assert.assertTrue(fileNames.isEmpty());
+
+    boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
+    Assert.assertTrue(foundLogs);
+    for (String logType : logTypes) {
+      Assert.assertTrue(sysOutStream.toString().contains(logMessage(
+          containerId, logType)));
+    }
+    sysOutStream.reset();
+
+    // create a checksum file
+    Path checksumFile = new Path(fileFormat.getRemoteAppLogDir(
+        appId, USER_UGI.getShortUserName()),
+        LogAggregationUtils.getNodeString(nodeId)
+        + LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
+    FSDataOutputStream fInput = null;
+    try {
+      fInput = FileSystem.create(fs, checksumFile, LOG_FILE_UMASK);
+      fInput.writeLong(0);
+    } finally {
+      IOUtils.closeQuietly(fInput);
+    }
+    meta = fileFormat.readAggregatedLogsMeta(
+        logRequest);
+    Assert.assertTrue(meta.size() == 0);
+    foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
+    Assert.assertFalse(foundLogs);
+    sysOutStream.reset();
+    fs.delete(checksumFile, false);
+    Assert.assertFalse(fs.exists(checksumFile));
+
+    List<String> newLogTypes = new ArrayList<>(logTypes);
+    files.clear();
+    newLogTypes.add("test1");
+    files.add(createAndWriteLocalLogFile(containerId, appLogsDir,
+        "test1"));
+    newLogTypes.add("test2");
+    files.add(createAndWriteLocalLogFile(containerId, appLogsDir,
+        "test2"));
+    LogValue value2 = mock(LogValue.class);
+    when(value2.getPendingLogFilesToUploadForThisContainer())
+        .thenReturn(files);
+
+    // initialize the writer
+    fileFormat.initializeWriter(context);
+    fileFormat.write(key1, value2);
+    fileFormat.closeWriter();
+
+    // We did not call postWrite, so the checksum file is kept and we can
+    // only get the logs/log meta from the first write.
+    meta = fileFormat.readAggregatedLogsMeta(
+        logRequest);
+    Assert.assertEquals(1, meta.size());
+    for (ContainerLogMeta log : meta) {
+      Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
+      Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
+      Assert.assertTrue(log.getContainerLogMeta().size() == 3);
+      for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
+        fileNames.add(file.getFileName());
+      }
+    }
+    fileNames.removeAll(logTypes);
+    Assert.assertTrue(fileNames.isEmpty());
+    foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
+    Assert.assertTrue(foundLogs);
+    for (String logType : logTypes) {
+      Assert.assertTrue(sysOutStream.toString().contains(logMessage(
+          containerId, logType)));
+    }
+    Assert.assertFalse(sysOutStream.toString().contains(logMessage(
+        containerId, "test1")));
+    Assert.assertFalse(sysOutStream.toString().contains(logMessage(
+        containerId, "test2")));
+    sysOutStream.reset();
+
+    // Call postWrite and we should get all logs/log metas for both
+    // the first and the second write
+    fileFormat.initializeWriter(context);
+    fileFormat.write(key1, value2);
+    fileFormat.postWrite(record);
+    fileFormat.closeWriter();
+    meta = fileFormat.readAggregatedLogsMeta(
+        logRequest);
+    Assert.assertEquals(2, meta.size());
+    for (ContainerLogMeta log : meta) {
+      Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
+      Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
+      for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
+        fileNames.add(file.getFileName());
+      }
+    }
+    fileNames.removeAll(newLogTypes);
+    Assert.assertTrue(fileNames.isEmpty());
+    foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
+    Assert.assertTrue(foundLogs);
+    for (String logType : newLogTypes) {
+      Assert.assertTrue(sysOutStream.toString().contains(logMessage(
+          containerId, logType)));
+    }
+    sysOutStream.reset();
+  }
+
+  private File createAndWriteLocalLogFile(ContainerId containerId,
+      Path localLogDir, String logType) throws IOException {
+    File file = new File(localLogDir.toString(), logType);
+    if (file.exists()) {
+      file.delete();
+    }
+    file.createNewFile();
+    Writer writer = null;
+    try {
+      writer = new FileWriter(file);
+      writer.write(logMessage(containerId, logType));
+      writer.close();
+      return file;
+    } finally {
+      IOUtils.closeQuietly(writer);
+    }
+  }
+
+  private String logMessage(ContainerId containerId, String logType) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Hello " + containerId + " in " + logType + "!");
+    return sb.toString();
+  }
+}
File: TestAHSWebServices.java

@@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
-import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
 import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore;
@@ -851,7 +851,7 @@ public class TestAHSWebServices extends JerseyTestBase {
     for (ContainerLogsInfo logInfo : responseText) {
       if(logInfo.getLogType().equals(
           ContainerLogAggregationType.AGGREGATED.toString())) {
-        List<PerContainerLogFileInfo> logMeta = logInfo
+        List<ContainerLogFileInfo> logMeta = logInfo
            .getContainerLogsInfo();
        assertTrue(logMeta.size() == 1);
        assertEquals(logMeta.get(0).getFileName(), fileName);
@@ -879,7 +879,7 @@ public class TestAHSWebServices extends JerseyTestBase {
     for (ContainerLogsInfo logInfo : responseText) {
       if(logInfo.getLogType().equals(
           ContainerLogAggregationType.AGGREGATED.toString())) {
-        List<PerContainerLogFileInfo> logMeta = logInfo
+        List<ContainerLogFileInfo> logMeta = logInfo
            .getContainerLogsInfo();
        assertTrue(logMeta.size() == 1);
        assertEquals(logMeta.get(0).getFileName(), fileName);
@@ -917,7 +917,7 @@ public class TestAHSWebServices extends JerseyTestBase {
     assertTrue(responseText.size() == 1);
     assertEquals(responseText.get(0).getLogType(),
         ContainerLogAggregationType.AGGREGATED.toString());
-    List<PerContainerLogFileInfo> logMeta = responseText.get(0)
+    List<ContainerLogFileInfo> logMeta = responseText.get(0)
        .getContainerLogsInfo();
    assertTrue(logMeta.size() == 1);
    assertEquals(logMeta.get(0).getFileName(), fileName);
File: ContainerLogsInfo.java (org.apache.hadoop.yarn.server.webapp.dao)

@@ -27,14 +27,14 @@ import javax.xml.bind.annotation.XmlRootElement;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
-import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
 
 /**
  * {@code ContainerLogsInfo} includes the log meta-data of containers.
  * <p>
  * The container log meta-data includes details such as:
  * <ul>
- * <li>A list of {@link PerContainerLogFileInfo}.</li>
+ * <li>A list of {@link ContainerLogFileInfo}.</li>
  * <li>The container Id.</li>
 * <li>The NodeManager Id.</li>
 * <li>The logType: could be local or aggregated</li>
@@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
 public class ContainerLogsInfo {
 
   @XmlElement(name = "containerLogInfo")
-  protected List<PerContainerLogFileInfo> containerLogsInfo;
+  protected List<ContainerLogFileInfo> containerLogsInfo;
 
   @XmlElement(name = "logAggregationType")
   protected String logType;
@@ -62,14 +62,14 @@ public class ContainerLogsInfo {
 
   public ContainerLogsInfo(ContainerLogMeta logMeta,
       ContainerLogAggregationType logType) throws YarnException {
-    this.containerLogsInfo = new ArrayList<PerContainerLogFileInfo>(
+    this.containerLogsInfo = new ArrayList<ContainerLogFileInfo>(
        logMeta.getContainerLogMeta());
    this.logType = logType.toString();
    this.containerId = logMeta.getContainerId();
    this.nodeId = logMeta.getNodeId();
  }
 
-  public List<PerContainerLogFileInfo> getContainerLogsInfo() {
+  public List<ContainerLogFileInfo> getContainerLogsInfo() {
    return this.containerLogsInfo;
  }
 
File: NMContainerLogsInfo.java

@@ -27,7 +27,7 @@ import javax.xml.bind.annotation.XmlRootElement;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
-import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsUtils;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
@@ -55,10 +55,10 @@ public class NMContainerLogsInfo extends ContainerLogsInfo {
         containerId, remoteUser, nmContext);
   }
 
-  private static List<PerContainerLogFileInfo> getContainerLogsInfo(
+  private static List<ContainerLogFileInfo> getContainerLogsInfo(
      ContainerId id, String remoteUser, Context nmContext)
      throws YarnException {
-    List<PerContainerLogFileInfo> logFiles = new ArrayList<>();
+    List<ContainerLogFileInfo> logFiles = new ArrayList<>();
    List<File> logDirs = ContainerLogsUtils.getContainerLogDirs(
        id, remoteUser, nmContext);
    for (File containerLogsDir : logDirs) {
@@ -66,7 +66,7 @@ public class NMContainerLogsInfo extends ContainerLogsInfo {
       if (logs != null) {
         for (File log : logs) {
           if (log.isFile()) {
-            PerContainerLogFileInfo logMeta = new PerContainerLogFileInfo(
+            ContainerLogFileInfo logMeta = new ContainerLogFileInfo(
                log.getName(), Long.toString(log.length()),
                Times.format(log.lastModified()));
            logFiles.add(logMeta);
File: TestNMWebServices.java

@@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
-import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
 import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@@ -537,7 +537,7 @@ public class TestNMWebServices extends JerseyTestBase {
     assertTrue(responseList.size() == 1);
     assertEquals(responseList.get(0).getLogType(),
         ContainerLogAggregationType.LOCAL.toString());
-    List<PerContainerLogFileInfo> logMeta = responseList.get(0)
+    List<ContainerLogFileInfo> logMeta = responseList.get(0)
        .getContainerLogsInfo();
    assertTrue(logMeta.size() == 1);
    assertEquals(logMeta.get(0).getFileName(), filename);
@@ -564,13 +564,13 @@ public class TestNMWebServices extends JerseyTestBase {
     for (ContainerLogsInfo logInfo : responseList) {
       if(logInfo.getLogType().equals(
           ContainerLogAggregationType.AGGREGATED.toString())) {
-        List<PerContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
+        List<ContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
        assertTrue(meta.size() == 1);
        assertEquals(meta.get(0).getFileName(), aggregatedLogFile);
      } else {
        assertEquals(logInfo.getLogType(),
            ContainerLogAggregationType.LOCAL.toString());
-        List<PerContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
+        List<ContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
        assertTrue(meta.size() == 1);
        assertEquals(meta.get(0).getFileName(), filename);
      }