YARN-6877. Create an abstract log reader for extendability. Contributed by Xuan Gong.

This commit is contained in:
Junping Du 2017-09-01 03:04:55 -07:00
parent 2442a8d716
commit 119220b88f
19 changed files with 1299 additions and 989 deletions

View File

@ -52,6 +52,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -467,7 +468,7 @@ public class TestLogsCLI {
assertTrue(exitCode == 0);
assertTrue(sysOutStream.toString().contains(
logMessage(containerId1, "syslog")));
assertTrue(sysOutStream.toString().contains("Log Upload Time"));
assertTrue(sysOutStream.toString().contains("LogLastModifiedTime"));
assertTrue(!sysOutStream.toString().contains(
"Logs for container " + containerId1.toString()
+ " are not present in this log-file."));
@ -491,8 +492,12 @@ public class TestLogsCLI {
String logMessage = logMessage(containerId3, "stdout");
int fileContentSize = logMessage.getBytes().length;
int tailContentSize = "\nEnd of LogType:stdout\n\n".getBytes().length;
StringBuilder sb = new StringBuilder();
String endOfFile = "End of LogType:stdout";
sb.append("\n" + endOfFile + "\n");
sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ "\n\n");
int tailContentSize = sb.toString().length();
// specify how many bytes we should get from logs
// specify a position number, it would get the first n bytes from
// container log

View File

@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
/**
* Utils for rendering aggregated logs block.
*
*/
@Private
public final class LogAggregationWebUtils {
private LogAggregationWebUtils() {}
/**
* Parse start index from html.
* @param html the html
* @param startStr the start index string
* @return the startIndex
*/
public static long getLogStartIndex(Block html, String startStr)
throws NumberFormatException {
long start = -4096;
if (startStr != null && !startStr.isEmpty()) {
start = Long.parseLong(startStr);
}
return start;
}
/**
* Parse end index from html.
* @param html the html
* @param endStr the end index string
* @return the endIndex
*/
public static long getLogEndIndex(Block html, String endStr)
throws NumberFormatException {
long end = Long.MAX_VALUE;
if (endStr != null && !endStr.isEmpty()) {
end = Long.parseLong(endStr);
}
return end;
}
/**
* Verify and parse containerId.
* @param html the html
* @param containerIdStr the containerId string
* @return the {@link ContainerId}
*/
public static ContainerId verifyAndGetContainerId(Block html,
String containerIdStr) {
if (containerIdStr == null || containerIdStr.isEmpty()) {
html.h1()._("Cannot get container logs without a ContainerId")._();
return null;
}
ContainerId containerId = null;
try {
containerId = ContainerId.fromString(containerIdStr);
} catch (IllegalArgumentException e) {
html.h1()
._("Cannot get container logs for invalid containerId: "
+ containerIdStr)._();
return null;
}
return containerId;
}
/**
* Verify and parse NodeId.
* @param html the html
* @param nodeIdStr the nodeId string
* @return the {@link NodeId}
*/
public static NodeId verifyAndGetNodeId(Block html, String nodeIdStr) {
if (nodeIdStr == null || nodeIdStr.isEmpty()) {
html.h1()._("Cannot get container logs without a NodeId")._();
return null;
}
NodeId nodeId = null;
try {
nodeId = NodeId.fromString(nodeIdStr);
} catch (IllegalArgumentException e) {
html.h1()._("Cannot get container logs. Invalid nodeId: " + nodeIdStr)
._();
return null;
}
return nodeId;
}
/**
* Verify and parse the application owner.
* @param html the html
* @param appOwner the Application owner
* @return the appOwner
*/
public static String verifyAndGetAppOwner(Block html, String appOwner) {
if (appOwner == null || appOwner.isEmpty()) {
html.h1()._("Cannot get container logs without an app owner")._();
}
return appOwner;
}
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.logaggregation;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
@ -37,15 +35,14 @@ import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import com.google.common.annotations.VisibleForTesting;
public class LogCLIHelpers implements Configurable {
@ -56,6 +53,7 @@ public class LogCLIHelpers implements Configurable {
"Container: %s on %s";
private Configuration conf;
private LogAggregationFileControllerFactory factory;
@Private
@VisibleForTesting
@ -130,71 +128,11 @@ public class LogCLIHelpers implements Configurable {
@VisibleForTesting
public int dumpAContainerLogsForLogType(ContainerLogsRequest options,
boolean outputFailure) throws IOException {
ApplicationId applicationId = options.getAppId();
String jobOwner = options.getAppOwner();
String nodeId = options.getNodeId();
String containerId = options.getContainerId();
String localDir = options.getOutputLocalDir();
List<String> logType = new ArrayList<String>(options.getLogTypes());
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
applicationId, jobOwner);
if (nodeFiles == null) {
return -1;
}
boolean foundContainerLogs = false;
while (nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
String fileName = thisNodeFile.getPath().getName();
if (fileName.equals(applicationId + ".har")) {
Path p = new Path("har:///"
+ thisNodeFile.getPath().toUri().getRawPath());
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
continue;
}
if (fileName.contains(LogAggregationUtils.getNodeString(nodeId))
&& !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader = null;
PrintStream out = createPrintStream(localDir, fileName, containerId);
try {
reader = new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath());
if (getContainerLogsStream(containerId, reader) == null) {
continue;
}
String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
containerId, thisNodeFile.getPath().getName());
out.println(containerString);
out.println("LogAggregationType: AGGREGATED");
out.println(StringUtils.repeat("=", containerString.length()));
// We have to re-create reader object to reset the stream index
// after calling getContainerLogsStream which would move the stream
// index to the end of the log file.
reader =
new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath());
if (logType == null || logType.isEmpty()) {
if (dumpAContainerLogs(containerId, reader, out,
thisNodeFile.getModificationTime(), options.getBytes()) > -1) {
foundContainerLogs = true;
}
} else {
if (dumpAContainerLogsForALogType(containerId, reader, out,
thisNodeFile.getModificationTime(), logType,
options.getBytes()) > -1) {
foundContainerLogs = true;
}
}
} finally {
if (reader != null) {
reader.close();
}
closePrintStream(out);
}
}
}
if (!foundContainerLogs) {
boolean foundAnyLogs = this.getFileController(options.getAppId(),
options.getAppOwner()).readAggregatedLogs(options, null);
if (!foundAnyLogs) {
if (outputFailure) {
containerLogNotFound(containerId);
containerLogNotFound(options.getContainerId());
}
return -1;
}
@ -204,217 +142,25 @@ public class LogCLIHelpers implements Configurable {
@Private
public int dumpAContainerLogsForLogTypeWithoutNodeId(
ContainerLogsRequest options) throws IOException {
ApplicationId applicationId = options.getAppId();
String jobOwner = options.getAppOwner();
String containerId = options.getContainerId();
String localDir = options.getOutputLocalDir();
List<String> logType = new ArrayList<String>(options.getLogTypes());
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
applicationId, jobOwner);
if (nodeFiles == null) {
return -1;
}
boolean foundContainerLogs = false;
while(nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
if (!thisNodeFile.getPath().getName().endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader = null;
PrintStream out = System.out;
try {
reader =
new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath());
if (getContainerLogsStream(containerId, reader) == null) {
continue;
}
// We have to re-create reader object to reset the stream index
// after calling getContainerLogsStream which would move the stream
// index to the end of the log file.
reader =
new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath());
out = createPrintStream(localDir, thisNodeFile.getPath().getName(),
containerId);
String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
containerId, thisNodeFile.getPath().getName());
out.println(containerString);
out.println("LogAggregationType: AGGREGATED");
out.println(StringUtils.repeat("=", containerString.length()));
if (logType == null || logType.isEmpty()) {
if (dumpAContainerLogs(containerId, reader, out,
thisNodeFile.getModificationTime(), options.getBytes()) > -1) {
foundContainerLogs = true;
}
} else {
if (dumpAContainerLogsForALogType(containerId, reader, out,
thisNodeFile.getModificationTime(), logType,
options.getBytes()) > -1) {
foundContainerLogs = true;
}
}
} finally {
if (reader != null) {
reader.close();
}
closePrintStream(out);
}
}
}
if (!foundContainerLogs) {
containerLogNotFound(containerId);
boolean foundAnyLogs = getFileController(options.getAppId(),
options.getAppOwner()).readAggregatedLogs(
options, null);
if (!foundAnyLogs) {
containerLogNotFound(options.getContainerId());
return -1;
}
return 0;
}
@Private
public int dumpAContainerLogs(String containerIdStr,
AggregatedLogFormat.LogReader reader, PrintStream out,
long logUploadedTime, long bytes) throws IOException {
DataInputStream valueStream = getContainerLogsStream(
containerIdStr, reader);
if (valueStream == null) {
return -1;
}
boolean foundContainerLogs = false;
while (true) {
try {
LogReader.readAContainerLogsForALogType(valueStream, out,
logUploadedTime, bytes);
foundContainerLogs = true;
} catch (EOFException eof) {
break;
}
}
if (foundContainerLogs) {
return 0;
}
return -1;
}
private DataInputStream getContainerLogsStream(String containerIdStr,
AggregatedLogFormat.LogReader reader) throws IOException {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null && !key.toString().equals(containerIdStr)) {
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
return valueStream;
}
@Private
public int dumpAContainerLogsForALogType(String containerIdStr,
AggregatedLogFormat.LogReader reader, PrintStream out,
long logUploadedTime, List<String> logType, long bytes)
throws IOException {
DataInputStream valueStream = getContainerLogsStream(
containerIdStr, reader);
if (valueStream == null) {
return -1;
}
boolean foundContainerLogs = false;
while (true) {
try {
int result = LogReader.readContainerLogsForALogType(
valueStream, out, logUploadedTime, logType, bytes);
if (result == 0) {
foundContainerLogs = true;
}
} catch (EOFException eof) {
break;
}
}
if (foundContainerLogs) {
return 0;
}
return -1;
}
@Private
public int dumpAllContainersLogs(ContainerLogsRequest options)
throws IOException {
ApplicationId appId = options.getAppId();
String appOwner = options.getAppOwner();
String localDir = options.getOutputLocalDir();
List<String> logTypes = new ArrayList<String>(options.getLogTypes());
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
appId, appOwner);
if (nodeFiles == null) {
return -1;
}
boolean foundAnyLogs = false;
while (nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
if (thisNodeFile.getPath().getName().equals(appId + ".har")) {
Path p = new Path("har:///"
+ thisNodeFile.getPath().toUri().getRawPath());
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
continue;
}
if (!thisNodeFile.getPath().getName()
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath());
try {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null) {
PrintStream out = createPrintStream(localDir,
thisNodeFile.getPath().getName(), key.toString());
try {
String containerString = String.format(
CONTAINER_ON_NODE_PATTERN, key,
thisNodeFile.getPath().getName());
out.println(containerString);
out.println("LogAggregationType: AGGREGATED");
out.println(StringUtils.repeat("=", containerString.length()));
while (true) {
try {
if (logTypes == null || logTypes.isEmpty()) {
LogReader.readAContainerLogsForALogType(valueStream, out,
thisNodeFile.getModificationTime(),
options.getBytes());
foundAnyLogs = true;
} else {
int result = LogReader.readContainerLogsForALogType(
valueStream, out, thisNodeFile.getModificationTime(),
logTypes, options.getBytes());
if (result == 0) {
foundAnyLogs = true;
}
}
} catch (EOFException eof) {
break;
}
}
} finally {
closePrintStream(out);
}
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
} finally {
reader.close();
}
}
}
boolean foundAnyLogs = getFileController(options.getAppId(),
options.getAppOwner()).readAggregatedLogs(
options, null);
if (!foundAnyLogs) {
emptyLogDir(LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner)
emptyLogDir(LogAggregationUtils.getRemoteAppLogDir(
conf, options.getAppId(), options.getAppOwner())
.toString());
return -1;
}
@ -425,14 +171,13 @@ public class LogCLIHelpers implements Configurable {
public int printAContainerLogMetadata(ContainerLogsRequest options,
PrintStream out, PrintStream err)
throws IOException {
ApplicationId appId = options.getAppId();
String appOwner = options.getAppOwner();
String nodeId = options.getNodeId();
String containerIdStr = options.getContainerId();
List<ContainerLogMeta> containersLogMeta;
try {
containersLogMeta = LogToolUtils.getContainerLogMetaFromRemoteFS(
conf, appId, containerIdStr, nodeId, appOwner);
containersLogMeta = getFileController(options.getAppId(),
options.getAppOwner()).readAggregatedLogsMeta(
options);
} catch (Exception ex) {
err.println(ex.getMessage());
return -1;
@ -473,8 +218,26 @@ public class LogCLIHelpers implements Configurable {
PrintStream out, PrintStream err) throws IOException {
ApplicationId appId = options.getAppId();
String appOwner = options.getAppOwner();
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
appId, appOwner);
LogAggregationFileController fileFormat = null;
try {
fileFormat = getFileController(appId, appOwner);
} catch (Exception ex) {
err.println(ex.getMessage());
return;
}
RemoteIterator<FileStatus> nodeFiles = null;
try {
nodeFiles = LogAggregationUtils.getRemoteNodeFileDir(conf, appId,
appOwner, fileFormat.getRemoteRootLogDir(),
fileFormat.getRemoteRootLogDirSuffix());
} catch (FileNotFoundException fnf) {
logDirNotExist(LogAggregationUtils.getRemoteAppLogDir(
conf, appId, appOwner).toString());
} catch (AccessControlException | AccessDeniedException ace) {
logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir(
conf, appId, appOwner).toString(), appOwner,
ace.getMessage());
}
if (nodeFiles == null) {
return;
}
@ -497,44 +260,21 @@ public class LogCLIHelpers implements Configurable {
public void printContainersList(ContainerLogsRequest options,
PrintStream out, PrintStream err) throws IOException {
ApplicationId appId = options.getAppId();
String appOwner = options.getAppOwner();
String nodeId = options.getNodeId();
String nodeIdStr = (nodeId == null) ? null
: LogAggregationUtils.getNodeString(nodeId);
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
appId, appOwner);
if (nodeFiles == null) {
return;
}
boolean foundAnyLogs = false;
while (nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
if (nodeIdStr != null) {
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
continue;
}
}
if (!thisNodeFile.getPath().getName()
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath());
try {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null) {
out.println(String.format(CONTAINER_ON_NODE_PATTERN, key,
thisNodeFile.getPath().getName()));
foundAnyLogs = true;
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
} finally {
reader.close();
}
}
List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
try {
containersLogMeta = getFileController(options.getAppId(),
options.getAppOwner()).readAggregatedLogsMeta(
options);
} catch (Exception ex) {
err.println(ex.getMessage());
}
for(ContainerLogMeta logMeta : containersLogMeta) {
out.println(String.format(CONTAINER_ON_NODE_PATTERN,
logMeta.getContainerId(),
logMeta.getNodeId()));
foundAnyLogs = true;
}
if (!foundAnyLogs) {
if (nodeId != null) {
@ -547,23 +287,6 @@ public class LogCLIHelpers implements Configurable {
}
}
private RemoteIterator<FileStatus> getRemoteNodeFileDir(ApplicationId appId,
String appOwner) throws IOException {
RemoteIterator<FileStatus> nodeFiles = null;
try {
nodeFiles = LogAggregationUtils.getRemoteNodeFileDir(
conf, appId, appOwner);
} catch (FileNotFoundException fnf) {
logDirNotExist(LogAggregationUtils.getRemoteAppLogDir(
conf, appId, appOwner).toString());
} catch (AccessControlException | AccessDeniedException ace) {
logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir(
conf, appId, appOwner).toString(), appOwner,
ace.getMessage());
}
return nodeFiles;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
@ -620,59 +343,29 @@ public class LogCLIHelpers implements Configurable {
@Private
public Set<String> listContainerLogs(ContainerLogsRequest options)
throws IOException {
List<ContainerLogMeta> containersLogMeta;
Set<String> logTypes = new HashSet<String>();
ApplicationId appId = options.getAppId();
String appOwner = options.getAppOwner();
String nodeId = options.getNodeId();
String containerIdStr = options.getContainerId();
boolean getAllContainers = (containerIdStr == null);
String nodeIdStr = (nodeId == null) ? null
: LogAggregationUtils.getNodeString(nodeId);
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
appId, appOwner);
if (nodeFiles == null) {
try {
containersLogMeta = getFileController(options.getAppId(),
options.getAppOwner()).readAggregatedLogsMeta(
options);
} catch (Exception ex) {
System.err.println(ex.getMessage());
return logTypes;
}
while (nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
if (nodeIdStr != null) {
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
continue;
}
}
if (!thisNodeFile.getPath().getName()
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath());
try {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null) {
if (getAllContainers || (key.toString().equals(containerIdStr))) {
while (true) {
try {
String logFile = LogReader.readContainerMetaDataAndSkipData(
valueStream).getFirst();
logTypes.add(logFile);
} catch (EOFException eof) {
break;
}
}
if (!getAllContainers) {
break;
}
}
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
} finally {
reader.close();
}
for (ContainerLogMeta logMeta: containersLogMeta) {
for (PerContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) {
logTypes.add(fileInfo.getFileName());
}
}
return logTypes;
}
private LogAggregationFileController getFileController(ApplicationId appId,
String appOwner) throws IOException {
if (factory == null) {
factory = new LogAggregationFileControllerFactory(conf);
}
return factory.getFileControllerForRead(appId, appOwner);
}
}

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.yarn.logaggregation;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@ -27,19 +25,7 @@ import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.util.Times;
/**
* This class contains several utility function which could be used in different
@ -53,81 +39,6 @@ public final class LogToolUtils {
private LogToolUtils() {}
/**
* Return a list of {@link ContainerLogMeta} for a container
* from Remote FileSystem.
*
* @param conf the configuration
* @param appId the applicationId
* @param containerIdStr the containerId
* @param nodeId the nodeId
* @param appOwner the application owner
* @return a list of {@link ContainerLogMeta}
* @throws IOException if there is no available log file
*/
public static List<ContainerLogMeta> getContainerLogMetaFromRemoteFS(
Configuration conf, ApplicationId appId, String containerIdStr,
String nodeId, String appOwner) throws IOException {
List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
boolean getAllContainers = (containerIdStr == null);
String nodeIdStr = (nodeId == null) ? null
: LogAggregationUtils.getNodeString(nodeId);
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileDir(conf, appId, appOwner);
if (nodeFiles == null) {
throw new IOException("There is no available log fils for "
+ "application:" + appId);
}
while (nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
if (nodeIdStr != null) {
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
continue;
}
}
if (!thisNodeFile.getPath().getName()
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(conf,
thisNodeFile.getPath());
try {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null) {
if (getAllContainers || (key.toString().equals(containerIdStr))) {
ContainerLogMeta containerLogMeta = new ContainerLogMeta(
key.toString(), thisNodeFile.getPath().getName());
while (true) {
try {
Pair<String, String> logMeta =
LogReader.readContainerMetaDataAndSkipData(
valueStream);
containerLogMeta.addLogMeta(
logMeta.getFirst(),
logMeta.getSecond(),
Times.format(thisNodeFile.getModificationTime()));
} catch (EOFException eof) {
break;
}
}
containersLogMeta.add(containerLogMeta);
if (!getAllContainers) {
break;
}
}
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
} finally {
reader.close();
}
}
}
return containersLogMeta;
}
/**
* Output container log.
* @param containerId the containerId
@ -247,82 +158,4 @@ public final class LogToolUtils {
}
}
public static boolean outputAggregatedContainerLog(Configuration conf,
ApplicationId appId, String appOwner,
String containerId, String nodeId,
String logFileName, long outputSize, OutputStream os,
byte[] buf) throws IOException {
boolean findLogs = false;
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileDir(conf, appId, appOwner);
while (nodeFiles != null && nodeFiles.hasNext()) {
final FileStatus thisNodeFile = nodeFiles.next();
String nodeName = thisNodeFile.getPath().getName();
if (nodeName.equals(appId + ".har")) {
Path p = new Path("har:///"
+ thisNodeFile.getPath().toUri().getRawPath());
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
continue;
}
if ((nodeId == null || nodeName.contains(LogAggregationUtils
.getNodeString(nodeId))) && !nodeName.endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader = null;
try {
reader = new AggregatedLogFormat.LogReader(conf,
thisNodeFile.getPath());
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null && !key.toString()
.equals(containerId)) {
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
if (valueStream == null) {
continue;
}
while (true) {
try {
String fileType = valueStream.readUTF();
String fileLengthStr = valueStream.readUTF();
long fileLength = Long.parseLong(fileLengthStr);
if (fileType.equalsIgnoreCase(logFileName)) {
LogToolUtils.outputContainerLog(containerId,
nodeId, fileType, fileLength, outputSize,
Times.format(thisNodeFile.getModificationTime()),
valueStream, os, buf,
ContainerLogAggregationType.AGGREGATED);
StringBuilder sb = new StringBuilder();
String endOfFile = "End of LogType:" + fileType;
sb.append("\n" + endOfFile + "\n");
sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ "\n\n");
byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
os.write(b, 0, b.length);
findLogs = true;
} else {
long totalSkipped = 0;
long currSkipped = 0;
while (currSkipped != -1 && totalSkipped < fileLength) {
currSkipped = valueStream.skip(
fileLength - totalSkipped);
totalSkipped += currSkipped;
}
}
} catch (EOFException eof) {
break;
}
}
} finally {
if (reader != null) {
reader.close();
}
}
}
}
os.flush();
return findLogs;
}
}

View File

@ -24,6 +24,10 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@ -31,7 +35,9 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -42,13 +48,18 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.webapp.View.ViewContext;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
/**
* Base class to implement Log Aggregation File Controller.
@ -167,6 +178,74 @@ public abstract class LogAggregationFileController {
public abstract void postWrite(LogAggregationFileControllerContext record)
throws Exception;
protected PrintStream createPrintStream(String localDir, String nodeId,
String containerId) throws IOException {
PrintStream out = System.out;
if(localDir != null && !localDir.isEmpty()) {
Path nodePath = new Path(localDir, LogAggregationUtils
.getNodeString(nodeId));
Files.createDirectories(Paths.get(nodePath.toString()));
Path containerLogPath = new Path(nodePath, containerId);
out = new PrintStream(containerLogPath.toString(), "UTF-8");
}
return out;
}
protected void closePrintStream(OutputStream out) {
if (out != System.out) {
IOUtils.closeQuietly(out);
}
}
/**
* Output container log.
* @param logRequest {@link ContainerLogsRequest}
* @param os the output stream
* @throws IOException if we can not access the log file.
*/
public abstract boolean readAggregatedLogs(ContainerLogsRequest logRequest,
OutputStream os) throws IOException;
/**
* Return a list of {@link ContainerLogMeta} for an application
* from Remote FileSystem.
*
* @param logRequest {@link ContainerLogsRequest}
* @return a list of {@link ContainerLogMeta}
* @throws IOException if there is no available log file
*/
public abstract List<ContainerLogMeta> readAggregatedLogsMeta(
ContainerLogsRequest logRequest) throws IOException;
/**
* Render Aggregated Logs block.
* @param html the html
* @param context the ViewContext
*/
public abstract void renderAggregatedLogsBlock(Block html,
ViewContext context);
/**
* Returns the owner of the application.
*
* @param the aggregatedLog path.
* @return the application owner.
* @throws IOException
*/
public abstract String getApplicationOwner(Path aggregatedLogPath)
throws IOException;
/**
* Returns ACLs for the application. An empty map is returned if no ACLs are
* found.
*
* @param the aggregatedLog path.
* @return a map of the Application ACLs.
* @throws IOException
*/
public abstract Map<ApplicationAccessType, String> getApplicationAcls(
Path aggregatedLogPath) throws IOException;
/**
* Verify and create the remote log directory.
*/

View File

@ -0,0 +1,186 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
import com.google.inject.Inject;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
/**
* Base class to implement Aggregated Logs Block.
*/
@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
public abstract class LogAggregationHtmlBlock extends HtmlBlock {
@Inject
public LogAggregationHtmlBlock(ViewContext ctx) {
super(ctx);
}
protected BlockParameters verifyAndParseParameters(Block html) {
BlockParameters params = new BlockParameters();
ContainerId containerId = LogAggregationWebUtils
.verifyAndGetContainerId(html, $(CONTAINER_ID));
params.setContainerId(containerId);
NodeId nodeId = LogAggregationWebUtils
.verifyAndGetNodeId(html, $(NM_NODENAME));
params.setNodeId(nodeId);
String appOwner = LogAggregationWebUtils
.verifyAndGetAppOwner(html, $(APP_OWNER));
params.setAppOwner(appOwner);
boolean isValid = true;
long start = -4096;
try {
start = LogAggregationWebUtils.getLogStartIndex(
html, $("start"));
} catch (NumberFormatException ne) {
html.h1()._("Invalid log start value: " + $("start"))._();
isValid = false;
}
params.setStartIndex(start);
long end = Long.MAX_VALUE;
try {
end = LogAggregationWebUtils.getLogEndIndex(
html, $("end"));
} catch (NumberFormatException ne) {
html.h1()._("Invalid log start value: " + $("end"))._();
isValid = false;
}
params.setEndIndex(end);
if (containerId == null || nodeId == null || appOwner == null
|| appOwner.isEmpty() || !isValid) {
return null;
}
ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId();
params.setAppId(appId);
String logEntity = $(ENTITY_STRING);
if (logEntity == null || logEntity.isEmpty()) {
logEntity = containerId.toString();
}
params.setLogEntity(logEntity);
return params;
}
protected boolean checkAcls(Configuration conf, ApplicationId appId,
String owner, Map<ApplicationAccessType, String> appAcls,
String remoteUser) {
ApplicationACLsManager aclsManager = new ApplicationACLsManager(
conf);
aclsManager.addApplication(appId, appAcls);
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
if (callerUGI != null && !aclsManager.checkAccess(callerUGI,
ApplicationAccessType.VIEW_APP, owner, appId)) {
return false;
}
return true;
}
protected static class BlockParameters {
private ApplicationId appId;
private ContainerId containerId;
private NodeId nodeId;
private String appOwner;
private long start;
private long end;
private String logEntity;
public ApplicationId getAppId() {
return appId;
}
public void setAppId(ApplicationId appId) {
this.appId = appId;
}
public ContainerId getContainerId() {
return containerId;
}
public void setContainerId(ContainerId containerId) {
this.containerId = containerId;
}
public NodeId getNodeId() {
return nodeId;
}
public void setNodeId(NodeId nodeId) {
this.nodeId = nodeId;
}
public String getAppOwner() {
return appOwner;
}
public void setAppOwner(String appOwner) {
this.appOwner = appOwner;
}
public long getStartIndex() {
return start;
}
public void setStartIndex(long startIndex) {
this.start = startIndex;
}
public long getEndIndex() {
return end;
}
public void setEndIndex(long endIndex) {
this.end = endIndex;
}
public String getLogEntity() {
return logEntity;
}
public void setLogEntity(String logEntity) {
this.logEntity = logEntity;
}
}
}

View File

@ -1,127 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.util.Times;
/**
* The TFile log aggregation file Controller implementation.
*/
@Private
@Unstable
public class LogAggregationTFileController
extends LogAggregationFileController {
private static final Log LOG = LogFactory.getLog(
LogAggregationTFileController.class);
private LogWriter writer;
public LogAggregationTFileController(){}
@Override
public void initInternal(Configuration conf) {
this.remoteRootLogDir = new Path(
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
this.remoteRootLogDirSuffix =
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
}
@Override
public void initializeWriter(LogAggregationFileControllerContext context)
throws IOException {
this.writer = new LogWriter();
writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(),
context.getUserUgi());
// Write ACLs once when the writer is created.
writer.writeApplicationACLs(context.getAppAcls());
writer.writeApplicationOwner(context.getUserUgi().getShortUserName());
}
@Override
public void closeWriter() {
this.writer.close();
}
@Override
public void write(LogKey logKey, LogValue logValue) throws IOException {
this.writer.append(logKey, logValue);
}
@Override
public void postWrite(final LogAggregationFileControllerContext record)
throws Exception {
// Before upload logs, make sure the number of existing logs
// is smaller than the configured NM log aggregation retention size.
if (record.isUploadedLogsInThisCycle() &&
record.isLogAggregationInRolling()) {
cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(),
record.getUserUgi());
record.increcleanupOldLogTimes();
}
final Path renamedPath = record.getRollingMonitorInterval() <= 0
? record.getRemoteNodeLogFileForApp() : new Path(
record.getRemoteNodeLogFileForApp().getParent(),
record.getRemoteNodeLogFileForApp().getName() + "_"
+ record.getLogUploadTimeStamp());
final boolean rename = record.isUploadedLogsInThisCycle();
try {
record.getUserUgi().doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
FileSystem remoteFS = record.getRemoteNodeLogFileForApp()
.getFileSystem(conf);
if (rename) {
remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(),
renamedPath);
} else {
remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false);
}
return null;
}
});
} catch (Exception e) {
LOG.error(
"Failed to move temporary log file to final location: ["
+ record.getRemoteNodeTmpLogFileForApp() + "] to ["
+ renamedPath + "]", e);
throw new Exception("Log uploaded failed for Application: "
+ record.getAppId() + " in NodeManager: "
+ LogAggregationUtils.getNodeString(record.getNodeId()) + " at "
+ Times.format(record.getLogUploadTimeStamp()) + "\n");
}
}
}

