YARN-6877. Create an abstract log reader for extendability. Contributed by Xuan Gong.
This commit is contained in:
parent
bac4e8cca8
commit
91cc070d67
|
@ -53,6 +53,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -468,7 +469,7 @@ public class TestLogsCLI {
|
|||
assertTrue(exitCode == 0);
|
||||
assertTrue(sysOutStream.toString().contains(
|
||||
logMessage(containerId1, "syslog")));
|
||||
assertTrue(sysOutStream.toString().contains("Log Upload Time"));
|
||||
assertTrue(sysOutStream.toString().contains("LogLastModifiedTime"));
|
||||
assertTrue(!sysOutStream.toString().contains(
|
||||
"Logs for container " + containerId1.toString()
|
||||
+ " are not present in this log-file."));
|
||||
|
@ -492,8 +493,12 @@ public class TestLogsCLI {
|
|||
|
||||
String logMessage = logMessage(containerId3, "stdout");
|
||||
int fileContentSize = logMessage.getBytes().length;
|
||||
int tailContentSize = "\nEnd of LogType:stdout\n\n".getBytes().length;
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
String endOfFile = "End of LogType:stdout";
|
||||
sb.append("\n" + endOfFile + "\n");
|
||||
sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
|
||||
+ "\n\n");
|
||||
int tailContentSize = sb.toString().length();
|
||||
// specify how many bytes we should get from logs
|
||||
// specify a position number, it would get the first n bytes from
|
||||
// container log
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.logaggregation;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
|
||||
|
||||
/**
|
||||
* Utils for rendering aggregated logs block.
|
||||
*
|
||||
*/
|
||||
@Private
|
||||
public final class LogAggregationWebUtils {
|
||||
|
||||
private LogAggregationWebUtils() {}
|
||||
|
||||
/**
|
||||
* Parse start index from html.
|
||||
* @param html the html
|
||||
* @param startStr the start index string
|
||||
* @return the startIndex
|
||||
*/
|
||||
public static long getLogStartIndex(Block html, String startStr)
|
||||
throws NumberFormatException {
|
||||
long start = -4096;
|
||||
|
||||
if (startStr != null && !startStr.isEmpty()) {
|
||||
start = Long.parseLong(startStr);
|
||||
}
|
||||
return start;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse end index from html.
|
||||
* @param html the html
|
||||
* @param endStr the end index string
|
||||
* @return the endIndex
|
||||
*/
|
||||
public static long getLogEndIndex(Block html, String endStr)
|
||||
throws NumberFormatException {
|
||||
long end = Long.MAX_VALUE;
|
||||
|
||||
if (endStr != null && !endStr.isEmpty()) {
|
||||
end = Long.parseLong(endStr);
|
||||
}
|
||||
return end;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify and parse containerId.
|
||||
* @param html the html
|
||||
* @param containerIdStr the containerId string
|
||||
* @return the {@link ContainerId}
|
||||
*/
|
||||
public static ContainerId verifyAndGetContainerId(Block html,
|
||||
String containerIdStr) {
|
||||
if (containerIdStr == null || containerIdStr.isEmpty()) {
|
||||
html.h1().__("Cannot get container logs without a ContainerId").__();
|
||||
return null;
|
||||
}
|
||||
ContainerId containerId = null;
|
||||
try {
|
||||
containerId = ContainerId.fromString(containerIdStr);
|
||||
} catch (IllegalArgumentException e) {
|
||||
html.h1()
|
||||
.__("Cannot get container logs for invalid containerId: "
|
||||
+ containerIdStr).__();
|
||||
return null;
|
||||
}
|
||||
return containerId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify and parse NodeId.
|
||||
* @param html the html
|
||||
* @param nodeIdStr the nodeId string
|
||||
* @return the {@link NodeId}
|
||||
*/
|
||||
public static NodeId verifyAndGetNodeId(Block html, String nodeIdStr) {
|
||||
if (nodeIdStr == null || nodeIdStr.isEmpty()) {
|
||||
html.h1().__("Cannot get container logs without a NodeId").__();
|
||||
return null;
|
||||
}
|
||||
NodeId nodeId = null;
|
||||
try {
|
||||
nodeId = NodeId.fromString(nodeIdStr);
|
||||
} catch (IllegalArgumentException e) {
|
||||
html.h1().__("Cannot get container logs. Invalid nodeId: " + nodeIdStr)
|
||||
.__();
|
||||
return null;
|
||||
}
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify and parse the application owner.
|
||||
* @param html the html
|
||||
* @param appOwner the Application owner
|
||||
* @return the appOwner
|
||||
*/
|
||||
public static String verifyAndGetAppOwner(Block html, String appOwner) {
|
||||
if (appOwner == null || appOwner.isEmpty()) {
|
||||
html.h1().__("Cannot get container logs without an app owner").__();
|
||||
}
|
||||
return appOwner;
|
||||
}
|
||||
}
|
|
@ -18,8 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.logaggregation;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
|
@ -37,15 +35,14 @@ import org.apache.hadoop.conf.Configurable;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.HarFs;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
public class LogCLIHelpers implements Configurable {
|
||||
|
@ -56,6 +53,7 @@ public class LogCLIHelpers implements Configurable {
|
|||
"Container: %s on %s";
|
||||
|
||||
private Configuration conf;
|
||||
private LogAggregationFileControllerFactory factory;
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
|
@ -130,71 +128,11 @@ public class LogCLIHelpers implements Configurable {
|
|||
@VisibleForTesting
|
||||
public int dumpAContainerLogsForLogType(ContainerLogsRequest options,
|
||||
boolean outputFailure) throws IOException {
|
||||
ApplicationId applicationId = options.getAppId();
|
||||
String jobOwner = options.getAppOwner();
|
||||
String nodeId = options.getNodeId();
|
||||
String containerId = options.getContainerId();
|
||||
String localDir = options.getOutputLocalDir();
|
||||
List<String> logType = new ArrayList<String>(options.getLogTypes());
|
||||
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
|
||||
applicationId, jobOwner);
|
||||
if (nodeFiles == null) {
|
||||
return -1;
|
||||
}
|
||||
boolean foundContainerLogs = false;
|
||||
while (nodeFiles.hasNext()) {
|
||||
FileStatus thisNodeFile = nodeFiles.next();
|
||||
String fileName = thisNodeFile.getPath().getName();
|
||||
if (fileName.equals(applicationId + ".har")) {
|
||||
Path p = new Path("har:///"
|
||||
+ thisNodeFile.getPath().toUri().getRawPath());
|
||||
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
|
||||
continue;
|
||||
}
|
||||
if (fileName.contains(LogAggregationUtils.getNodeString(nodeId))
|
||||
&& !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
AggregatedLogFormat.LogReader reader = null;
|
||||
PrintStream out = createPrintStream(localDir, fileName, containerId);
|
||||
try {
|
||||
reader = new AggregatedLogFormat.LogReader(getConf(),
|
||||
thisNodeFile.getPath());
|
||||
if (getContainerLogsStream(containerId, reader) == null) {
|
||||
continue;
|
||||
}
|
||||
String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
|
||||
containerId, thisNodeFile.getPath().getName());
|
||||
out.println(containerString);
|
||||
out.println("LogAggregationType: AGGREGATED");
|
||||
out.println(StringUtils.repeat("=", containerString.length()));
|
||||
// We have to re-create reader object to reset the stream index
|
||||
// after calling getContainerLogsStream which would move the stream
|
||||
// index to the end of the log file.
|
||||
reader =
|
||||
new AggregatedLogFormat.LogReader(getConf(),
|
||||
thisNodeFile.getPath());
|
||||
if (logType == null || logType.isEmpty()) {
|
||||
if (dumpAContainerLogs(containerId, reader, out,
|
||||
thisNodeFile.getModificationTime(), options.getBytes()) > -1) {
|
||||
foundContainerLogs = true;
|
||||
}
|
||||
} else {
|
||||
if (dumpAContainerLogsForALogType(containerId, reader, out,
|
||||
thisNodeFile.getModificationTime(), logType,
|
||||
options.getBytes()) > -1) {
|
||||
foundContainerLogs = true;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
closePrintStream(out);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!foundContainerLogs) {
|
||||
boolean foundAnyLogs = this.getFileController(options.getAppId(),
|
||||
options.getAppOwner()).readAggregatedLogs(options, null);
|
||||
if (!foundAnyLogs) {
|
||||
if (outputFailure) {
|
||||
containerLogNotFound(containerId);
|
||||
containerLogNotFound(options.getContainerId());
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
@ -204,217 +142,25 @@ public class LogCLIHelpers implements Configurable {
|
|||
@Private
|
||||
public int dumpAContainerLogsForLogTypeWithoutNodeId(
|
||||
ContainerLogsRequest options) throws IOException {
|
||||
ApplicationId applicationId = options.getAppId();
|
||||
String jobOwner = options.getAppOwner();
|
||||
String containerId = options.getContainerId();
|
||||
String localDir = options.getOutputLocalDir();
|
||||
List<String> logType = new ArrayList<String>(options.getLogTypes());
|
||||
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
|
||||
applicationId, jobOwner);
|
||||
if (nodeFiles == null) {
|
||||
return -1;
|
||||
}
|
||||
boolean foundContainerLogs = false;
|
||||
while(nodeFiles.hasNext()) {
|
||||
FileStatus thisNodeFile = nodeFiles.next();
|
||||
if (!thisNodeFile.getPath().getName().endsWith(
|
||||
LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
AggregatedLogFormat.LogReader reader = null;
|
||||
PrintStream out = System.out;
|
||||
try {
|
||||
reader =
|
||||
new AggregatedLogFormat.LogReader(getConf(),
|
||||
thisNodeFile.getPath());
|
||||
if (getContainerLogsStream(containerId, reader) == null) {
|
||||
continue;
|
||||
}
|
||||
// We have to re-create reader object to reset the stream index
|
||||
// after calling getContainerLogsStream which would move the stream
|
||||
// index to the end of the log file.
|
||||
reader =
|
||||
new AggregatedLogFormat.LogReader(getConf(),
|
||||
thisNodeFile.getPath());
|
||||
out = createPrintStream(localDir, thisNodeFile.getPath().getName(),
|
||||
containerId);
|
||||
String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
|
||||
containerId, thisNodeFile.getPath().getName());
|
||||
out.println(containerString);
|
||||
out.println("LogAggregationType: AGGREGATED");
|
||||
out.println(StringUtils.repeat("=", containerString.length()));
|
||||
if (logType == null || logType.isEmpty()) {
|
||||
if (dumpAContainerLogs(containerId, reader, out,
|
||||
thisNodeFile.getModificationTime(), options.getBytes()) > -1) {
|
||||
foundContainerLogs = true;
|
||||
}
|
||||
} else {
|
||||
if (dumpAContainerLogsForALogType(containerId, reader, out,
|
||||
thisNodeFile.getModificationTime(), logType,
|
||||
options.getBytes()) > -1) {
|
||||
foundContainerLogs = true;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
closePrintStream(out);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!foundContainerLogs) {
|
||||
containerLogNotFound(containerId);
|
||||
boolean foundAnyLogs = getFileController(options.getAppId(),
|
||||
options.getAppOwner()).readAggregatedLogs(
|
||||
options, null);
|
||||
if (!foundAnyLogs) {
|
||||
containerLogNotFound(options.getContainerId());
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Private
|
||||
public int dumpAContainerLogs(String containerIdStr,
|
||||
AggregatedLogFormat.LogReader reader, PrintStream out,
|
||||
long logUploadedTime, long bytes) throws IOException {
|
||||
DataInputStream valueStream = getContainerLogsStream(
|
||||
containerIdStr, reader);
|
||||
|
||||
if (valueStream == null) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
boolean foundContainerLogs = false;
|
||||
while (true) {
|
||||
try {
|
||||
LogReader.readAContainerLogsForALogType(valueStream, out,
|
||||
logUploadedTime, bytes);
|
||||
foundContainerLogs = true;
|
||||
} catch (EOFException eof) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (foundContainerLogs) {
|
||||
return 0;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
private DataInputStream getContainerLogsStream(String containerIdStr,
|
||||
AggregatedLogFormat.LogReader reader) throws IOException {
|
||||
DataInputStream valueStream;
|
||||
LogKey key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
|
||||
while (valueStream != null && !key.toString().equals(containerIdStr)) {
|
||||
// Next container
|
||||
key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
}
|
||||
return valueStream;
|
||||
}
|
||||
|
||||
@Private
|
||||
public int dumpAContainerLogsForALogType(String containerIdStr,
|
||||
AggregatedLogFormat.LogReader reader, PrintStream out,
|
||||
long logUploadedTime, List<String> logType, long bytes)
|
||||
throws IOException {
|
||||
DataInputStream valueStream = getContainerLogsStream(
|
||||
containerIdStr, reader);
|
||||
if (valueStream == null) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
boolean foundContainerLogs = false;
|
||||
while (true) {
|
||||
try {
|
||||
int result = LogReader.readContainerLogsForALogType(
|
||||
valueStream, out, logUploadedTime, logType, bytes);
|
||||
if (result == 0) {
|
||||
foundContainerLogs = true;
|
||||
}
|
||||
} catch (EOFException eof) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (foundContainerLogs) {
|
||||
return 0;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Private
|
||||
public int dumpAllContainersLogs(ContainerLogsRequest options)
|
||||
throws IOException {
|
||||
ApplicationId appId = options.getAppId();
|
||||
String appOwner = options.getAppOwner();
|
||||
String localDir = options.getOutputLocalDir();
|
||||
List<String> logTypes = new ArrayList<String>(options.getLogTypes());
|
||||
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
|
||||
appId, appOwner);
|
||||
if (nodeFiles == null) {
|
||||
return -1;
|
||||
}
|
||||
boolean foundAnyLogs = false;
|
||||
while (nodeFiles.hasNext()) {
|
||||
FileStatus thisNodeFile = nodeFiles.next();
|
||||
if (thisNodeFile.getPath().getName().equals(appId + ".har")) {
|
||||
Path p = new Path("har:///"
|
||||
+ thisNodeFile.getPath().toUri().getRawPath());
|
||||
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
|
||||
continue;
|
||||
}
|
||||
if (!thisNodeFile.getPath().getName()
|
||||
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
AggregatedLogFormat.LogReader reader =
|
||||
new AggregatedLogFormat.LogReader(getConf(),
|
||||
thisNodeFile.getPath());
|
||||
try {
|
||||
|
||||
DataInputStream valueStream;
|
||||
LogKey key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
|
||||
while (valueStream != null) {
|
||||
PrintStream out = createPrintStream(localDir,
|
||||
thisNodeFile.getPath().getName(), key.toString());
|
||||
try {
|
||||
String containerString = String.format(
|
||||
CONTAINER_ON_NODE_PATTERN, key,
|
||||
thisNodeFile.getPath().getName());
|
||||
out.println(containerString);
|
||||
out.println("LogAggregationType: AGGREGATED");
|
||||
out.println(StringUtils.repeat("=", containerString.length()));
|
||||
while (true) {
|
||||
try {
|
||||
if (logTypes == null || logTypes.isEmpty()) {
|
||||
LogReader.readAContainerLogsForALogType(valueStream, out,
|
||||
thisNodeFile.getModificationTime(),
|
||||
options.getBytes());
|
||||
foundAnyLogs = true;
|
||||
} else {
|
||||
int result = LogReader.readContainerLogsForALogType(
|
||||
valueStream, out, thisNodeFile.getModificationTime(),
|
||||
logTypes, options.getBytes());
|
||||
if (result == 0) {
|
||||
foundAnyLogs = true;
|
||||
}
|
||||
}
|
||||
} catch (EOFException eof) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
closePrintStream(out);
|
||||
}
|
||||
|
||||
// Next container
|
||||
key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
}
|
||||
} finally {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
boolean foundAnyLogs = getFileController(options.getAppId(),
|
||||
options.getAppOwner()).readAggregatedLogs(
|
||||
options, null);
|
||||
if (!foundAnyLogs) {
|
||||
emptyLogDir(LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner)
|
||||
emptyLogDir(LogAggregationUtils.getRemoteAppLogDir(
|
||||
conf, options.getAppId(), options.getAppOwner())
|
||||
.toString());
|
||||
return -1;
|
||||
}
|
||||
|
@ -425,14 +171,13 @@ public class LogCLIHelpers implements Configurable {
|
|||
public int printAContainerLogMetadata(ContainerLogsRequest options,
|
||||
PrintStream out, PrintStream err)
|
||||
throws IOException {
|
||||
ApplicationId appId = options.getAppId();
|
||||
String appOwner = options.getAppOwner();
|
||||
String nodeId = options.getNodeId();
|
||||
String containerIdStr = options.getContainerId();
|
||||
List<ContainerLogMeta> containersLogMeta;
|
||||
try {
|
||||
containersLogMeta = LogToolUtils.getContainerLogMetaFromRemoteFS(
|
||||
conf, appId, containerIdStr, nodeId, appOwner);
|
||||
containersLogMeta = getFileController(options.getAppId(),
|
||||
options.getAppOwner()).readAggregatedLogsMeta(
|
||||
options);
|
||||
} catch (Exception ex) {
|
||||
err.println(ex.getMessage());
|
||||
return -1;
|
||||
|
@ -473,8 +218,26 @@ public class LogCLIHelpers implements Configurable {
|
|||
PrintStream out, PrintStream err) throws IOException {
|
||||
ApplicationId appId = options.getAppId();
|
||||
String appOwner = options.getAppOwner();
|
||||
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
|
||||
appId, appOwner);
|
||||
LogAggregationFileController fileFormat = null;
|
||||
try {
|
||||
fileFormat = getFileController(appId, appOwner);
|
||||
} catch (Exception ex) {
|
||||
err.println(ex.getMessage());
|
||||
return;
|
||||
}
|
||||
RemoteIterator<FileStatus> nodeFiles = null;
|
||||
try {
|
||||
nodeFiles = LogAggregationUtils.getRemoteNodeFileDir(conf, appId,
|
||||
appOwner, fileFormat.getRemoteRootLogDir(),
|
||||
fileFormat.getRemoteRootLogDirSuffix());
|
||||
} catch (FileNotFoundException fnf) {
|
||||
logDirNotExist(LogAggregationUtils.getRemoteAppLogDir(
|
||||
conf, appId, appOwner).toString());
|
||||
} catch (AccessControlException | AccessDeniedException ace) {
|
||||
logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir(
|
||||
conf, appId, appOwner).toString(), appOwner,
|
||||
ace.getMessage());
|
||||
}
|
||||
if (nodeFiles == null) {
|
||||
return;
|
||||
}
|
||||
|
@ -497,44 +260,21 @@ public class LogCLIHelpers implements Configurable {
|
|||
public void printContainersList(ContainerLogsRequest options,
|
||||
PrintStream out, PrintStream err) throws IOException {
|
||||
ApplicationId appId = options.getAppId();
|
||||
String appOwner = options.getAppOwner();
|
||||
String nodeId = options.getNodeId();
|
||||
String nodeIdStr = (nodeId == null) ? null
|
||||
: LogAggregationUtils.getNodeString(nodeId);
|
||||
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
|
||||
appId, appOwner);
|
||||
if (nodeFiles == null) {
|
||||
return;
|
||||
}
|
||||
boolean foundAnyLogs = false;
|
||||
while (nodeFiles.hasNext()) {
|
||||
FileStatus thisNodeFile = nodeFiles.next();
|
||||
if (nodeIdStr != null) {
|
||||
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (!thisNodeFile.getPath().getName()
|
||||
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
AggregatedLogFormat.LogReader reader =
|
||||
new AggregatedLogFormat.LogReader(getConf(),
|
||||
thisNodeFile.getPath());
|
||||
try {
|
||||
DataInputStream valueStream;
|
||||
LogKey key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
while (valueStream != null) {
|
||||
out.println(String.format(CONTAINER_ON_NODE_PATTERN, key,
|
||||
thisNodeFile.getPath().getName()));
|
||||
foundAnyLogs = true;
|
||||
// Next container
|
||||
key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
}
|
||||
} finally {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
|
||||
try {
|
||||
containersLogMeta = getFileController(options.getAppId(),
|
||||
options.getAppOwner()).readAggregatedLogsMeta(
|
||||
options);
|
||||
} catch (Exception ex) {
|
||||
err.println(ex.getMessage());
|
||||
}
|
||||
for(ContainerLogMeta logMeta : containersLogMeta) {
|
||||
out.println(String.format(CONTAINER_ON_NODE_PATTERN,
|
||||
logMeta.getContainerId(),
|
||||
logMeta.getNodeId()));
|
||||
foundAnyLogs = true;
|
||||
}
|
||||
if (!foundAnyLogs) {
|
||||
if (nodeId != null) {
|
||||
|
@ -547,26 +287,6 @@ public class LogCLIHelpers implements Configurable {
|
|||
}
|
||||
}
|
||||
|
||||
private RemoteIterator<FileStatus> getRemoteNodeFileDir(ApplicationId appId,
|
||||
String appOwner) throws IOException {
|
||||
RemoteIterator<FileStatus> nodeFiles = null;
|
||||
try {
|
||||
nodeFiles = LogAggregationUtils.getRemoteNodeFileDir(
|
||||
conf, appId, appOwner);
|
||||
} catch (FileNotFoundException fnf) {
|
||||
logDirNotExist(LogAggregationUtils.getRemoteAppLogDir(
|
||||
conf, appId, appOwner).toString());
|
||||
} catch (AccessControlException | AccessDeniedException ace) {
|
||||
logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir(
|
||||
conf, appId, appOwner).toString(), appOwner,
|
||||
ace.getMessage());
|
||||
} catch (IOException ioe) {
|
||||
logDirIOError(LogAggregationUtils.getRemoteAppLogDir(
|
||||
conf, appId, appOwner).toString(), ioe.getMessage());
|
||||
}
|
||||
return nodeFiles;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
|
@ -600,11 +320,6 @@ public class LogCLIHelpers implements Configurable {
|
|||
+ ". Error message found: " + errorMessage);
|
||||
}
|
||||
|
||||
private static void logDirIOError(String remoteAppLogDir, String errMsg) {
|
||||
System.err.println("Cannot access to " + remoteAppLogDir +
|
||||
". Error message found: " + errMsg);
|
||||
}
|
||||
|
||||
@Private
|
||||
public PrintStream createPrintStream(String localDir, String nodeId,
|
||||
String containerId) throws IOException {
|
||||
|
@ -628,59 +343,29 @@ public class LogCLIHelpers implements Configurable {
|
|||
@Private
|
||||
public Set<String> listContainerLogs(ContainerLogsRequest options)
|
||||
throws IOException {
|
||||
List<ContainerLogMeta> containersLogMeta;
|
||||
Set<String> logTypes = new HashSet<String>();
|
||||
ApplicationId appId = options.getAppId();
|
||||
String appOwner = options.getAppOwner();
|
||||
String nodeId = options.getNodeId();
|
||||
String containerIdStr = options.getContainerId();
|
||||
boolean getAllContainers = (containerIdStr == null);
|
||||
String nodeIdStr = (nodeId == null) ? null
|
||||
: LogAggregationUtils.getNodeString(nodeId);
|
||||
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
|
||||
appId, appOwner);
|
||||
if (nodeFiles == null) {
|
||||
try {
|
||||
containersLogMeta = getFileController(options.getAppId(),
|
||||
options.getAppOwner()).readAggregatedLogsMeta(
|
||||
options);
|
||||
} catch (Exception ex) {
|
||||
System.err.println(ex.getMessage());
|
||||
return logTypes;
|
||||
}
|
||||
while (nodeFiles.hasNext()) {
|
||||
FileStatus thisNodeFile = nodeFiles.next();
|
||||
if (nodeIdStr != null) {
|
||||
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (!thisNodeFile.getPath().getName()
|
||||
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
AggregatedLogFormat.LogReader reader =
|
||||
new AggregatedLogFormat.LogReader(getConf(),
|
||||
thisNodeFile.getPath());
|
||||
try {
|
||||
DataInputStream valueStream;
|
||||
LogKey key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
while (valueStream != null) {
|
||||
if (getAllContainers || (key.toString().equals(containerIdStr))) {
|
||||
while (true) {
|
||||
try {
|
||||
String logFile = LogReader.readContainerMetaDataAndSkipData(
|
||||
valueStream).getFirst();
|
||||
logTypes.add(logFile);
|
||||
} catch (EOFException eof) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!getAllContainers) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Next container
|
||||
key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
}
|
||||
} finally {
|
||||
reader.close();
|
||||
}
|
||||
for (ContainerLogMeta logMeta: containersLogMeta) {
|
||||
for (PerContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) {
|
||||
logTypes.add(fileInfo.getFileName());
|
||||
}
|
||||
}
|
||||
return logTypes;
|
||||
}
|
||||
|
||||
private LogAggregationFileController getFileController(ApplicationId appId,
|
||||
String appOwner) throws IOException {
|
||||
if (factory == null) {
|
||||
factory = new LogAggregationFileControllerFactory(conf);
|
||||
}
|
||||
return factory.getFileControllerForRead(appId, appOwner);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.logaggregation;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
@ -27,19 +25,7 @@ import java.nio.channels.Channels;
|
|||
import java.nio.channels.FileChannel;
|
||||
import java.nio.channels.WritableByteChannel;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.math3.util.Pair;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.HarFs;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
|
||||
/**
|
||||
* This class contains several utility function which could be used in different
|
||||
|
@ -53,81 +39,6 @@ public final class LogToolUtils {
|
|||
public static final String CONTAINER_ON_NODE_PATTERN =
|
||||
"Container: %s on %s";
|
||||
|
||||
/**
|
||||
* Return a list of {@link ContainerLogMeta} for a container
|
||||
* from Remote FileSystem.
|
||||
*
|
||||
* @param conf the configuration
|
||||
* @param appId the applicationId
|
||||
* @param containerIdStr the containerId
|
||||
* @param nodeId the nodeId
|
||||
* @param appOwner the application owner
|
||||
* @return a list of {@link ContainerLogMeta}
|
||||
* @throws IOException if there is no available log file
|
||||
*/
|
||||
public static List<ContainerLogMeta> getContainerLogMetaFromRemoteFS(
|
||||
Configuration conf, ApplicationId appId, String containerIdStr,
|
||||
String nodeId, String appOwner) throws IOException {
|
||||
List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
|
||||
boolean getAllContainers = (containerIdStr == null);
|
||||
String nodeIdStr = (nodeId == null) ? null
|
||||
: LogAggregationUtils.getNodeString(nodeId);
|
||||
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
|
||||
.getRemoteNodeFileDir(conf, appId, appOwner);
|
||||
if (nodeFiles == null) {
|
||||
throw new IOException("There is no available log fils for "
|
||||
+ "application:" + appId);
|
||||
}
|
||||
while (nodeFiles.hasNext()) {
|
||||
FileStatus thisNodeFile = nodeFiles.next();
|
||||
if (nodeIdStr != null) {
|
||||
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (!thisNodeFile.getPath().getName()
|
||||
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
AggregatedLogFormat.LogReader reader =
|
||||
new AggregatedLogFormat.LogReader(conf,
|
||||
thisNodeFile.getPath());
|
||||
try {
|
||||
DataInputStream valueStream;
|
||||
LogKey key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
while (valueStream != null) {
|
||||
if (getAllContainers || (key.toString().equals(containerIdStr))) {
|
||||
ContainerLogMeta containerLogMeta = new ContainerLogMeta(
|
||||
key.toString(), thisNodeFile.getPath().getName());
|
||||
while (true) {
|
||||
try {
|
||||
Pair<String, String> logMeta =
|
||||
LogReader.readContainerMetaDataAndSkipData(
|
||||
valueStream);
|
||||
containerLogMeta.addLogMeta(
|
||||
logMeta.getFirst(),
|
||||
logMeta.getSecond(),
|
||||
Times.format(thisNodeFile.getModificationTime()));
|
||||
} catch (EOFException eof) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
containersLogMeta.add(containerLogMeta);
|
||||
if (!getAllContainers) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Next container
|
||||
key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
}
|
||||
} finally {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
return containersLogMeta;
|
||||
}
|
||||
|
||||
/**
|
||||
* Output container log.
|
||||
* @param containerId the containerId
|
||||
|
@ -247,82 +158,4 @@ public final class LogToolUtils {
|
|||
}
|
||||
}
|
||||
|
||||
public static boolean outputAggregatedContainerLog(Configuration conf,
|
||||
ApplicationId appId, String appOwner,
|
||||
String containerId, String nodeId,
|
||||
String logFileName, long outputSize, OutputStream os,
|
||||
byte[] buf) throws IOException {
|
||||
boolean findLogs = false;
|
||||
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
|
||||
.getRemoteNodeFileDir(conf, appId, appOwner);
|
||||
while (nodeFiles != null && nodeFiles.hasNext()) {
|
||||
final FileStatus thisNodeFile = nodeFiles.next();
|
||||
String nodeName = thisNodeFile.getPath().getName();
|
||||
if (nodeName.equals(appId + ".har")) {
|
||||
Path p = new Path("har:///"
|
||||
+ thisNodeFile.getPath().toUri().getRawPath());
|
||||
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
|
||||
continue;
|
||||
}
|
||||
if ((nodeId == null || nodeName.contains(LogAggregationUtils
|
||||
.getNodeString(nodeId))) && !nodeName.endsWith(
|
||||
LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
AggregatedLogFormat.LogReader reader = null;
|
||||
try {
|
||||
reader = new AggregatedLogFormat.LogReader(conf,
|
||||
thisNodeFile.getPath());
|
||||
DataInputStream valueStream;
|
||||
LogKey key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
while (valueStream != null && !key.toString()
|
||||
.equals(containerId)) {
|
||||
// Next container
|
||||
key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
}
|
||||
if (valueStream == null) {
|
||||
continue;
|
||||
}
|
||||
while (true) {
|
||||
try {
|
||||
String fileType = valueStream.readUTF();
|
||||
String fileLengthStr = valueStream.readUTF();
|
||||
long fileLength = Long.parseLong(fileLengthStr);
|
||||
if (fileType.equalsIgnoreCase(logFileName)) {
|
||||
LogToolUtils.outputContainerLog(containerId,
|
||||
nodeId, fileType, fileLength, outputSize,
|
||||
Times.format(thisNodeFile.getModificationTime()),
|
||||
valueStream, os, buf,
|
||||
ContainerLogAggregationType.AGGREGATED);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
String endOfFile = "End of LogType:" + fileType;
|
||||
sb.append("\n" + endOfFile + "\n");
|
||||
sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
|
||||
+ "\n\n");
|
||||
byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
|
||||
os.write(b, 0, b.length);
|
||||
findLogs = true;
|
||||
} else {
|
||||
long totalSkipped = 0;
|
||||
long currSkipped = 0;
|
||||
while (currSkipped != -1 && totalSkipped < fileLength) {
|
||||
currSkipped = valueStream.skip(
|
||||
fileLength - totalSkipped);
|
||||
totalSkipped += currSkipped;
|
||||
}
|
||||
}
|
||||
} catch (EOFException eof) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
os.flush();
|
||||
return findLogs;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,10 @@ import com.google.common.collect.Iterables;
|
|||
import com.google.common.collect.Sets;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -31,7 +35,9 @@ import java.util.Collections;
|
|||
import java.util.Comparator;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
@ -42,13 +48,18 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.webapp.View.ViewContext;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
|
||||
|
||||
/**
|
||||
* Base class to implement Log Aggregation File Controller.
|
||||
|
@ -167,6 +178,74 @@ public abstract class LogAggregationFileController {
|
|||
public abstract void postWrite(LogAggregationFileControllerContext record)
|
||||
throws Exception;
|
||||
|
||||
protected PrintStream createPrintStream(String localDir, String nodeId,
|
||||
String containerId) throws IOException {
|
||||
PrintStream out = System.out;
|
||||
if(localDir != null && !localDir.isEmpty()) {
|
||||
Path nodePath = new Path(localDir, LogAggregationUtils
|
||||
.getNodeString(nodeId));
|
||||
Files.createDirectories(Paths.get(nodePath.toString()));
|
||||
Path containerLogPath = new Path(nodePath, containerId);
|
||||
out = new PrintStream(containerLogPath.toString(), "UTF-8");
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
protected void closePrintStream(OutputStream out) {
|
||||
if (out != System.out) {
|
||||
IOUtils.closeQuietly(out);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Output container log.
|
||||
* @param logRequest {@link ContainerLogsRequest}
|
||||
* @param os the output stream
|
||||
* @throws IOException if we can not access the log file.
|
||||
*/
|
||||
public abstract boolean readAggregatedLogs(ContainerLogsRequest logRequest,
|
||||
OutputStream os) throws IOException;
|
||||
|
||||
/**
|
||||
* Return a list of {@link ContainerLogMeta} for an application
|
||||
* from Remote FileSystem.
|
||||
*
|
||||
* @param logRequest {@link ContainerLogsRequest}
|
||||
* @return a list of {@link ContainerLogMeta}
|
||||
* @throws IOException if there is no available log file
|
||||
*/
|
||||
public abstract List<ContainerLogMeta> readAggregatedLogsMeta(
|
||||
ContainerLogsRequest logRequest) throws IOException;
|
||||
|
||||
/**
|
||||
* Render Aggregated Logs block.
|
||||
* @param html the html
|
||||
* @param context the ViewContext
|
||||
*/
|
||||
public abstract void renderAggregatedLogsBlock(Block html,
|
||||
ViewContext context);
|
||||
|
||||
/**
|
||||
* Returns the owner of the application.
|
||||
*
|
||||
* @param the aggregatedLog path.
|
||||
* @return the application owner.
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract String getApplicationOwner(Path aggregatedLogPath)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Returns ACLs for the application. An empty map is returned if no ACLs are
|
||||
* found.
|
||||
*
|
||||
* @param the aggregatedLog path.
|
||||
* @return a map of the Application ACLs.
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract Map<ApplicationAccessType, String> getApplicationAcls(
|
||||
Path aggregatedLogPath) throws IOException;
|
||||
|
||||
/**
|
||||
* Verify and create the remote log directory.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,186 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation.filecontroller;
|
||||
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
|
||||
|
||||
/**
|
||||
* Base class to implement Aggregated Logs Block.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
|
||||
public abstract class LogAggregationHtmlBlock extends HtmlBlock {
|
||||
|
||||
@Inject
|
||||
public LogAggregationHtmlBlock(ViewContext ctx) {
|
||||
super(ctx);
|
||||
}
|
||||
|
||||
protected BlockParameters verifyAndParseParameters(Block html) {
|
||||
BlockParameters params = new BlockParameters();
|
||||
ContainerId containerId = LogAggregationWebUtils
|
||||
.verifyAndGetContainerId(html, $(CONTAINER_ID));
|
||||
params.setContainerId(containerId);
|
||||
|
||||
NodeId nodeId = LogAggregationWebUtils
|
||||
.verifyAndGetNodeId(html, $(NM_NODENAME));
|
||||
params.setNodeId(nodeId);
|
||||
|
||||
String appOwner = LogAggregationWebUtils
|
||||
.verifyAndGetAppOwner(html, $(APP_OWNER));
|
||||
params.setAppOwner(appOwner);
|
||||
|
||||
boolean isValid = true;
|
||||
long start = -4096;
|
||||
try {
|
||||
start = LogAggregationWebUtils.getLogStartIndex(
|
||||
html, $("start"));
|
||||
} catch (NumberFormatException ne) {
|
||||
html.h1().__("Invalid log start value: " + $("start")).__();
|
||||
isValid = false;
|
||||
}
|
||||
params.setStartIndex(start);
|
||||
|
||||
long end = Long.MAX_VALUE;
|
||||
try {
|
||||
end = LogAggregationWebUtils.getLogEndIndex(
|
||||
html, $("end"));
|
||||
} catch (NumberFormatException ne) {
|
||||
html.h1().__("Invalid log start value: " + $("end")).__();
|
||||
isValid = false;
|
||||
}
|
||||
params.setEndIndex(end);
|
||||
|
||||
if (containerId == null || nodeId == null || appOwner == null
|
||||
|| appOwner.isEmpty() || !isValid) {
|
||||
return null;
|
||||
}
|
||||
|
||||
ApplicationId appId = containerId.getApplicationAttemptId()
|
||||
.getApplicationId();
|
||||
params.setAppId(appId);
|
||||
|
||||
String logEntity = $(ENTITY_STRING);
|
||||
if (logEntity == null || logEntity.isEmpty()) {
|
||||
logEntity = containerId.toString();
|
||||
}
|
||||
params.setLogEntity(logEntity);
|
||||
|
||||
return params;
|
||||
}
|
||||
|
||||
protected boolean checkAcls(Configuration conf, ApplicationId appId,
|
||||
String owner, Map<ApplicationAccessType, String> appAcls,
|
||||
String remoteUser) {
|
||||
ApplicationACLsManager aclsManager = new ApplicationACLsManager(
|
||||
conf);
|
||||
aclsManager.addApplication(appId, appAcls);
|
||||
|
||||
UserGroupInformation callerUGI = null;
|
||||
if (remoteUser != null) {
|
||||
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
|
||||
}
|
||||
if (callerUGI != null && !aclsManager.checkAccess(callerUGI,
|
||||
ApplicationAccessType.VIEW_APP, owner, appId)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
protected static class BlockParameters {
|
||||
private ApplicationId appId;
|
||||
private ContainerId containerId;
|
||||
private NodeId nodeId;
|
||||
private String appOwner;
|
||||
private long start;
|
||||
private long end;
|
||||
private String logEntity;
|
||||
|
||||
public ApplicationId getAppId() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
public void setAppId(ApplicationId appId) {
|
||||
this.appId = appId;
|
||||
}
|
||||
|
||||
public ContainerId getContainerId() {
|
||||
return containerId;
|
||||
}
|
||||
|
||||
public void setContainerId(ContainerId containerId) {
|
||||
this.containerId = containerId;
|
||||
}
|
||||
|
||||
public NodeId getNodeId() {
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
public void setNodeId(NodeId nodeId) {
|
||||
this.nodeId = nodeId;
|
||||
}
|
||||
|
||||
public String getAppOwner() {
|
||||
return appOwner;
|
||||
}
|
||||
|
||||
public void setAppOwner(String appOwner) {
|
||||
this.appOwner = appOwner;
|
||||
}
|
||||
|
||||
public long getStartIndex() {
|
||||
return start;
|
||||
}
|
||||
|
||||
public void setStartIndex(long startIndex) {
|
||||
this.start = startIndex;
|
||||
}
|
||||
|
||||
public long getEndIndex() {
|
||||
return end;
|
||||
}
|
||||
|
||||
public void setEndIndex(long endIndex) {
|
||||
this.end = endIndex;
|
||||
}
|
||||
|
||||
public String getLogEntity() {
|
||||
return logEntity;
|
||||
}
|
||||
|
||||
public void setLogEntity(String logEntity) {
|
||||
this.logEntity = logEntity;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,127 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation.filecontroller;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
|
||||
/**
|
||||
* The TFile log aggregation file Controller implementation.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public class LogAggregationTFileController
|
||||
extends LogAggregationFileController {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(
|
||||
LogAggregationTFileController.class);
|
||||
|
||||
private LogWriter writer;
|
||||
|
||||
public LogAggregationTFileController(){}
|
||||
|
||||
@Override
|
||||
public void initInternal(Configuration conf) {
|
||||
this.remoteRootLogDir = new Path(
|
||||
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
||||
this.remoteRootLogDirSuffix =
|
||||
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
|
||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initializeWriter(LogAggregationFileControllerContext context)
|
||||
throws IOException {
|
||||
this.writer = new LogWriter();
|
||||
writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(),
|
||||
context.getUserUgi());
|
||||
// Write ACLs once when the writer is created.
|
||||
writer.writeApplicationACLs(context.getAppAcls());
|
||||
writer.writeApplicationOwner(context.getUserUgi().getShortUserName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeWriter() {
|
||||
this.writer.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(LogKey logKey, LogValue logValue) throws IOException {
|
||||
this.writer.append(logKey, logValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postWrite(final LogAggregationFileControllerContext record)
|
||||
throws Exception {
|
||||
// Before upload logs, make sure the number of existing logs
|
||||
// is smaller than the configured NM log aggregation retention size.
|
||||
if (record.isUploadedLogsInThisCycle() &&
|
||||
record.isLogAggregationInRolling()) {
|
||||
cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(),
|
||||
record.getUserUgi());
|
||||
record.increcleanupOldLogTimes();
|
||||
}
|
||||
|
||||
final Path renamedPath = record.getRollingMonitorInterval() <= 0
|
||||
? record.getRemoteNodeLogFileForApp() : new Path(
|
||||
record.getRemoteNodeLogFileForApp().getParent(),
|
||||
record.getRemoteNodeLogFileForApp().getName() + "_"
|
||||
+ record.getLogUploadTimeStamp());
|
||||
final boolean rename = record.isUploadedLogsInThisCycle();
|
||||
try {
|
||||
record.getUserUgi().doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
FileSystem remoteFS = record.getRemoteNodeLogFileForApp()
|
||||
.getFileSystem(conf);
|
||||
if (rename) {
|
||||
remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(),
|
||||
renamedPath);
|
||||
} else {
|
||||
remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
LOG.error(
|
||||
"Failed to move temporary log file to final location: ["
|
||||
+ record.getRemoteNodeTmpLogFileForApp() + "] to ["
|
||||
+ renamedPath + "]", e);
|
||||
throw new Exception("Log uploaded failed for Application: "
|
||||
+ record.getAppId() + " in NodeManager: "
|
||||
+ LogAggregationUtils.getNodeString(record.getNodeId()) + " at "
|
||||
+ Times.format(record.getLogUploadTimeStamp()) + "\n");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,375 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.math3.util.Pair;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.HarFs;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.apache.hadoop.yarn.webapp.View.ViewContext;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
|
||||
|
||||
/**
|
||||
* The TFile log aggregation file Controller implementation.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public class LogAggregationTFileController
|
||||
extends LogAggregationFileController {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(
|
||||
LogAggregationTFileController.class);
|
||||
|
||||
private LogWriter writer;
|
||||
private TFileLogReader tfReader = null;
|
||||
|
||||
public LogAggregationTFileController(){}
|
||||
|
||||
@Override
|
||||
public void initInternal(Configuration conf) {
|
||||
this.remoteRootLogDir = new Path(
|
||||
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
||||
this.remoteRootLogDirSuffix =
|
||||
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
|
||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initializeWriter(LogAggregationFileControllerContext context)
|
||||
throws IOException {
|
||||
this.writer = new LogWriter();
|
||||
writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(),
|
||||
context.getUserUgi());
|
||||
// Write ACLs once when the writer is created.
|
||||
writer.writeApplicationACLs(context.getAppAcls());
|
||||
writer.writeApplicationOwner(context.getUserUgi().getShortUserName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeWriter() {
|
||||
this.writer.close();
|
||||
this.writer = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(LogKey logKey, LogValue logValue) throws IOException {
|
||||
this.writer.append(logKey, logValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postWrite(final LogAggregationFileControllerContext record)
|
||||
throws Exception {
|
||||
// Before upload logs, make sure the number of existing logs
|
||||
// is smaller than the configured NM log aggregation retention size.
|
||||
if (record.isUploadedLogsInThisCycle() &&
|
||||
record.isLogAggregationInRolling()) {
|
||||
cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(),
|
||||
record.getUserUgi());
|
||||
record.increcleanupOldLogTimes();
|
||||
}
|
||||
|
||||
final Path renamedPath = record.getRollingMonitorInterval() <= 0
|
||||
? record.getRemoteNodeLogFileForApp() : new Path(
|
||||
record.getRemoteNodeLogFileForApp().getParent(),
|
||||
record.getRemoteNodeLogFileForApp().getName() + "_"
|
||||
+ record.getLogUploadTimeStamp());
|
||||
final boolean rename = record.isUploadedLogsInThisCycle();
|
||||
try {
|
||||
record.getUserUgi().doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
FileSystem remoteFS = record.getRemoteNodeLogFileForApp()
|
||||
.getFileSystem(conf);
|
||||
if (rename) {
|
||||
remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(),
|
||||
renamedPath);
|
||||
} else {
|
||||
remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
LOG.error(
|
||||
"Failed to move temporary log file to final location: ["
|
||||
+ record.getRemoteNodeTmpLogFileForApp() + "] to ["
|
||||
+ renamedPath + "]", e);
|
||||
throw new Exception("Log uploaded failed for Application: "
|
||||
+ record.getAppId() + " in NodeManager: "
|
||||
+ LogAggregationUtils.getNodeString(record.getNodeId()) + " at "
|
||||
+ Times.format(record.getLogUploadTimeStamp()) + "\n");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
|
||||
OutputStream os) throws IOException {
|
||||
boolean findLogs = false;
|
||||
boolean createPrintStream = (os == null);
|
||||
ApplicationId appId = logRequest.getAppId();
|
||||
String nodeId = logRequest.getNodeId();
|
||||
List<String> logTypes = new ArrayList<>();
|
||||
if (logRequest.getLogTypes() != null && !logRequest
|
||||
.getLogTypes().isEmpty()) {
|
||||
logTypes.addAll(logRequest.getLogTypes());
|
||||
}
|
||||
String containerIdStr = logRequest.getContainerId();
|
||||
boolean getAllContainers = (containerIdStr == null
|
||||
|| containerIdStr.isEmpty());
|
||||
long size = logRequest.getBytes();
|
||||
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
|
||||
.getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner());
|
||||
byte[] buf = new byte[65535];
|
||||
while (nodeFiles != null && nodeFiles.hasNext()) {
|
||||
final FileStatus thisNodeFile = nodeFiles.next();
|
||||
LOG.error(thisNodeFile.getPath().toString());
|
||||
String nodeName = thisNodeFile.getPath().getName();
|
||||
if (nodeName.equals(appId + ".har")) {
|
||||
Path p = new Path("har:///"
|
||||
+ thisNodeFile.getPath().toUri().getRawPath());
|
||||
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
|
||||
continue;
|
||||
}
|
||||
if ((nodeId == null || nodeName.contains(LogAggregationUtils
|
||||
.getNodeString(nodeId))) && !nodeName.endsWith(
|
||||
LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
AggregatedLogFormat.LogReader reader = null;
|
||||
try {
|
||||
reader = new AggregatedLogFormat.LogReader(conf,
|
||||
thisNodeFile.getPath());
|
||||
DataInputStream valueStream;
|
||||
LogKey key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
while (valueStream != null) {
|
||||
if (getAllContainers || (key.toString().equals(containerIdStr))) {
|
||||
if (createPrintStream) {
|
||||
os = createPrintStream(
|
||||
logRequest.getOutputLocalDir(),
|
||||
thisNodeFile.getPath().getName(), key.toString());
|
||||
}
|
||||
try {
|
||||
while (true) {
|
||||
try {
|
||||
String fileType = valueStream.readUTF();
|
||||
String fileLengthStr = valueStream.readUTF();
|
||||
long fileLength = Long.parseLong(fileLengthStr);
|
||||
if (logTypes == null || logTypes.isEmpty() ||
|
||||
logTypes.contains(fileType)) {
|
||||
LogToolUtils.outputContainerLog(key.toString(),
|
||||
nodeName, fileType, fileLength, size,
|
||||
Times.format(thisNodeFile.getModificationTime()),
|
||||
valueStream, os, buf,
|
||||
ContainerLogAggregationType.AGGREGATED);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
String endOfFile = "End of LogType:" + fileType;
|
||||
sb.append("\n" + endOfFile + "\n");
|
||||
sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
|
||||
+ "\n\n");
|
||||
byte[] b = sb.toString().getBytes(
|
||||
Charset.forName("UTF-8"));
|
||||
os.write(b, 0, b.length);
|
||||
findLogs = true;
|
||||
} else {
|
||||
long totalSkipped = 0;
|
||||
long currSkipped = 0;
|
||||
while (currSkipped != -1 && totalSkipped < fileLength) {
|
||||
currSkipped = valueStream.skip(
|
||||
fileLength - totalSkipped);
|
||||
totalSkipped += currSkipped;
|
||||
}
|
||||
}
|
||||
} catch (EOFException eof) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
os.flush();
|
||||
if (createPrintStream) {
|
||||
closePrintStream(os);
|
||||
}
|
||||
}
|
||||
if (!getAllContainers) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Next container
|
||||
key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
}
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return findLogs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ContainerLogMeta> readAggregatedLogsMeta(
|
||||
ContainerLogsRequest logRequest) throws IOException {
|
||||
List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
|
||||
String containerIdStr = logRequest.getContainerId();
|
||||
String nodeId = logRequest.getNodeId();
|
||||
ApplicationId appId = logRequest.getAppId();
|
||||
String appOwner = logRequest.getAppOwner();
|
||||
boolean getAllContainers = (containerIdStr == null);
|
||||
String nodeIdStr = (nodeId == null) ? null
|
||||
: LogAggregationUtils.getNodeString(nodeId);
|
||||
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
|
||||
.getRemoteNodeFileDir(conf, appId, appOwner);
|
||||
if (nodeFiles == null) {
|
||||
throw new IOException("There is no available log fils for "
|
||||
+ "application:" + appId);
|
||||
}
|
||||
while (nodeFiles.hasNext()) {
|
||||
FileStatus thisNodeFile = nodeFiles.next();
|
||||
if (nodeIdStr != null) {
|
||||
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (!thisNodeFile.getPath().getName()
|
||||
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
AggregatedLogFormat.LogReader reader =
|
||||
new AggregatedLogFormat.LogReader(conf,
|
||||
thisNodeFile.getPath());
|
||||
try {
|
||||
DataInputStream valueStream;
|
||||
LogKey key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
while (valueStream != null) {
|
||||
if (getAllContainers || (key.toString().equals(containerIdStr))) {
|
||||
ContainerLogMeta containerLogMeta = new ContainerLogMeta(
|
||||
key.toString(), thisNodeFile.getPath().getName());
|
||||
while (true) {
|
||||
try {
|
||||
Pair<String, String> logMeta =
|
||||
LogReader.readContainerMetaDataAndSkipData(
|
||||
valueStream);
|
||||
containerLogMeta.addLogMeta(
|
||||
logMeta.getFirst(),
|
||||
logMeta.getSecond(),
|
||||
Times.format(thisNodeFile.getModificationTime()));
|
||||
} catch (EOFException eof) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
containersLogMeta.add(containerLogMeta);
|
||||
if (!getAllContainers) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Next container
|
||||
key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
}
|
||||
} finally {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
return containersLogMeta;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void renderAggregatedLogsBlock(Block html, ViewContext context) {
|
||||
TFileAggregatedLogsBlock block = new TFileAggregatedLogsBlock(
|
||||
context, conf);
|
||||
block.render(html);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getApplicationOwner(Path aggregatedLog) throws IOException {
|
||||
createTFileLogReader(aggregatedLog);
|
||||
return this.tfReader.getLogReader().getApplicationOwner();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ApplicationAccessType, String> getApplicationAcls(
|
||||
Path aggregatedLog) throws IOException {
|
||||
createTFileLogReader(aggregatedLog);
|
||||
return this.tfReader.getLogReader().getApplicationAcls();
|
||||
}
|
||||
|
||||
private void createTFileLogReader(Path aggregatedLog) throws IOException {
|
||||
if (this.tfReader == null || !this.tfReader.getAggregatedLogPath()
|
||||
.equals(aggregatedLog)) {
|
||||
LogReader logReader = new LogReader(conf, aggregatedLog);
|
||||
this.tfReader = new TFileLogReader(logReader, aggregatedLog);
|
||||
}
|
||||
}
|
||||
|
||||
private static class TFileLogReader {
|
||||
private LogReader logReader;
|
||||
private Path aggregatedLogPath;
|
||||
|
||||
TFileLogReader(LogReader logReader, Path aggregatedLogPath) {
|
||||
this.setLogReader(logReader);
|
||||
this.setAggregatedLogPath(aggregatedLogPath);
|
||||
}
|
||||
public LogReader getLogReader() {
|
||||
return logReader;
|
||||
}
|
||||
public void setLogReader(LogReader logReader) {
|
||||
this.logReader = logReader;
|
||||
}
|
||||
public Path getAggregatedLogPath() {
|
||||
return aggregatedLogPath;
|
||||
}
|
||||
public void setAggregatedLogPath(Path aggregatedLogPath) {
|
||||
this.aggregatedLogPath = aggregatedLogPath;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,241 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
|
||||
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.HarFs;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationHtmlBlock;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE;
|
||||
|
||||
/**
|
||||
* The Aggregated Logs Block implementation for TFile.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
|
||||
public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
|
||||
|
||||
private final Configuration conf;
|
||||
|
||||
@Inject
|
||||
public TFileAggregatedLogsBlock(ViewContext ctx, Configuration conf) {
|
||||
super(ctx);
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void render(Block html) {
|
||||
|
||||
BlockParameters params = verifyAndParseParameters(html);
|
||||
if (params == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
RemoteIterator<FileStatus> nodeFiles;
|
||||
try {
|
||||
nodeFiles = LogAggregationUtils
|
||||
.getRemoteNodeFileDir(conf, params.getAppId(),
|
||||
params.getAppOwner());
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (Exception ex) {
|
||||
html.h1("No logs available for container "
|
||||
+ params.getContainerId().toString());
|
||||
return;
|
||||
}
|
||||
|
||||
NodeId nodeId = params.getNodeId();
|
||||
String logEntity = params.getLogEntity();
|
||||
ApplicationId appId = params.getAppId();
|
||||
ContainerId containerId = params.getContainerId();
|
||||
long start = params.getStartIndex();
|
||||
long end = params.getEndIndex();
|
||||
|
||||
boolean foundLog = false;
|
||||
String desiredLogType = $(CONTAINER_LOG_TYPE);
|
||||
try {
|
||||
while (nodeFiles.hasNext()) {
|
||||
AggregatedLogFormat.LogReader reader = null;
|
||||
try {
|
||||
FileStatus thisNodeFile = nodeFiles.next();
|
||||
if (thisNodeFile.getPath().getName().equals(
|
||||
params.getAppId() + ".har")) {
|
||||
Path p = new Path("har:///"
|
||||
+ thisNodeFile.getPath().toUri().getRawPath());
|
||||
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
|
||||
continue;
|
||||
}
|
||||
if (!thisNodeFile.getPath().getName()
|
||||
.contains(LogAggregationUtils.getNodeString(nodeId))
|
||||
|| thisNodeFile.getPath().getName()
|
||||
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
continue;
|
||||
}
|
||||
long logUploadedTime = thisNodeFile.getModificationTime();
|
||||
reader = new AggregatedLogFormat.LogReader(
|
||||
conf, thisNodeFile.getPath());
|
||||
|
||||
String owner = null;
|
||||
Map<ApplicationAccessType, String> appAcls = null;
|
||||
try {
|
||||
owner = reader.getApplicationOwner();
|
||||
appAcls = reader.getApplicationAcls();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error getting logs for " + logEntity, e);
|
||||
continue;
|
||||
}
|
||||
String remoteUser = request().getRemoteUser();
|
||||
|
||||
if (!checkAcls(conf, appId, owner, appAcls, remoteUser)) {
|
||||
html.h1().__("User [" + remoteUser
|
||||
+ "] is not authorized to view the logs for " + logEntity
|
||||
+ " in log file [" + thisNodeFile.getPath().getName() + "]")
|
||||
.__();
|
||||
LOG.error("User [" + remoteUser
|
||||
+ "] is not authorized to view the logs for " + logEntity);
|
||||
continue;
|
||||
}
|
||||
|
||||
AggregatedLogFormat.ContainerLogsReader logReader = reader
|
||||
.getContainerLogsReader(containerId);
|
||||
if (logReader == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
foundLog = readContainerLogs(html, logReader, start, end,
|
||||
desiredLogType, logUploadedTime);
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Error getting logs for " + logEntity, ex);
|
||||
continue;
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!foundLog) {
|
||||
if (desiredLogType.isEmpty()) {
|
||||
html.h1("No logs available for container "
|
||||
+ containerId.toString());
|
||||
} else {
|
||||
html.h1("Unable to locate '" + desiredLogType
|
||||
+ "' log for container " + containerId.toString());
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
html.h1().__("Error getting logs for " + logEntity).__();
|
||||
LOG.error("Error getting logs for " + logEntity, e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean readContainerLogs(Block html,
|
||||
AggregatedLogFormat.ContainerLogsReader logReader, long startIndex,
|
||||
long endIndex, String desiredLogType, long logUpLoadTime)
|
||||
throws IOException {
|
||||
int bufferSize = 65536;
|
||||
char[] cbuf = new char[bufferSize];
|
||||
|
||||
boolean foundLog = false;
|
||||
String logType = logReader.nextLog();
|
||||
while (logType != null) {
|
||||
if (desiredLogType == null || desiredLogType.isEmpty()
|
||||
|| desiredLogType.equals(logType)) {
|
||||
long logLength = logReader.getCurrentLogLength();
|
||||
if (foundLog) {
|
||||
html.pre().__("\n\n").__();
|
||||
}
|
||||
|
||||
html.p().__("Log Type: " + logType).__();
|
||||
html.p().__("Log Upload Time: " + Times.format(logUpLoadTime)).__();
|
||||
html.p().__("Log Length: " + Long.toString(logLength)).__();
|
||||
|
||||
long start = startIndex < 0
|
||||
? logLength + startIndex : startIndex;
|
||||
start = start < 0 ? 0 : start;
|
||||
start = start > logLength ? logLength : start;
|
||||
long end = endIndex < 0
|
||||
? logLength + endIndex : endIndex;
|
||||
end = end < 0 ? 0 : end;
|
||||
end = end > logLength ? logLength : end;
|
||||
end = end < start ? start : end;
|
||||
|
||||
long toRead = end - start;
|
||||
if (toRead < logLength) {
|
||||
html.p().__("Showing " + toRead + " bytes of " + logLength
|
||||
+ " total. Click ").a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
|
||||
$(ENTITY_STRING), $(APP_OWNER),
|
||||
logType, "?start=0"), "here").
|
||||
__(" for the full log.").__();
|
||||
}
|
||||
|
||||
long totalSkipped = 0;
|
||||
while (totalSkipped < start) {
|
||||
long ret = logReader.skip(start - totalSkipped);
|
||||
if (ret == 0) {
|
||||
//Read one byte
|
||||
int nextByte = logReader.read();
|
||||
// Check if we have reached EOF
|
||||
if (nextByte == -1) {
|
||||
throw new IOException("Premature EOF from container log");
|
||||
}
|
||||
ret = 1;
|
||||
}
|
||||
totalSkipped += ret;
|
||||
}
|
||||
|
||||
int len = 0;
|
||||
int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
|
||||
PRE<Hamlet> pre = html.pre();
|
||||
|
||||
while (toRead > 0
|
||||
&& (len = logReader.read(cbuf, 0, currentToRead)) > 0) {
|
||||
pre.__(new String(cbuf, 0, len));
|
||||
toRead = toRead - len;
|
||||
currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
|
||||
}
|
||||
|
||||
pre.__();
|
||||
foundLog = true;
|
||||
}
|
||||
|
||||
logType = logReader.nextLog();
|
||||
}
|
||||
|
||||
return foundLog;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@ -20,55 +20,59 @@ package org.apache.hadoop.yarn.webapp.log;
|
|||
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.HarFs;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
|
||||
public class AggregatedLogsBlock extends HtmlBlock {
|
||||
|
||||
private final Configuration conf;
|
||||
private final LogAggregationFileControllerFactory factory;
|
||||
|
||||
@Inject
|
||||
AggregatedLogsBlock(Configuration conf) {
|
||||
this.conf = conf;
|
||||
factory = new LogAggregationFileControllerFactory(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void render(Block html) {
|
||||
ContainerId containerId = verifyAndGetContainerId(html);
|
||||
NodeId nodeId = verifyAndGetNodeId(html);
|
||||
String appOwner = verifyAndGetAppOwner(html);
|
||||
LogLimits logLimits = verifyAndGetLogLimits(html);
|
||||
ContainerId containerId = LogAggregationWebUtils
|
||||
.verifyAndGetContainerId(html, $(CONTAINER_ID));
|
||||
NodeId nodeId = LogAggregationWebUtils
|
||||
.verifyAndGetNodeId(html, $(NM_NODENAME));
|
||||
String appOwner = LogAggregationWebUtils
|
||||
.verifyAndGetAppOwner(html, $(APP_OWNER));
|
||||
boolean isValid = true;
|
||||
try {
|
||||
LogAggregationWebUtils.getLogStartIndex(
|
||||
html, $("start"));
|
||||
} catch (NumberFormatException ne) {
|
||||
html.h1().__("Invalid log start value: " + $("start")).__();
|
||||
isValid = false;
|
||||
}
|
||||
try {
|
||||
LogAggregationWebUtils.getLogEndIndex(
|
||||
html, $("end"));
|
||||
} catch (NumberFormatException ne) {
|
||||
html.h1().__("Invalid log start value: " + $("end")).__();
|
||||
isValid = false;
|
||||
}
|
||||
|
||||
if (containerId == null || nodeId == null || appOwner == null
|
||||
|| appOwner.isEmpty() || logLimits == null) {
|
||||
|| appOwner.isEmpty() || !isValid) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -93,21 +97,11 @@ public class AggregatedLogsBlock extends HtmlBlock {
|
|||
return;
|
||||
}
|
||||
|
||||
Path remoteRootLogDir = new Path(conf.get(
|
||||
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
||||
Path remoteAppDir = LogAggregationUtils.getRemoteAppLogDir(
|
||||
remoteRootLogDir, applicationId, appOwner,
|
||||
LogAggregationUtils.getRemoteNodeLogDirSuffix(conf));
|
||||
RemoteIterator<FileStatus> nodeFiles;
|
||||
LogAggregationFileController fileController;
|
||||
try {
|
||||
Path qualifiedLogDir =
|
||||
FileContext.getFileContext(conf).makeQualified(
|
||||
remoteAppDir);
|
||||
nodeFiles =
|
||||
FileContext.getFileContext(qualifiedLogDir.toUri(), conf)
|
||||
.listStatus(remoteAppDir);
|
||||
} catch (FileNotFoundException fnf) {
|
||||
fileController = this.factory.getFileControllerForRead(
|
||||
applicationId, appOwner);
|
||||
} catch (Exception fnf) {
|
||||
html.h1()
|
||||
.__("Logs not available for " + logEntity
|
||||
+ ". Aggregation may not be complete, "
|
||||
|
@ -118,251 +112,9 @@ public class AggregatedLogsBlock extends HtmlBlock {
|
|||
.__();
|
||||
}
|
||||
return;
|
||||
} catch (Exception ex) {
|
||||
html.h1()
|
||||
.__("Error getting logs at " + nodeId).__();
|
||||
return;
|
||||
}
|
||||
|
||||
boolean foundLog = false;
|
||||
String desiredLogType = $(CONTAINER_LOG_TYPE);
|
||||
try {
|
||||
while (nodeFiles.hasNext()) {
|
||||
AggregatedLogFormat.LogReader reader = null;
|
||||
try {
|
||||
FileStatus thisNodeFile = nodeFiles.next();
|
||||
if (thisNodeFile.getPath().getName().equals(applicationId + ".har")) {
|
||||
Path p = new Path("har:///"
|
||||
+ thisNodeFile.getPath().toUri().getRawPath());
|
||||
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
|
||||
continue;
|
||||
}
|
||||
if (!thisNodeFile.getPath().getName()
|
||||
.contains(LogAggregationUtils.getNodeString(nodeId))
|
||||
|| thisNodeFile.getPath().getName()
|
||||
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
continue;
|
||||
}
|
||||
long logUploadedTime = thisNodeFile.getModificationTime();
|
||||
reader =
|
||||
new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
|
||||
|
||||
String owner = null;
|
||||
Map<ApplicationAccessType, String> appAcls = null;
|
||||
try {
|
||||
owner = reader.getApplicationOwner();
|
||||
appAcls = reader.getApplicationAcls();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error getting logs for " + logEntity, e);
|
||||
continue;
|
||||
}
|
||||
ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf);
|
||||
aclsManager.addApplication(applicationId, appAcls);
|
||||
|
||||
String remoteUser = request().getRemoteUser();
|
||||
UserGroupInformation callerUGI = null;
|
||||
if (remoteUser != null) {
|
||||
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
|
||||
}
|
||||
if (callerUGI != null && !aclsManager.checkAccess(callerUGI,
|
||||
ApplicationAccessType.VIEW_APP, owner, applicationId)) {
|
||||
html.h1()
|
||||
.__("User [" + remoteUser
|
||||
+ "] is not authorized to view the logs for " + logEntity
|
||||
+ " in log file [" + thisNodeFile.getPath().getName() + "]").__();
|
||||
LOG.error("User [" + remoteUser
|
||||
+ "] is not authorized to view the logs for " + logEntity);
|
||||
continue;
|
||||
}
|
||||
|
||||
AggregatedLogFormat.ContainerLogsReader logReader = reader
|
||||
.getContainerLogsReader(containerId);
|
||||
if (logReader == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
foundLog = readContainerLogs(html, logReader, logLimits,
|
||||
desiredLogType, logUploadedTime);
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Error getting logs for " + logEntity, ex);
|
||||
continue;
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!foundLog) {
|
||||
if (desiredLogType.isEmpty()) {
|
||||
html.h1("No logs available for container " + containerId.toString());
|
||||
} else {
|
||||
html.h1("Unable to locate '" + desiredLogType
|
||||
+ "' log for container " + containerId.toString());
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
html.h1().__("Error getting logs for " + logEntity).__();
|
||||
LOG.error("Error getting logs for " + logEntity, e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean readContainerLogs(Block html,
|
||||
AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits,
|
||||
String desiredLogType, long logUpLoadTime) throws IOException {
|
||||
int bufferSize = 65536;
|
||||
char[] cbuf = new char[bufferSize];
|
||||
|
||||
boolean foundLog = false;
|
||||
String logType = logReader.nextLog();
|
||||
while (logType != null) {
|
||||
if (desiredLogType == null || desiredLogType.isEmpty()
|
||||
|| desiredLogType.equals(logType)) {
|
||||
long logLength = logReader.getCurrentLogLength();
|
||||
if (foundLog) {
|
||||
html.pre().__("\n\n").__();
|
||||
}
|
||||
|
||||
html.p().__("Log Type: " + logType).__();
|
||||
html.p().__("Log Upload Time: " + Times.format(logUpLoadTime)).__();
|
||||
html.p().__("Log Length: " + Long.toString(logLength)).__();
|
||||
|
||||
long start = logLimits.start < 0
|
||||
? logLength + logLimits.start : logLimits.start;
|
||||
start = start < 0 ? 0 : start;
|
||||
start = start > logLength ? logLength : start;
|
||||
long end = logLimits.end < 0
|
||||
? logLength + logLimits.end : logLimits.end;
|
||||
end = end < 0 ? 0 : end;
|
||||
end = end > logLength ? logLength : end;
|
||||
end = end < start ? start : end;
|
||||
|
||||
long toRead = end - start;
|
||||
if (toRead < logLength) {
|
||||
html.p().__("Showing " + toRead + " bytes of " + logLength
|
||||
+ " total. Click ")
|
||||
.a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
|
||||
$(ENTITY_STRING), $(APP_OWNER),
|
||||
logType, "?start=0"), "here").
|
||||
__(" for the full log.").__();
|
||||
}
|
||||
|
||||
long totalSkipped = 0;
|
||||
while (totalSkipped < start) {
|
||||
long ret = logReader.skip(start - totalSkipped);
|
||||
if (ret == 0) {
|
||||
//Read one byte
|
||||
int nextByte = logReader.read();
|
||||
// Check if we have reached EOF
|
||||
if (nextByte == -1) {
|
||||
throw new IOException( "Premature EOF from container log");
|
||||
}
|
||||
ret = 1;
|
||||
}
|
||||
totalSkipped += ret;
|
||||
}
|
||||
|
||||
int len = 0;
|
||||
int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
|
||||
PRE<Hamlet> pre = html.pre();
|
||||
|
||||
while (toRead > 0
|
||||
&& (len = logReader.read(cbuf, 0, currentToRead)) > 0) {
|
||||
pre.__(new String(cbuf, 0, len));
|
||||
toRead = toRead - len;
|
||||
currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
|
||||
}
|
||||
|
||||
pre.__();
|
||||
foundLog = true;
|
||||
}
|
||||
|
||||
logType = logReader.nextLog();
|
||||
}
|
||||
|
||||
return foundLog;
|
||||
}
|
||||
|
||||
private ContainerId verifyAndGetContainerId(Block html) {
|
||||
String containerIdStr = $(CONTAINER_ID);
|
||||
if (containerIdStr == null || containerIdStr.isEmpty()) {
|
||||
html.h1().__("Cannot get container logs without a ContainerId").__();
|
||||
return null;
|
||||
}
|
||||
ContainerId containerId = null;
|
||||
try {
|
||||
containerId = ContainerId.fromString(containerIdStr);
|
||||
} catch (IllegalArgumentException e) {
|
||||
html.h1()
|
||||
.__("Cannot get container logs for invalid containerId: "
|
||||
+ containerIdStr).__();
|
||||
return null;
|
||||
}
|
||||
return containerId;
|
||||
}
|
||||
|
||||
private NodeId verifyAndGetNodeId(Block html) {
|
||||
String nodeIdStr = $(NM_NODENAME);
|
||||
if (nodeIdStr == null || nodeIdStr.isEmpty()) {
|
||||
html.h1().__("Cannot get container logs without a NodeId").__();
|
||||
return null;
|
||||
}
|
||||
NodeId nodeId = null;
|
||||
try {
|
||||
nodeId = NodeId.fromString(nodeIdStr);
|
||||
} catch (IllegalArgumentException e) {
|
||||
html.h1().__("Cannot get container logs. Invalid nodeId: " + nodeIdStr)
|
||||
.__();
|
||||
return null;
|
||||
}
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
private String verifyAndGetAppOwner(Block html) {
|
||||
String appOwner = $(APP_OWNER);
|
||||
if (appOwner == null || appOwner.isEmpty()) {
|
||||
html.h1().__("Cannot get container logs without an app owner").__();
|
||||
}
|
||||
return appOwner;
|
||||
}
|
||||
|
||||
private static class LogLimits {
|
||||
long start;
|
||||
long end;
|
||||
}
|
||||
|
||||
private LogLimits verifyAndGetLogLimits(Block html) {
|
||||
long start = -4096;
|
||||
long end = Long.MAX_VALUE;
|
||||
boolean isValid = true;
|
||||
|
||||
String startStr = $("start");
|
||||
if (startStr != null && !startStr.isEmpty()) {
|
||||
try {
|
||||
start = Long.parseLong(startStr);
|
||||
} catch (NumberFormatException e) {
|
||||
isValid = false;
|
||||
html.h1().__("Invalid log start value: " + startStr).__();
|
||||
}
|
||||
}
|
||||
|
||||
String endStr = $("end");
|
||||
if (endStr != null && !endStr.isEmpty()) {
|
||||
try {
|
||||
end = Long.parseLong(endStr);
|
||||
} catch (NumberFormatException e) {
|
||||
isValid = false;
|
||||
html.h1().__("Invalid log end value: " + endStr).__();
|
||||
}
|
||||
}
|
||||
|
||||
if (!isValid) {
|
||||
return null;
|
||||
}
|
||||
|
||||
LogLimits limits = new LogLimits();
|
||||
limits.start = start;
|
||||
limits.end = end;
|
||||
return limits;
|
||||
fileController.renderAggregatedLogsBlock(html, this.context());
|
||||
}
|
||||
|
||||
private String getApplicationLogURL(ApplicationId applicationId) {
|
||||
|
|
|
@ -1182,7 +1182,7 @@
|
|||
<property>
|
||||
<description>Class that supports TFile read and write operations.</description>
|
||||
<name>yarn.log-aggregation.file-controller.TFile.class</name>
|
||||
<value>org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController</value>
|
||||
<value>org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
|
|
|
@ -48,7 +48,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.TFileAggregatedLogsBlock;
|
||||
import org.apache.hadoop.yarn.webapp.YarnWebParams;
|
||||
import org.apache.hadoop.yarn.webapp.View.ViewContext;
|
||||
import org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlockForTest;
|
||||
import org.apache.hadoop.yarn.webapp.view.BlockForTest;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
|
||||
|
@ -56,6 +58,9 @@ import org.apache.hadoop.yarn.webapp.view.HtmlBlockForTest;
|
|||
import org.junit.Test;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
|
@ -77,12 +82,14 @@ public class TestAggregatedLogsBlock {
|
|||
|
||||
writeLog(configuration, "owner");
|
||||
|
||||
AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest(
|
||||
configuration, "owner", "container_0_0001_01_000001");
|
||||
|
||||
ByteArrayOutputStream data = new ByteArrayOutputStream();
|
||||
PrintWriter printWriter = new PrintWriter(data);
|
||||
HtmlBlock html = new HtmlBlockForTest();
|
||||
HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false);
|
||||
TFileAggregatedLogsBlockForTest aggregatedBlock
|
||||
= getTFileAggregatedLogsBlockForTest(configuration, "owner",
|
||||
"container_0_0001_01_000001", "localhost:1234");
|
||||
aggregatedBlock.render(block);
|
||||
|
||||
block.getWriter().flush();
|
||||
|
@ -158,12 +165,13 @@ public class TestAggregatedLogsBlock {
|
|||
|
||||
writeLog(configuration, "admin");
|
||||
|
||||
AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest(
|
||||
configuration, "admin", "container_0_0001_01_000001");
|
||||
ByteArrayOutputStream data = new ByteArrayOutputStream();
|
||||
PrintWriter printWriter = new PrintWriter(data);
|
||||
HtmlBlock html = new HtmlBlockForTest();
|
||||
HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false);
|
||||
TFileAggregatedLogsBlockForTest aggregatedBlock
|
||||
= getTFileAggregatedLogsBlockForTest(configuration, "admin",
|
||||
"container_0_0001_01_000001", "localhost:1234");
|
||||
aggregatedBlock.render(block);
|
||||
|
||||
block.getWriter().flush();
|
||||
|
@ -191,13 +199,13 @@ public class TestAggregatedLogsBlock {
|
|||
"/application_1440536969523_0001.har";
|
||||
FileUtils.copyDirectory(new File(harUrl.getPath()), new File(path));
|
||||
|
||||
AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest(
|
||||
configuration, "admin",
|
||||
"container_1440536969523_0001_01_000001", "host1:1111");
|
||||
ByteArrayOutputStream data = new ByteArrayOutputStream();
|
||||
PrintWriter printWriter = new PrintWriter(data);
|
||||
HtmlBlock html = new HtmlBlockForTest();
|
||||
HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false);
|
||||
TFileAggregatedLogsBlockForTest aggregatedBlock
|
||||
= getTFileAggregatedLogsBlockForTest(configuration, "admin",
|
||||
"container_1440536969523_0001_01_000001", "host1:1111");
|
||||
aggregatedBlock.render(block);
|
||||
|
||||
block.getWriter().flush();
|
||||
|
@ -206,7 +214,7 @@ public class TestAggregatedLogsBlock {
|
|||
assertTrue(out.contains("Hello stdout"));
|
||||
assertTrue(out.contains("Hello syslog"));
|
||||
|
||||
aggregatedBlock = getAggregatedLogsBlockForTest(
|
||||
aggregatedBlock = getTFileAggregatedLogsBlockForTest(
|
||||
configuration, "admin",
|
||||
"container_1440536969523_0001_01_000002", "host2:2222");
|
||||
data = new ByteArrayOutputStream();
|
||||
|
@ -237,12 +245,13 @@ public class TestAggregatedLogsBlock {
|
|||
}
|
||||
writeLog(configuration, "admin");
|
||||
|
||||
AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest(
|
||||
configuration, "admin", "container_0_0001_01_000001");
|
||||
ByteArrayOutputStream data = new ByteArrayOutputStream();
|
||||
PrintWriter printWriter = new PrintWriter(data);
|
||||
HtmlBlock html = new HtmlBlockForTest();
|
||||
HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false);
|
||||
TFileAggregatedLogsBlockForTest aggregatedBlock
|
||||
= getTFileAggregatedLogsBlockForTest(configuration, "admin",
|
||||
"container_0_0001_01_000001", "localhost:1234");
|
||||
aggregatedBlock.render(block);
|
||||
|
||||
block.getWriter().flush();
|
||||
|
@ -251,7 +260,6 @@ public class TestAggregatedLogsBlock {
|
|||
|
||||
}
|
||||
|
||||
|
||||
private Configuration getConfiguration() {
|
||||
Configuration configuration = new YarnConfiguration();
|
||||
configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||
|
@ -267,6 +275,28 @@ public class TestAggregatedLogsBlock {
|
|||
"localhost:1234");
|
||||
}
|
||||
|
||||
private TFileAggregatedLogsBlockForTest getTFileAggregatedLogsBlockForTest(
|
||||
Configuration configuration, String user, String containerId,
|
||||
String nodeName) {
|
||||
HttpServletRequest request = mock(HttpServletRequest.class);
|
||||
when(request.getRemoteUser()).thenReturn(user);
|
||||
ViewContext mockContext = mock(ViewContext.class);
|
||||
TFileAggregatedLogsBlockForTest aggregatedBlock
|
||||
= new TFileAggregatedLogsBlockForTest(mockContext,
|
||||
configuration);
|
||||
aggregatedBlock.setRequest(request);
|
||||
aggregatedBlock.moreParams().put(YarnWebParams.CONTAINER_ID,
|
||||
containerId);
|
||||
aggregatedBlock.moreParams().put(YarnWebParams.NM_NODENAME,
|
||||
nodeName);
|
||||
aggregatedBlock.moreParams().put(YarnWebParams.APP_OWNER, user);
|
||||
aggregatedBlock.moreParams().put("start", "");
|
||||
aggregatedBlock.moreParams().put("end", "");
|
||||
aggregatedBlock.moreParams().put(YarnWebParams.ENTITY_STRING, "entity");
|
||||
return aggregatedBlock;
|
||||
}
|
||||
|
||||
|
||||
private AggregatedLogsBlockForTest getAggregatedLogsBlockForTest(
|
||||
Configuration configuration, String user, String containerId,
|
||||
String nodeName) {
|
||||
|
@ -340,4 +370,32 @@ public class TestAggregatedLogsBlock {
|
|||
writer.close();
|
||||
}
|
||||
|
||||
private static class TFileAggregatedLogsBlockForTest
|
||||
extends TFileAggregatedLogsBlock {
|
||||
|
||||
private Map<String, String> params = new HashMap<String, String>();
|
||||
private HttpServletRequest request;
|
||||
|
||||
@Inject
|
||||
TFileAggregatedLogsBlockForTest(ViewContext ctx, Configuration conf) {
|
||||
super(ctx, conf);
|
||||
}
|
||||
|
||||
public void render(Block html) {
|
||||
super.render(html);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> moreParams() {
|
||||
return params;
|
||||
}
|
||||
|
||||
public HttpServletRequest request() {
|
||||
return request;
|
||||
}
|
||||
|
||||
public void setRequest(HttpServletRequest request) {
|
||||
this.request = request;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,19 +23,27 @@ import static org.junit.Assert.*;
|
|||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Writer;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
|
||||
import org.apache.hadoop.yarn.webapp.View.ViewContext;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -167,5 +175,34 @@ public class TestLogAggregationFileControllerFactory {
|
|||
throws IOException {
|
||||
// Do Nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
|
||||
OutputStream os) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ContainerLogMeta> readAggregatedLogsMeta(
|
||||
ContainerLogsRequest logRequest) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void renderAggregatedLogsBlock(Block html, ViewContext context) {
|
||||
// DO NOTHING
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getApplicationOwner(Path aggregatedLogPath)
|
||||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ApplicationAccessType, String> getApplicationAcls(
|
||||
Path aggregatedLogPath) throws IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.OutputStream;
|
|||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -54,8 +55,9 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|||
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
|
||||
import org.apache.hadoop.yarn.server.webapp.WebServices;
|
||||
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||
|
@ -92,12 +94,14 @@ public class AHSWebServices extends WebServices {
|
|||
private static final Joiner JOINER = Joiner.on("");
|
||||
private static final Joiner DOT_JOINER = Joiner.on(". ");
|
||||
private final Configuration conf;
|
||||
private final LogAggregationFileControllerFactory factory;
|
||||
|
||||
@Inject
|
||||
public AHSWebServices(ApplicationBaseProtocol appBaseProt,
|
||||
Configuration conf) {
|
||||
super(appBaseProt);
|
||||
this.conf = conf;
|
||||
this.factory = new LogAggregationFileControllerFactory(conf);
|
||||
}
|
||||
|
||||
@GET
|
||||
|
@ -525,9 +529,17 @@ public class AHSWebServices extends WebServices {
|
|||
@Override
|
||||
public void write(OutputStream os) throws IOException,
|
||||
WebApplicationException {
|
||||
byte[] buf = new byte[65535];
|
||||
boolean findLogs = LogToolUtils.outputAggregatedContainerLog(conf,
|
||||
appId, appOwner, containerIdStr, nodeId, logFile, bytes, os, buf);
|
||||
ContainerLogsRequest request = new ContainerLogsRequest();
|
||||
request.setAppId(appId);
|
||||
request.setAppOwner(appOwner);
|
||||
request.setContainerId(containerIdStr);
|
||||
request.setBytes(bytes);
|
||||
request.setNodeId(nodeId);
|
||||
Set<String> logTypes = new HashSet<>();
|
||||
logTypes.add(logFile);
|
||||
request.setLogTypes(logTypes);
|
||||
boolean findLogs = factory.getFileControllerForRead(appId, appOwner)
|
||||
.readAggregatedLogs(request, os);
|
||||
if (!findLogs) {
|
||||
os.write(("Can not find logs for container:"
|
||||
+ containerIdStr).getBytes(Charset.forName("UTF-8")));
|
||||
|
@ -558,9 +570,14 @@ public class AHSWebServices extends WebServices {
|
|||
final String nodeId, final String containerIdStr,
|
||||
boolean emptyLocalContainerLogMeta) {
|
||||
try {
|
||||
List<ContainerLogMeta> containerLogMeta = LogToolUtils
|
||||
.getContainerLogMetaFromRemoteFS(conf, appId, containerIdStr,
|
||||
nodeId, appOwner);
|
||||
ContainerLogsRequest request = new ContainerLogsRequest();
|
||||
request.setAppId(appId);
|
||||
request.setAppOwner(appOwner);
|
||||
request.setContainerId(containerIdStr);
|
||||
request.setNodeId(nodeId);
|
||||
List<ContainerLogMeta> containerLogMeta = factory
|
||||
.getFileControllerForRead(appId, appOwner)
|
||||
.readAggregatedLogsMeta(request);
|
||||
if (containerLogMeta.isEmpty()) {
|
||||
throw new NotFoundException(
|
||||
"Can not get log meta for container: " + containerIdStr);
|
||||
|
|
|
@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
|||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
|
||||
|
|
|
@ -23,8 +23,10 @@ import java.io.IOException;
|
|||
import java.io.OutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -55,8 +57,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
|
@ -90,6 +94,7 @@ public class NMWebServices {
|
|||
private static RecordFactory recordFactory = RecordFactoryProvider
|
||||
.getRecordFactory(null);
|
||||
private final String redirectWSUrl;
|
||||
private final LogAggregationFileControllerFactory factory;
|
||||
|
||||
private @javax.ws.rs.core.Context
|
||||
HttpServletRequest request;
|
||||
|
@ -108,6 +113,8 @@ public class NMWebServices {
|
|||
this.webapp = webapp;
|
||||
this.redirectWSUrl = this.nmContext.getConf().get(
|
||||
YarnConfiguration.YARN_LOG_SERVER_WEBSERVICE_URL);
|
||||
this.factory = new LogAggregationFileControllerFactory(
|
||||
this.nmContext.getConf());
|
||||
}
|
||||
|
||||
private void init() {
|
||||
|
@ -262,10 +269,14 @@ public class NMWebServices {
|
|||
Application app = this.nmContext.getApplications().get(appId);
|
||||
String appOwner = app == null ? null : app.getUser();
|
||||
try {
|
||||
List<ContainerLogMeta> containerLogMeta = LogToolUtils
|
||||
.getContainerLogMetaFromRemoteFS(this.nmContext.getConf(),
|
||||
appId, containerIdStr,
|
||||
this.nmContext.getNodeId().toString(), appOwner);
|
||||
ContainerLogsRequest logRequest = new ContainerLogsRequest();
|
||||
logRequest.setAppId(appId);
|
||||
logRequest.setAppOwner(appOwner);
|
||||
logRequest.setContainerId(containerIdStr);
|
||||
logRequest.setNodeId(this.nmContext.getNodeId().toString());
|
||||
List<ContainerLogMeta> containerLogMeta = factory
|
||||
.getFileControllerForRead(appId, appOwner)
|
||||
.readAggregatedLogsMeta(logRequest);
|
||||
if (!containerLogMeta.isEmpty()) {
|
||||
for (ContainerLogMeta logMeta : containerLogMeta) {
|
||||
containersLogsInfo.add(new ContainerLogsInfo(logMeta,
|
||||
|
@ -449,12 +460,17 @@ public class NMWebServices {
|
|||
Application app = nmContext.getApplications().get(appId);
|
||||
String appOwner = app == null ? null : app.getUser();
|
||||
try {
|
||||
int bufferSize = 65536;
|
||||
byte[] buf = new byte[bufferSize];
|
||||
LogToolUtils.outputAggregatedContainerLog(nmContext.getConf(),
|
||||
appId, appOwner, containerId.toString(),
|
||||
nmContext.getNodeId().toString(), outputFileName, bytes,
|
||||
os, buf);
|
||||
ContainerLogsRequest logRequest = new ContainerLogsRequest();
|
||||
logRequest.setAppId(appId);
|
||||
logRequest.setAppOwner(appOwner);
|
||||
logRequest.setContainerId(containerId.toString());
|
||||
logRequest.setNodeId(nmContext.getNodeId().toString());
|
||||
logRequest.setBytes(bytes);
|
||||
Set<String> logTypes = new HashSet<>();
|
||||
logTypes.add(outputFileName);
|
||||
logRequest.setLogTypes(logTypes);
|
||||
factory.getFileControllerForRead(appId, appOwner)
|
||||
.readAggregatedLogs(logRequest, os);
|
||||
} catch (Exception ex) {
|
||||
// Something wrong when we try to access the aggregated log.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
|
|
@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.event.Event;
|
|||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
|
|
|
@ -105,7 +105,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
|||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
|
|
Loading…
Reference in New Issue