View File

@ -0,0 +1,375 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.View.ViewContext;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
/**
* The TFile log aggregation file Controller implementation.
*/
@Private
@Unstable
public class LogAggregationTFileController
extends LogAggregationFileController {
private static final Log LOG = LogFactory.getLog(
LogAggregationTFileController.class);
private LogWriter writer;
private TFileLogReader tfReader = null;
public LogAggregationTFileController(){}
@Override
public void initInternal(Configuration conf) {
this.remoteRootLogDir = new Path(
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
this.remoteRootLogDirSuffix =
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
}
@Override
public void initializeWriter(LogAggregationFileControllerContext context)
throws IOException {
this.writer = new LogWriter();
writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(),
context.getUserUgi());
// Write ACLs once when the writer is created.
writer.writeApplicationACLs(context.getAppAcls());
writer.writeApplicationOwner(context.getUserUgi().getShortUserName());
}
@Override
public void closeWriter() {
this.writer.close();
this.writer = null;
}
@Override
public void write(LogKey logKey, LogValue logValue) throws IOException {
this.writer.append(logKey, logValue);
}
@Override
public void postWrite(final LogAggregationFileControllerContext record)
throws Exception {
// Before upload logs, make sure the number of existing logs
// is smaller than the configured NM log aggregation retention size.
if (record.isUploadedLogsInThisCycle() &&
record.isLogAggregationInRolling()) {
cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(),
record.getUserUgi());
record.increcleanupOldLogTimes();
}
final Path renamedPath = record.getRollingMonitorInterval() <= 0
? record.getRemoteNodeLogFileForApp() : new Path(
record.getRemoteNodeLogFileForApp().getParent(),
record.getRemoteNodeLogFileForApp().getName() + "_"
+ record.getLogUploadTimeStamp());
final boolean rename = record.isUploadedLogsInThisCycle();
try {
record.getUserUgi().doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
FileSystem remoteFS = record.getRemoteNodeLogFileForApp()
.getFileSystem(conf);
if (rename) {
remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(),
renamedPath);
} else {
remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false);
}
return null;
}
});
} catch (Exception e) {
LOG.error(
"Failed to move temporary log file to final location: ["
+ record.getRemoteNodeTmpLogFileForApp() + "] to ["
+ renamedPath + "]", e);
throw new Exception("Log uploaded failed for Application: "
+ record.getAppId() + " in NodeManager: "
+ LogAggregationUtils.getNodeString(record.getNodeId()) + " at "
+ Times.format(record.getLogUploadTimeStamp()) + "\n");
}
}
@Override
public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
OutputStream os) throws IOException {
boolean findLogs = false;
boolean createPrintStream = (os == null);
ApplicationId appId = logRequest.getAppId();
String nodeId = logRequest.getNodeId();
List<String> logTypes = new ArrayList<>();
if (logRequest.getLogTypes() != null && !logRequest
.getLogTypes().isEmpty()) {
logTypes.addAll(logRequest.getLogTypes());
}
String containerIdStr = logRequest.getContainerId();
boolean getAllContainers = (containerIdStr == null
|| containerIdStr.isEmpty());
long size = logRequest.getBytes();
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner());
byte[] buf = new byte[65535];
while (nodeFiles != null && nodeFiles.hasNext()) {
final FileStatus thisNodeFile = nodeFiles.next();
LOG.error(thisNodeFile.getPath().toString());
String nodeName = thisNodeFile.getPath().getName();
if (nodeName.equals(appId + ".har")) {
Path p = new Path("har:///"
+ thisNodeFile.getPath().toUri().getRawPath());
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
continue;
}
if ((nodeId == null || nodeName.contains(LogAggregationUtils
.getNodeString(nodeId))) && !nodeName.endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader = null;
try {
reader = new AggregatedLogFormat.LogReader(conf,
thisNodeFile.getPath());
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null) {
if (getAllContainers || (key.toString().equals(containerIdStr))) {
if (createPrintStream) {
os = createPrintStream(
logRequest.getOutputLocalDir(),
thisNodeFile.getPath().getName(), key.toString());
}
try {
while (true) {
try {
String fileType = valueStream.readUTF();
String fileLengthStr = valueStream.readUTF();
long fileLength = Long.parseLong(fileLengthStr);
if (logTypes == null || logTypes.isEmpty() ||
logTypes.contains(fileType)) {
LogToolUtils.outputContainerLog(key.toString(),
nodeName, fileType, fileLength, size,
Times.format(thisNodeFile.getModificationTime()),
valueStream, os, buf,
ContainerLogAggregationType.AGGREGATED);
StringBuilder sb = new StringBuilder();
String endOfFile = "End of LogType:" + fileType;
sb.append("\n" + endOfFile + "\n");
sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ "\n\n");
byte[] b = sb.toString().getBytes(
Charset.forName("UTF-8"));
os.write(b, 0, b.length);
findLogs = true;
} else {
long totalSkipped = 0;
long currSkipped = 0;
while (currSkipped != -1 && totalSkipped < fileLength) {
currSkipped = valueStream.skip(
fileLength - totalSkipped);
totalSkipped += currSkipped;
}
}
} catch (EOFException eof) {
break;
}
}
} finally {
os.flush();
if (createPrintStream) {
closePrintStream(os);
}
}
if (!getAllContainers) {
break;
}
}
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
} finally {
if (reader != null) {
reader.close();
}
}
}
}
return findLogs;
}
@Override
public List<ContainerLogMeta> readAggregatedLogsMeta(
ContainerLogsRequest logRequest) throws IOException {
List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
String containerIdStr = logRequest.getContainerId();
String nodeId = logRequest.getNodeId();
ApplicationId appId = logRequest.getAppId();
String appOwner = logRequest.getAppOwner();
boolean getAllContainers = (containerIdStr == null);
String nodeIdStr = (nodeId == null) ? null
: LogAggregationUtils.getNodeString(nodeId);
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileDir(conf, appId, appOwner);
if (nodeFiles == null) {
throw new IOException("There is no available log fils for "
+ "application:" + appId);
}
while (nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
if (nodeIdStr != null) {
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
continue;
}
}
if (!thisNodeFile.getPath().getName()
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(conf,
thisNodeFile.getPath());
try {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null) {
if (getAllContainers || (key.toString().equals(containerIdStr))) {
ContainerLogMeta containerLogMeta = new ContainerLogMeta(
key.toString(), thisNodeFile.getPath().getName());
while (true) {
try {
Pair<String, String> logMeta =
LogReader.readContainerMetaDataAndSkipData(
valueStream);
containerLogMeta.addLogMeta(
logMeta.getFirst(),
logMeta.getSecond(),
Times.format(thisNodeFile.getModificationTime()));
} catch (EOFException eof) {
break;
}
}
containersLogMeta.add(containerLogMeta);
if (!getAllContainers) {
break;
}
}
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
} finally {
reader.close();
}
}
}
return containersLogMeta;
}
@Override
public void renderAggregatedLogsBlock(Block html, ViewContext context) {
TFileAggregatedLogsBlock block = new TFileAggregatedLogsBlock(
context, conf);
block.render(html);
}
@Override
public String getApplicationOwner(Path aggregatedLog) throws IOException {
createTFileLogReader(aggregatedLog);
return this.tfReader.getLogReader().getApplicationOwner();
}
@Override
public Map<ApplicationAccessType, String> getApplicationAcls(
Path aggregatedLog) throws IOException {
createTFileLogReader(aggregatedLog);
return this.tfReader.getLogReader().getApplicationAcls();
}
private void createTFileLogReader(Path aggregatedLog) throws IOException {
if (this.tfReader == null || !this.tfReader.getAggregatedLogPath()
.equals(aggregatedLog)) {
LogReader logReader = new LogReader(conf, aggregatedLog);
this.tfReader = new TFileLogReader(logReader, aggregatedLog);
}
}
private static class TFileLogReader {
private LogReader logReader;
private Path aggregatedLogPath;
TFileLogReader(LogReader logReader, Path aggregatedLogPath) {
this.setLogReader(logReader);
this.setAggregatedLogPath(aggregatedLogPath);
}
public LogReader getLogReader() {
return logReader;
}
public void setLogReader(LogReader logReader) {
this.logReader = logReader;
}
public Path getAggregatedLogPath() {
return aggregatedLogPath;
}
public void setAggregatedLogPath(Path aggregatedLogPath) {
this.aggregatedLogPath = aggregatedLogPath;
}
}
}

View File

@ -0,0 +1,241 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationHtmlBlock;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE;
/**
* The Aggregated Logs Block implementation for TFile.
*/
@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
private final Configuration conf;
@Inject
public TFileAggregatedLogsBlock(ViewContext ctx, Configuration conf) {
super(ctx);
this.conf = conf;
}
@Override
protected void render(Block html) {
BlockParameters params = verifyAndParseParameters(html);
if (params == null) {
return;
}
RemoteIterator<FileStatus> nodeFiles;
try {
nodeFiles = LogAggregationUtils
.getRemoteNodeFileDir(conf, params.getAppId(),
params.getAppOwner());
} catch (RuntimeException e) {
throw e;
} catch (Exception ex) {
html.h1("No logs available for container "
+ params.getContainerId().toString());
return;
}
NodeId nodeId = params.getNodeId();
String logEntity = params.getLogEntity();
ApplicationId appId = params.getAppId();
ContainerId containerId = params.getContainerId();
long start = params.getStartIndex();
long end = params.getEndIndex();
boolean foundLog = false;
String desiredLogType = $(CONTAINER_LOG_TYPE);
try {
while (nodeFiles.hasNext()) {
AggregatedLogFormat.LogReader reader = null;
try {
FileStatus thisNodeFile = nodeFiles.next();
if (thisNodeFile.getPath().getName().equals(
params.getAppId() + ".har")) {
Path p = new Path("har:///"
+ thisNodeFile.getPath().toUri().getRawPath());
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
continue;
}
if (!thisNodeFile.getPath().getName()
.contains(LogAggregationUtils.getNodeString(nodeId))
|| thisNodeFile.getPath().getName()
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
continue;
}
long logUploadedTime = thisNodeFile.getModificationTime();
reader = new AggregatedLogFormat.LogReader(
conf, thisNodeFile.getPath());
String owner = null;
Map<ApplicationAccessType, String> appAcls = null;
try {
owner = reader.getApplicationOwner();
appAcls = reader.getApplicationAcls();
} catch (IOException e) {
LOG.error("Error getting logs for " + logEntity, e);
continue;
}
String remoteUser = request().getRemoteUser();
if (!checkAcls(conf, appId, owner, appAcls, remoteUser)) {
html.h1()._("User [" + remoteUser
+ "] is not authorized to view the logs for " + logEntity
+ " in log file [" + thisNodeFile.getPath().getName() + "]")
._();
LOG.error("User [" + remoteUser
+ "] is not authorized to view the logs for " + logEntity);
continue;
}
AggregatedLogFormat.ContainerLogsReader logReader = reader
.getContainerLogsReader(containerId);
if (logReader == null) {
continue;
}
foundLog = readContainerLogs(html, logReader, start, end,
desiredLogType, logUploadedTime);
} catch (IOException ex) {
LOG.error("Error getting logs for " + logEntity, ex);
continue;
} finally {
if (reader != null) {
reader.close();
}
}
}
if (!foundLog) {
if (desiredLogType.isEmpty()) {
html.h1("No logs available for container "
+ containerId.toString());
} else {
html.h1("Unable to locate '" + desiredLogType
+ "' log for container " + containerId.toString());
}
}
} catch (IOException e) {
html.h1()._("Error getting logs for " + logEntity)._();
LOG.error("Error getting logs for " + logEntity, e);
}
}
private boolean readContainerLogs(Block html,
AggregatedLogFormat.ContainerLogsReader logReader, long startIndex,
long endIndex, String desiredLogType, long logUpLoadTime)
throws IOException {
int bufferSize = 65536;
char[] cbuf = new char[bufferSize];
boolean foundLog = false;
String logType = logReader.nextLog();
while (logType != null) {
if (desiredLogType == null || desiredLogType.isEmpty()
|| desiredLogType.equals(logType)) {
long logLength = logReader.getCurrentLogLength();
if (foundLog) {
html.pre()._("\n\n")._();
}
html.p()._("Log Type: " + logType)._();
html.p()._("Log Upload Time: " + Times.format(logUpLoadTime))._();
html.p()._("Log Length: " + Long.toString(logLength))._();
long start = startIndex < 0
? logLength + startIndex : startIndex;
start = start < 0 ? 0 : start;
start = start > logLength ? logLength : start;
long end = endIndex < 0
? logLength + endIndex : endIndex;
end = end < 0 ? 0 : end;
end = end > logLength ? logLength : end;
end = end < start ? start : end;
long toRead = end - start;
if (toRead < logLength) {
html.p()._("Showing " + toRead + " bytes of " + logLength
+ " total. Click ").a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
$(ENTITY_STRING), $(APP_OWNER),
logType, "?start=0"), "here").
_(" for the full log.")._();
}
long totalSkipped = 0;
while (totalSkipped < start) {
long ret = logReader.skip(start - totalSkipped);
if (ret == 0) {
//Read one byte
int nextByte = logReader.read();
// Check if we have reached EOF
if (nextByte == -1) {
throw new IOException("Premature EOF from container log");
}
ret = 1;
}
totalSkipped += ret;
}
int len = 0;
int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
PRE<Hamlet> pre = html.pre();
while (toRead > 0
&& (len = logReader.read(cbuf, 0, currentToRead)) > 0) {
pre._(new String(cbuf, 0, len));
toRead = toRead - len;
currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
}
pre._();
foundLog = true;
}
logType = logReader.nextLog();
}
return foundLog;
}
}

View File

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Public
package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -20,34 +20,18 @@ package org.apache.hadoop.yarn.webapp.log;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE;
import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.inject.Inject;
@ -56,20 +40,40 @@ import com.google.inject.Inject;
public class AggregatedLogsBlock extends HtmlBlock {
private final Configuration conf;
private final LogAggregationFileControllerFactory factory;
@Inject
AggregatedLogsBlock(Configuration conf) {
this.conf = conf;
factory = new LogAggregationFileControllerFactory(conf);
}
@Override
protected void render(Block html) {
ContainerId containerId = verifyAndGetContainerId(html);
NodeId nodeId = verifyAndGetNodeId(html);
String appOwner = verifyAndGetAppOwner(html);
LogLimits logLimits = verifyAndGetLogLimits(html);
ContainerId containerId = LogAggregationWebUtils
.verifyAndGetContainerId(html, $(CONTAINER_ID));
NodeId nodeId = LogAggregationWebUtils
.verifyAndGetNodeId(html, $(NM_NODENAME));
String appOwner = LogAggregationWebUtils
.verifyAndGetAppOwner(html, $(APP_OWNER));
boolean isValid = true;
try {
LogAggregationWebUtils.getLogStartIndex(
html, $("start"));
} catch (NumberFormatException ne) {
html.h1()._("Invalid log start value: " + $("start"))._();
isValid = false;
}
try {
LogAggregationWebUtils.getLogEndIndex(
html, $("end"));
} catch (NumberFormatException ne) {
html.h1()._("Invalid log start value: " + $("end"))._();
isValid = false;
}
if (containerId == null || nodeId == null || appOwner == null
|| appOwner.isEmpty() || logLimits == null) {
|| appOwner.isEmpty() || !isValid) {
return;
}
@ -94,21 +98,11 @@ public class AggregatedLogsBlock extends HtmlBlock {
return;
}
Path remoteRootLogDir = new Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
Path remoteAppDir = LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogDir, applicationId, appOwner,
LogAggregationUtils.getRemoteNodeLogDirSuffix(conf));
RemoteIterator<FileStatus> nodeFiles;
LogAggregationFileController fileController;
try {
Path qualifiedLogDir =
FileContext.getFileContext(conf).makeQualified(
remoteAppDir);
nodeFiles =
FileContext.getFileContext(qualifiedLogDir.toUri(), conf)
.listStatus(remoteAppDir);
} catch (FileNotFoundException fnf) {
fileController = this.factory.getFileControllerForRead(
applicationId, appOwner);
} catch (Exception fnf) {
html.h1()
._("Logs not available for " + logEntity
+ ". Aggregation may not be complete, "
@ -119,250 +113,9 @@ public class AggregatedLogsBlock extends HtmlBlock {
._();
}
return;
} catch (Exception ex) {
html.h1()
._("Error getting logs at " + nodeId)._();
return;
}
boolean foundLog = false;
String desiredLogType = $(CONTAINER_LOG_TYPE);
try {
while (nodeFiles.hasNext()) {
AggregatedLogFormat.LogReader reader = null;
try {
FileStatus thisNodeFile = nodeFiles.next();
if (thisNodeFile.getPath().getName().equals(applicationId + ".har")) {
Path p = new Path("har:///"
+ thisNodeFile.getPath().toUri().getRawPath());
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
continue;
}
if (!thisNodeFile.getPath().getName()
.contains(LogAggregationUtils.getNodeString(nodeId))
|| thisNodeFile.getPath().getName()
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
continue;
}
long logUploadedTime = thisNodeFile.getModificationTime();
reader =
new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
String owner = null;
Map<ApplicationAccessType, String> appAcls = null;
try {
owner = reader.getApplicationOwner();
appAcls = reader.getApplicationAcls();
} catch (IOException e) {
LOG.error("Error getting logs for " + logEntity, e);
continue;
}
ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf);
aclsManager.addApplication(applicationId, appAcls);
String remoteUser = request().getRemoteUser();
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
if (callerUGI != null && !aclsManager.checkAccess(callerUGI,
ApplicationAccessType.VIEW_APP, owner, applicationId)) {
html.h1()
._("User [" + remoteUser
+ "] is not authorized to view the logs for " + logEntity
+ " in log file [" + thisNodeFile.getPath().getName() + "]")._();
LOG.error("User [" + remoteUser
+ "] is not authorized to view the logs for " + logEntity);
continue;
}
AggregatedLogFormat.ContainerLogsReader logReader = reader
.getContainerLogsReader(containerId);
if (logReader == null) {
continue;
}
foundLog = readContainerLogs(html, logReader, logLimits,
desiredLogType, logUploadedTime);
} catch (IOException ex) {
LOG.error("Error getting logs for " + logEntity, ex);
continue;
} finally {
if (reader != null)
reader.close();
}
}
if (!foundLog) {
if (desiredLogType.isEmpty()) {
html.h1("No logs available for container " + containerId.toString());
} else {
html.h1("Unable to locate '" + desiredLogType
+ "' log for container " + containerId.toString());
}
}
} catch (IOException e) {
html.h1()._("Error getting logs for " + logEntity)._();
LOG.error("Error getting logs for " + logEntity, e);
}
}
private boolean readContainerLogs(Block html,
AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits,
String desiredLogType, long logUpLoadTime) throws IOException {
int bufferSize = 65536;
char[] cbuf = new char[bufferSize];
boolean foundLog = false;
String logType = logReader.nextLog();
while (logType != null) {
if (desiredLogType == null || desiredLogType.isEmpty()
|| desiredLogType.equals(logType)) {
long logLength = logReader.getCurrentLogLength();
if (foundLog) {
html.pre()._("\n\n")._();
}
html.p()._("Log Type: " + logType)._();
html.p()._("Log Upload Time: " + Times.format(logUpLoadTime))._();
html.p()._("Log Length: " + Long.toString(logLength))._();
long start = logLimits.start < 0
? logLength + logLimits.start : logLimits.start;
start = start < 0 ? 0 : start;
start = start > logLength ? logLength : start;
long end = logLimits.end < 0
? logLength + logLimits.end : logLimits.end;
end = end < 0 ? 0 : end;
end = end > logLength ? logLength : end;
end = end < start ? start : end;
long toRead = end - start;
if (toRead < logLength) {
html.p()._("Showing " + toRead + " bytes of " + logLength
+ " total. Click ")
.a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
$(ENTITY_STRING), $(APP_OWNER),
logType, "?start=0"), "here").
_(" for the full log.")._();
}
long totalSkipped = 0;
while (totalSkipped < start) {
long ret = logReader.skip(start - totalSkipped);
if (ret == 0) {
//Read one byte
int nextByte = logReader.read();
// Check if we have reached EOF
if (nextByte == -1) {
throw new IOException( "Premature EOF from container log");
}
ret = 1;
}
totalSkipped += ret;
}
int len = 0;
int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
PRE<Hamlet> pre = html.pre();
while (toRead > 0
&& (len = logReader.read(cbuf, 0, currentToRead)) > 0) {
pre._(new String(cbuf, 0, len));
toRead = toRead - len;
currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
}
pre._();
foundLog = true;
}
logType = logReader.nextLog();
}
return foundLog;
}
private ContainerId verifyAndGetContainerId(Block html) {
String containerIdStr = $(CONTAINER_ID);
if (containerIdStr == null || containerIdStr.isEmpty()) {
html.h1()._("Cannot get container logs without a ContainerId")._();
return null;
}
ContainerId containerId = null;
try {
containerId = ContainerId.fromString(containerIdStr);
} catch (IllegalArgumentException e) {
html.h1()
._("Cannot get container logs for invalid containerId: "
+ containerIdStr)._();
return null;
}
return containerId;
}
private NodeId verifyAndGetNodeId(Block html) {
String nodeIdStr = $(NM_NODENAME);
if (nodeIdStr == null || nodeIdStr.isEmpty()) {
html.h1()._("Cannot get container logs without a NodeId")._();
return null;
}
NodeId nodeId = null;
try {
nodeId = NodeId.fromString(nodeIdStr);
} catch (IllegalArgumentException e) {
html.h1()._("Cannot get container logs. Invalid nodeId: " + nodeIdStr)
._();
return null;
}
return nodeId;
}
private String verifyAndGetAppOwner(Block html) {
String appOwner = $(APP_OWNER);
if (appOwner == null || appOwner.isEmpty()) {
html.h1()._("Cannot get container logs without an app owner")._();
}
return appOwner;
}
private static class LogLimits {
long start;
long end;
}
private LogLimits verifyAndGetLogLimits(Block html) {
long start = -4096;
long end = Long.MAX_VALUE;
boolean isValid = true;
String startStr = $("start");
if (startStr != null && !startStr.isEmpty()) {
try {
start = Long.parseLong(startStr);
} catch (NumberFormatException e) {
isValid = false;
html.h1()._("Invalid log start value: " + startStr)._();
}
}
String endStr = $("end");
if (endStr != null && !endStr.isEmpty()) {
try {
end = Long.parseLong(endStr);
} catch (NumberFormatException e) {
isValid = false;
html.h1()._("Invalid log end value: " + endStr)._();
}
}
if (!isValid) {
return null;
}
LogLimits limits = new LogLimits();
limits.start = start;
limits.end = end;
return limits;
fileController.renderAggregatedLogsBlock(html, this.context());
}
private String getApplicationLogURL(ApplicationId applicationId) {

View File

@ -1148,7 +1148,7 @@
<property>
<description>Class that supports TFile read and write operations.</description>
<name>yarn.log-aggregation.file-controller.TFile.class</name>
<value>org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController</value>
<value>org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController</value>
</property>
<property>

View File

@ -48,7 +48,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.TFileAggregatedLogsBlock;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.View.ViewContext;
import org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlockForTest;
import org.apache.hadoop.yarn.webapp.view.BlockForTest;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
@ -56,6 +58,9 @@ import org.apache.hadoop.yarn.webapp.view.HtmlBlockForTest;
import org.junit.Test;
import static org.mockito.Mockito.*;
import com.google.inject.Inject;
import static org.junit.Assert.*;
/**
@ -77,12 +82,14 @@ public class TestAggregatedLogsBlock {
writeLog(configuration, "owner");
AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest(
configuration, "owner", "container_0_0001_01_000001");
ByteArrayOutputStream data = new ByteArrayOutputStream();
PrintWriter printWriter = new PrintWriter(data);
HtmlBlock html = new HtmlBlockForTest();
HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false);
TFileAggregatedLogsBlockForTest aggregatedBlock
= getTFileAggregatedLogsBlockForTest(configuration, "owner",
"container_0_0001_01_000001", "localhost:1234");
aggregatedBlock.render(block);
block.getWriter().flush();
@ -158,12 +165,13 @@ public class TestAggregatedLogsBlock {
writeLog(configuration, "admin");
AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest(
configuration, "admin", "container_0_0001_01_000001");
ByteArrayOutputStream data = new ByteArrayOutputStream();
PrintWriter printWriter = new PrintWriter(data);
HtmlBlock html = new HtmlBlockForTest();
HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false);
TFileAggregatedLogsBlockForTest aggregatedBlock
= getTFileAggregatedLogsBlockForTest(configuration, "admin",
"container_0_0001_01_000001", "localhost:1234");
aggregatedBlock.render(block);
block.getWriter().flush();
@ -191,13 +199,13 @@ public class TestAggregatedLogsBlock {
"/application_1440536969523_0001.har";
FileUtils.copyDirectory(new File(harUrl.getPath()), new File(path));
AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest(
configuration, "admin",
"container_1440536969523_0001_01_000001", "host1:1111");
ByteArrayOutputStream data = new ByteArrayOutputStream();
PrintWriter printWriter = new PrintWriter(data);
HtmlBlock html = new HtmlBlockForTest();
HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false);
TFileAggregatedLogsBlockForTest aggregatedBlock
= getTFileAggregatedLogsBlockForTest(configuration, "admin",
"container_1440536969523_0001_01_000001", "host1:1111");
aggregatedBlock.render(block);
block.getWriter().flush();
@ -206,7 +214,7 @@ public class TestAggregatedLogsBlock {
assertTrue(out.contains("Hello stdout"));
assertTrue(out.contains("Hello syslog"));
aggregatedBlock = getAggregatedLogsBlockForTest(
aggregatedBlock = getTFileAggregatedLogsBlockForTest(
configuration, "admin",
"container_1440536969523_0001_01_000002", "host2:2222");
data = new ByteArrayOutputStream();
@ -237,12 +245,13 @@ public class TestAggregatedLogsBlock {
}
writeLog(configuration, "admin");
AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest(
configuration, "admin", "container_0_0001_01_000001");
ByteArrayOutputStream data = new ByteArrayOutputStream();
PrintWriter printWriter = new PrintWriter(data);
HtmlBlock html = new HtmlBlockForTest();
HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false);
TFileAggregatedLogsBlockForTest aggregatedBlock
= getTFileAggregatedLogsBlockForTest(configuration, "admin",
"container_0_0001_01_000001", "localhost:1234");
aggregatedBlock.render(block);
block.getWriter().flush();
@ -251,7 +260,6 @@ public class TestAggregatedLogsBlock {
}
private Configuration getConfiguration() {
Configuration configuration = new YarnConfiguration();
configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
@ -267,6 +275,28 @@ public class TestAggregatedLogsBlock {
"localhost:1234");
}
private TFileAggregatedLogsBlockForTest getTFileAggregatedLogsBlockForTest(
Configuration configuration, String user, String containerId,
String nodeName) {
HttpServletRequest request = mock(HttpServletRequest.class);
when(request.getRemoteUser()).thenReturn(user);
ViewContext mockContext = mock(ViewContext.class);
TFileAggregatedLogsBlockForTest aggregatedBlock
= new TFileAggregatedLogsBlockForTest(mockContext,
configuration);
aggregatedBlock.setRequest(request);
aggregatedBlock.moreParams().put(YarnWebParams.CONTAINER_ID,
containerId);
aggregatedBlock.moreParams().put(YarnWebParams.NM_NODENAME,
nodeName);
aggregatedBlock.moreParams().put(YarnWebParams.APP_OWNER, user);
aggregatedBlock.moreParams().put("start", "");
aggregatedBlock.moreParams().put("end", "");
aggregatedBlock.moreParams().put(YarnWebParams.ENTITY_STRING, "entity");
return aggregatedBlock;
}
private AggregatedLogsBlockForTest getAggregatedLogsBlockForTest(
Configuration configuration, String user, String containerId,
String nodeName) {
@ -340,4 +370,32 @@ public class TestAggregatedLogsBlock {
writer.close();
}
private static class TFileAggregatedLogsBlockForTest
extends TFileAggregatedLogsBlock {
private Map<String, String> params = new HashMap<String, String>();
private HttpServletRequest request;
@Inject
TFileAggregatedLogsBlockForTest(ViewContext ctx, Configuration conf) {
super(ctx, conf);
}
public void render(Block html) {
super.render(html);
}
@Override
public Map<String, String> moreParams() {
return params;
}
public HttpServletRequest request() {
return request;
}
public void setRequest(HttpServletRequest request) {
this.request = request;
}
}
}

View File

@ -23,19 +23,27 @@ import static org.junit.Assert.*;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.apache.hadoop.yarn.webapp.View.ViewContext;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
import org.junit.Test;
/**
@ -167,5 +175,34 @@ public class TestLogAggregationFileControllerFactory {
throws IOException {
// Do Nothing
}
@Override
public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
OutputStream os) throws IOException {
return false;
}
@Override
public List<ContainerLogMeta> readAggregatedLogsMeta(
ContainerLogsRequest logRequest) throws IOException {
return null;
}
@Override
public void renderAggregatedLogsBlock(Block html, ViewContext context) {
// DO NOTHING
}
@Override
public String getApplicationOwner(Path aggregatedLogPath)
throws IOException {
return null;
}
@Override
public Map<ApplicationAccessType, String> getApplicationAcls(
Path aggregatedLogPath) throws IOException {
return null;
}
}
}

View File

@ -23,6 +23,7 @@ import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -53,8 +54,9 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
@ -91,12 +93,14 @@ public class AHSWebServices extends WebServices {
private static final Joiner JOINER = Joiner.on("");
private static final Joiner DOT_JOINER = Joiner.on(". ");
private final Configuration conf;
private final LogAggregationFileControllerFactory factory;
@Inject
public AHSWebServices(ApplicationBaseProtocol appBaseProt,
Configuration conf) {
super(appBaseProt);
this.conf = conf;
this.factory = new LogAggregationFileControllerFactory(conf);
}
@GET
@ -516,9 +520,17 @@ public class AHSWebServices extends WebServices {
@Override
public void write(OutputStream os) throws IOException,
WebApplicationException {
byte[] buf = new byte[65535];
boolean findLogs = LogToolUtils.outputAggregatedContainerLog(conf,
appId, appOwner, containerIdStr, nodeId, logFile, bytes, os, buf);
ContainerLogsRequest request = new ContainerLogsRequest();
request.setAppId(appId);
request.setAppOwner(appOwner);
request.setContainerId(containerIdStr);
request.setBytes(bytes);
request.setNodeId(nodeId);
Set<String> logTypes = new HashSet<>();
logTypes.add(logFile);
request.setLogTypes(logTypes);
boolean findLogs = factory.getFileControllerForRead(appId, appOwner)
.readAggregatedLogs(request, os);
if (!findLogs) {
os.write(("Can not find logs for container:"
+ containerIdStr).getBytes(Charset.forName("UTF-8")));
@ -549,9 +561,14 @@ public class AHSWebServices extends WebServices {
final String nodeId, final String containerIdStr,
boolean emptyLocalContainerLogMeta) {
try {
List<ContainerLogMeta> containerLogMeta = LogToolUtils
.getContainerLogMetaFromRemoteFS(conf, appId, containerIdStr,
nodeId, appOwner);
ContainerLogsRequest request = new ContainerLogsRequest();
request.setAppId(appId);
request.setAppOwner(appOwner);
request.setContainerId(containerIdStr);
request.setNodeId(nodeId);
List<ContainerLogMeta> containerLogMeta = factory
.getFileControllerForRead(appId, appOwner)
.readAggregatedLogsMeta(request);
if (containerLogMeta.isEmpty()) {
throw new NotFoundException(
"Can not get log meta for container: " + containerIdStr);

View File

@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;

View File

@ -23,8 +23,10 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -54,8 +56,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@ -89,6 +93,7 @@ public class NMWebServices {
private static RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private final String redirectWSUrl;
private final LogAggregationFileControllerFactory factory;
private @javax.ws.rs.core.Context
HttpServletRequest request;
@ -107,6 +112,8 @@ public class NMWebServices {
this.webapp = webapp;
this.redirectWSUrl = this.nmContext.getConf().get(
YarnConfiguration.YARN_LOG_SERVER_WEBSERVICE_URL);
this.factory = new LogAggregationFileControllerFactory(
this.nmContext.getConf());
}
private void init() {
@ -253,10 +260,14 @@ public class NMWebServices {
Application app = this.nmContext.getApplications().get(appId);
String appOwner = app == null ? null : app.getUser();
try {
List<ContainerLogMeta> containerLogMeta = LogToolUtils
.getContainerLogMetaFromRemoteFS(this.nmContext.getConf(),
appId, containerIdStr,
this.nmContext.getNodeId().toString(), appOwner);
ContainerLogsRequest logRequest = new ContainerLogsRequest();
logRequest.setAppId(appId);
logRequest.setAppOwner(appOwner);
logRequest.setContainerId(containerIdStr);
logRequest.setNodeId(this.nmContext.getNodeId().toString());
List<ContainerLogMeta> containerLogMeta = factory
.getFileControllerForRead(appId, appOwner)
.readAggregatedLogsMeta(logRequest);
if (!containerLogMeta.isEmpty()) {
for (ContainerLogMeta logMeta : containerLogMeta) {
containersLogsInfo.add(new ContainerLogsInfo(logMeta,
@ -441,12 +452,17 @@ public class NMWebServices {
Application app = nmContext.getApplications().get(appId);
String appOwner = app == null ? null : app.getUser();
try {
int bufferSize = 65536;
byte[] buf = new byte[bufferSize];
LogToolUtils.outputAggregatedContainerLog(nmContext.getConf(),
appId, appOwner, containerId.toString(),
nmContext.getNodeId().toString(), outputFileName, bytes,
os, buf);
ContainerLogsRequest logRequest = new ContainerLogsRequest();
logRequest.setAppId(appId);
logRequest.setAppOwner(appOwner);
logRequest.setContainerId(containerId.toString());
logRequest.setNodeId(nmContext.getNodeId().toString());
logRequest.setBytes(bytes);
Set<String> logTypes = new HashSet<>();
logTypes.add(outputFileName);
logRequest.setLogTypes(logTypes);
factory.getFileControllerForRead(appId, appOwner)
.readAggregatedLogs(logRequest, os);
} catch (IOException ex) {
// Something wrong when we try to access the aggregated log.
if (LOG.isDebugEnabled()) {

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;

View File

@ -105,7 +105,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;