YARN-6099. Improve webservice to list aggregated log files. Contributed by Xuan Gong.

This commit is contained in:
Junping Du 2017-01-24 15:28:52 -08:00
parent 1309accd68
commit 8027c3e8b9
15 changed files with 836 additions and 466 deletions

View File

@ -63,6 +63,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers; import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
@ -409,9 +410,10 @@ private boolean fetchAllLogFiles(String[] logFiles, String[] logFilesRegex) {
return false; return false;
} }
private List<PerLogFileInfo> getContainerLogFiles(Configuration conf, private List<PerContainerLogFileInfo> getContainerLogFiles(
String containerIdStr, String nodeHttpAddress) throws IOException { Configuration conf, String containerIdStr, String nodeHttpAddress)
List<PerLogFileInfo> logFileInfos = new ArrayList<>(); throws IOException {
List<PerContainerLogFileInfo> logFileInfos = new ArrayList<>();
Client webServiceClient = Client.create(); Client webServiceClient = Client.create();
try { try {
WebResource webResource = webServiceClient WebResource webResource = webServiceClient
@ -426,11 +428,20 @@ private List<PerLogFileInfo> getContainerLogFiles(Configuration conf,
try { try {
JSONObject json = JSONObject json =
response.getEntity(JSONObject.class); response.getEntity(JSONObject.class);
JSONArray array = json.getJSONArray("containerLogInfo"); JSONArray array = json.getJSONArray("containerLogsInfo");
for (int i = 0; i < array.length(); i++) { for (int i = 0; i < array.length(); i++) {
String fileName = array.getJSONObject(i).getString("fileName"); JSONObject log = array.getJSONObject(i);
String fileSize = array.getJSONObject(i).getString("fileSize"); Object ob = log.get("containerLogInfo");
logFileInfos.add(new PerLogFileInfo(fileName, fileSize)); if (ob instanceof JSONArray) {
JSONArray obArray = (JSONArray)ob;
for (int j = 0; j < obArray.length(); j++) {
logFileInfos.add(generatePerContainerLogFileInfoFromJSON(
obArray.getJSONObject(j)));
}
} else if (ob instanceof JSONObject) {
logFileInfos.add(generatePerContainerLogFileInfoFromJSON(
(JSONObject)ob));
}
} }
} catch (Exception e) { } catch (Exception e) {
System.err.println("Unable to parse json from webservice. Error:"); System.err.println("Unable to parse json from webservice. Error:");
@ -446,6 +457,19 @@ private List<PerLogFileInfo> getContainerLogFiles(Configuration conf,
return logFileInfos; return logFileInfos;
} }
private PerContainerLogFileInfo generatePerContainerLogFileInfoFromJSON(
    JSONObject meta) throws JSONException {
  // Build the per-file meta data from one JSON entry of the webservice
  // response. Any field absent from the payload is reported as "N/A".
  String fileName = "N/A";
  String fileSize = "N/A";
  String lastModificationTime = "N/A";
  if (meta.has("fileName")) {
    fileName = meta.getString("fileName");
  }
  if (meta.has("fileSize")) {
    fileSize = meta.getString("fileSize");
  }
  if (meta.has("lastModifiedTime")) {
    lastModificationTime = meta.getString("lastModifiedTime");
  }
  return new PerContainerLogFileInfo(fileName, fileSize,
      lastModificationTime);
}
@Private @Private
@VisibleForTesting @VisibleForTesting
public int printContainerLogsFromRunningApplication(Configuration conf, public int printContainerLogsFromRunningApplication(Configuration conf,
@ -1161,53 +1185,29 @@ private int printContainerInfoFromRunningApplication(
outStream.println(containerString); outStream.println(containerString);
outStream.println(StringUtils.repeat("=", containerString.length())); outStream.println(StringUtils.repeat("=", containerString.length()));
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN, outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
"LogType", "LogLength"); "LogFile", "LogLength", "LastModificationTime");
outStream.println(StringUtils.repeat("=", containerString.length())); outStream.println(StringUtils.repeat("=", containerString.length()));
List<PerLogFileInfo> infos = getContainerLogFiles( List<PerContainerLogFileInfo> infos = getContainerLogFiles(
getConf(), containerId, nodeHttpAddress); getConf(), containerId, nodeHttpAddress);
for (PerLogFileInfo info : infos) { for (PerContainerLogFileInfo info : infos) {
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN, outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
info.getFileName(), info.getFileLength()); info.getFileName(), info.getFileSize(),
info.getLastModifiedTime());
} }
} }
return 0; return 0;
} }
private static class PerLogFileInfo {
private String fileName;
private String fileLength;
public PerLogFileInfo(String fileName, String fileLength) {
setFileName(fileName);
setFileLength(fileLength);
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public String getFileLength() {
return fileLength;
}
public void setFileLength(String fileLength) {
this.fileLength = fileLength;
}
}
@VisibleForTesting @VisibleForTesting
public Set<String> getMatchedContainerLogFiles(ContainerLogsRequest request, public Set<String> getMatchedContainerLogFiles(ContainerLogsRequest request,
boolean useRegex) throws IOException { boolean useRegex) throws IOException {
// fetch all the log files for the container // fetch all the log files for the container
// filter the log files based on the given -log_files pattern // filter the log files based on the given -log_files pattern
List<PerLogFileInfo> allLogFileInfos= List<PerContainerLogFileInfo> allLogFileInfos=
getContainerLogFiles(getConf(), request.getContainerId(), getContainerLogFiles(getConf(), request.getContainerId(),
request.getNodeHttpAddress()); request.getNodeHttpAddress());
List<String> fileNames = new ArrayList<String>(); List<String> fileNames = new ArrayList<String>();
for (PerLogFileInfo fileInfo : allLogFileInfos) { for (PerContainerLogFileInfo fileInfo : allLogFileInfos) {
fileNames.add(fileInfo.getFileName()); fileNames.add(fileInfo.getFileName());
} }
return getMatchedLogFiles(request, fileNames, return getMatchedLogFiles(request, fileNames,

View File

@ -961,7 +961,7 @@ public static int readContainerLogsForALogType(
@Private @Private
public static Pair<String, String> readContainerMetaDataAndSkipData( public static Pair<String, String> readContainerMetaDataAndSkipData(
DataInputStream valueStream, PrintStream out) throws IOException { DataInputStream valueStream) throws IOException {
String fileType = valueStream.readUTF(); String fileType = valueStream.readUTF();
String fileLengthStr = valueStream.readUTF(); String fileLengthStr = valueStream.readUTF();

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;
import java.util.ArrayList;
import java.util.List;
/**
* The ContainerLogMeta includes:
* <ul>
* <li>The Container Id.</li>
* <li>The NodeManager Id.</li>
* <li>A list of {@link PerContainerLogFileInfo}.</li>
* </ul>
*
*/
public class ContainerLogMeta {
  /** Id of the container the log files belong to. */
  private final String containerId;
  /** Id of the node that aggregated the log files. */
  private final String nodeId;
  /** Meta data of each aggregated log file of the container. */
  private final List<PerContainerLogFileInfo> logMeta = new ArrayList<>();

  public ContainerLogMeta(String containerId, String nodeId) {
    this.containerId = containerId;
    this.nodeId = nodeId;
  }

  public String getNodeId() {
    return nodeId;
  }

  public String getContainerId() {
    return containerId;
  }

  /**
   * Record the meta data for one log file of this container.
   */
  public void addLogMeta(String fileName, String fileSize,
      String lastModificationTime) {
    logMeta.add(new PerContainerLogFileInfo(fileName, fileSize,
        lastModificationTime));
  }

  public List<PerContainerLogFileInfo> getContainerLogMeta() {
    return logMeta;
  }
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;
/**
* Enumeration of various type of a container log.
*/
public enum ContainerLogType {
  /** Log that still lives in a NodeManager's local log directory. */
  LOCAL,
  /** Log that has been aggregated to the remote file system. */
  AGGREGATED
}

View File

@ -20,12 +20,16 @@
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
@Private @Private
public class LogAggregationUtils { public class LogAggregationUtils {
@ -77,9 +81,6 @@ public static Path getRemoteLogSuffixedDir(Path remoteRootLogDir,
return new Path(getRemoteLogUserDir(remoteRootLogDir, user), suffix); return new Path(getRemoteLogUserDir(remoteRootLogDir, user), suffix);
} }
// TODO Add a utility method to list available log files. Ignore the
// temporary ones.
/** /**
* Gets the remote log user dir. * Gets the remote log user dir.
* @param remoteRootLogDir * @param remoteRootLogDir
@ -115,4 +116,61 @@ public static String getNodeString(NodeId nodeId) {
public static String getNodeString(String nodeId) { public static String getNodeString(String nodeId) {
return nodeId.toString().replace(":", "_"); return nodeId.toString().replace(":", "_");
} }
/**
 * Return the remote application log directory.
 *
 * @param conf the configuration
 * @param appId the application
 * @param appOwner the application owner; may be null, in which case the
 *        directory is located by globbing over all users
 * @return the remote application log directory path
 * @throws IOException if the remote application log directory can not be
 *         found (or is ambiguous when globbing)
 */
public static Path getRemoteAppLogDir(
    Configuration conf, ApplicationId appId, String appOwner)
    throws IOException {
  String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
  Path remoteRootLogDir = new Path(conf.get(
      YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
      YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
  if (appOwner != null) {
    // Owner is known: the directory can be computed directly.
    return LogAggregationUtils.getRemoteAppLogDir(
        remoteRootLogDir, appId, appOwner, suffix);
  }
  // Owner unknown: glob across all users and require exactly one match.
  Path qualifiedRemoteRootLogDir =
      FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
  FileContext fc = FileContext.getFileContext(
      qualifiedRemoteRootLogDir.toUri(), conf);
  Path toMatch = LogAggregationUtils
      .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
  FileStatus[] matching = fc.util().globStatus(toMatch);
  if (matching == null || matching.length != 1) {
    throw new IOException("Can not find remote application directory for "
        + "the application:" + appId);
  }
  return matching[0].getPath();
}
/**
 * Get all available log files under the remote app log directory.
 *
 * @param conf the configuration
 * @param appId the applicationId
 * @param appOwner the application owner; may be null (see
 *        {@code getRemoteAppLogDir})
 * @return the iterator of available log files
 * @throws IOException if there is no log file available
 */
public static RemoteIterator<FileStatus> getRemoteNodeFileDir(
    Configuration conf, ApplicationId appId, String appOwner)
    throws IOException {
  Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner);
  Path qualifiedLogDir =
      FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
  return FileContext.getFileContext(qualifiedLogDir.toUri(), conf)
      .listStatus(remoteAppLogDir);
}
} }

View File

@ -32,7 +32,6 @@
import java.util.Set; import java.util.Set;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -52,7 +51,7 @@
public class LogCLIHelpers implements Configurable { public class LogCLIHelpers implements Configurable {
public static final String PER_LOG_FILE_INFO_PATTERN = public static final String PER_LOG_FILE_INFO_PATTERN =
"%20s\t%20s" + System.getProperty("line.separator"); "%30s\t%30s\t%30s" + System.getProperty("line.separator");
public static final String CONTAINER_ON_NODE_PATTERN = public static final String CONTAINER_ON_NODE_PATTERN =
"Container: %s on %s"; "Container: %s on %s";
@ -412,7 +411,8 @@ public int dumpAllContainersLogs(ContainerLogsRequest options)
} }
} }
if (!foundAnyLogs) { if (!foundAnyLogs) {
emptyLogDir(getRemoteAppLogDir(appId, appOwner).toString()); emptyLogDir(LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner)
.toString());
return -1; return -1;
} }
return 0; return 0;
@ -426,67 +426,15 @@ public int printAContainerLogMetadata(ContainerLogsRequest options,
String appOwner = options.getAppOwner(); String appOwner = options.getAppOwner();
String nodeId = options.getNodeId(); String nodeId = options.getNodeId();
String containerIdStr = options.getContainerId(); String containerIdStr = options.getContainerId();
boolean getAllContainers = (containerIdStr == null); List<ContainerLogMeta> containersLogMeta;
String nodeIdStr = (nodeId == null) ? null try {
: LogAggregationUtils.getNodeString(nodeId); containersLogMeta = LogToolUtils.getContainerLogMetaFromRemoteFS(
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir( conf, appId, containerIdStr, nodeId, appOwner);
appId, appOwner); } catch (Exception ex) {
if (nodeFiles == null) { err.println(ex.getMessage());
return -1; return -1;
} }
boolean foundAnyLogs = false; if (containersLogMeta.isEmpty()) {
while (nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
if (nodeIdStr != null) {
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
continue;
}
}
if (!thisNodeFile.getPath().getName()
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath());
try {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null) {
if (getAllContainers || (key.toString().equals(containerIdStr))) {
String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
key, thisNodeFile.getPath().getName());
out.println(containerString);
out.println("Log Upload Time:"
+ thisNodeFile.getModificationTime());
out.println(StringUtils.repeat("=", containerString.length()));
out.printf(PER_LOG_FILE_INFO_PATTERN, "LogType", "LogLength");
out.println(StringUtils.repeat("=", containerString.length()));
while (true) {
try {
Pair<String, String> logMeta =
LogReader.readContainerMetaDataAndSkipData(
valueStream, out);
out.printf(PER_LOG_FILE_INFO_PATTERN,
logMeta.getFirst(), logMeta.getSecond());
} catch (EOFException eof) {
break;
}
}
foundAnyLogs = true;
if (!getAllContainers) {
break;
}
}
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
} finally {
reader.close();
}
}
}
if (!foundAnyLogs) {
if (containerIdStr != null && nodeId != null) { if (containerIdStr != null && nodeId != null) {
err.println("The container " + containerIdStr + " couldn't be found " err.println("The container " + containerIdStr + " couldn't be found "
+ "on the node specified: " + nodeId); + "on the node specified: " + nodeId);
@ -499,6 +447,21 @@ public int printAContainerLogMetadata(ContainerLogsRequest options,
} }
return -1; return -1;
} }
for (ContainerLogMeta containerLogMeta : containersLogMeta) {
String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
containerLogMeta.getContainerId(), containerLogMeta.getNodeId());
out.println(containerString);
out.println(StringUtils.repeat("=", containerString.length()));
out.printf(PER_LOG_FILE_INFO_PATTERN, "LogFile", "LogLength",
"LastModificationTime");
out.println(StringUtils.repeat("=", containerString.length()));
for (PerContainerLogFileInfo logMeta : containerLogMeta
.getContainerLogMeta()) {
out.printf(PER_LOG_FILE_INFO_PATTERN, logMeta.getFileName(),
logMeta.getFileSize(), logMeta.getLastModifiedTime());
}
}
return 0; return 0;
} }
@ -583,34 +546,21 @@ public void printContainersList(ContainerLogsRequest options,
private RemoteIterator<FileStatus> getRemoteNodeFileDir(ApplicationId appId, private RemoteIterator<FileStatus> getRemoteNodeFileDir(ApplicationId appId,
String appOwner) throws IOException { String appOwner) throws IOException {
Path remoteAppLogDir = getRemoteAppLogDir(appId, appOwner);
RemoteIterator<FileStatus> nodeFiles = null; RemoteIterator<FileStatus> nodeFiles = null;
try { try {
Path qualifiedLogDir = nodeFiles = LogAggregationUtils.getRemoteNodeFileDir(
FileContext.getFileContext(getConf()).makeQualified(remoteAppLogDir); conf, appId, appOwner);
nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
getConf()).listStatus(remoteAppLogDir);
} catch (FileNotFoundException fnf) { } catch (FileNotFoundException fnf) {
logDirNotExist(remoteAppLogDir.toString()); logDirNotExist(LogAggregationUtils.getRemoteAppLogDir(
conf, appId, appOwner).toString());
} catch (AccessControlException | AccessDeniedException ace) { } catch (AccessControlException | AccessDeniedException ace) {
logDirNoAccessPermission(remoteAppLogDir.toString(), appOwner, logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir(
conf, appId, appOwner).toString(), appOwner,
ace.getMessage()); ace.getMessage());
} }
return nodeFiles; return nodeFiles;
} }
private Path getRemoteAppLogDir(ApplicationId appId, String appOwner) {
Path remoteRootLogDir = new Path(getConf().get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String user = appOwner;
String logDirSuffix = LogAggregationUtils
.getRemoteNodeLogDirSuffix(getConf());
// TODO Change this to get a list of files from the LAS.
return LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogDir, appId, user, logDirSuffix);
}
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
this.conf = conf; this.conf = conf;
@ -701,7 +651,7 @@ public Set<String> listContainerLogs(ContainerLogsRequest options)
while (true) { while (true) {
try { try {
String logFile = LogReader.readContainerMetaDataAndSkipData( String logFile = LogReader.readContainerMetaDataAndSkipData(
valueStream, null).getFirst(); valueStream).getFirst();
logTypes.add(logFile); logTypes.add(logFile);
} catch (EOFException eof) { } catch (EOFException eof) {
break; break;

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.util.Times;
/**
* This class contains several utility function which could be used in different
* log tools.
*
*/
public final class LogToolUtils {

  // Utility class: never instantiated.
  private LogToolUtils() {}

  /**
   * Return a list of {@link ContainerLogMeta} for a container
   * from Remote FileSystem.
   *
   * @param conf the configuration
   * @param appId the applicationId
   * @param containerIdStr the containerId; if null, meta data for all
   *        containers of the application is collected
   * @param nodeId the nodeId; if null, aggregated files from every node
   *        are scanned
   * @param appOwner the application owner
   * @return a list of {@link ContainerLogMeta}
   * @throws IOException if there is no available log file
   */
  public static List<ContainerLogMeta> getContainerLogMetaFromRemoteFS(
      Configuration conf, ApplicationId appId, String containerIdStr,
      String nodeId, String appOwner) throws IOException {
    List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
    // A null containerId means "collect meta data for every container".
    boolean getAllContainers = (containerIdStr == null);
    String nodeIdStr = (nodeId == null) ? null
        : LogAggregationUtils.getNodeString(nodeId);
    RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
        .getRemoteNodeFileDir(conf, appId, appOwner);
    if (nodeFiles == null) {
      // Fixed message: was "There is no available log fils for ..."
      throw new IOException("There are no available log files for "
          + "application:" + appId);
    }
    while (nodeFiles.hasNext()) {
      FileStatus thisNodeFile = nodeFiles.next();
      // Restrict to the requested node, if one was given.
      if (nodeIdStr != null) {
        if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
          continue;
        }
      }
      // Skip aggregation files still being written (".tmp" suffix).
      if (!thisNodeFile.getPath().getName()
          .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
        AggregatedLogFormat.LogReader reader =
            new AggregatedLogFormat.LogReader(conf,
                thisNodeFile.getPath());
        try {
          DataInputStream valueStream;
          LogKey key = new LogKey();
          valueStream = reader.next(key);
          while (valueStream != null) {
            if (getAllContainers || (key.toString().equals(containerIdStr))) {
              ContainerLogMeta containerLogMeta = new ContainerLogMeta(
                  key.toString(), thisNodeFile.getPath().getName());
              // Walk every log file entry in this container's section;
              // EOFException marks the end of the section.
              while (true) {
                try {
                  Pair<String, String> logMeta =
                      LogReader.readContainerMetaDataAndSkipData(
                          valueStream);
                  containerLogMeta.addLogMeta(
                      logMeta.getFirst(),
                      logMeta.getSecond(),
                      Times.format(thisNodeFile.getModificationTime()));
                } catch (EOFException eof) {
                  break;
                }
              }
              containersLogMeta.add(containerLogMeta);
              if (!getAllContainers) {
                break;
              }
            }
            // Next container
            key = new LogKey();
            valueStream = reader.next(key);
          }
        } finally {
          reader.close();
        }
      }
    }
    return containersLogMeta;
  }
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;
/**
* PerContainerLogFileInfo represents the meta data for a container log file,
* which includes:
* <ul>
* <li>The filename of the container log.</li>
* <li>The size of the container log.</li>
* <li>The last modification time of the container log.</li>
* </ul>
*
*/
public class PerContainerLogFileInfo {
private String fileName;
private String fileSize;
private String lastModifiedTime;
//JAXB needs this
public PerContainerLogFileInfo() {}
public PerContainerLogFileInfo(String fileName, String fileSize,
String lastModifiedTime) {
this.setFileName(fileName);
this.setFileSize(fileSize);
this.setLastModifiedTime(lastModifiedTime);
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public String getFileSize() {
return fileSize;
}
public void setFileSize(String fileSize) {
this.fileSize = fileSize;
}
public String getLastModifiedTime() {
return lastModifiedTime;
}
public void setLastModifiedTime(String lastModifiedTime) {
this.lastModifiedTime = lastModifiedTime;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((fileName == null) ? 0 : fileName.hashCode());
result = prime * result + ((fileSize == null) ? 0 : fileSize.hashCode());
result = prime * result + ((lastModifiedTime == null) ?
0 : lastModifiedTime.hashCode());
return result;
}
@Override
public boolean equals(Object otherObj) {
if (otherObj == this) {
return true;
}
if (!(otherObj instanceof PerContainerLogFileInfo)) {
return false;
}
PerContainerLogFileInfo other = (PerContainerLogFileInfo)otherObj;
return other.fileName.equals(fileName) && other.fileSize.equals(fileSize)
&& other.lastModifiedTime.equals(lastModifiedTime);
}
}

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* This class contains several utility functions for log aggregation tests.
*/
public final class TestContainerLogsUtils {

  // Utility class: never instantiated.
  private TestContainerLogsUtils() {}

  /**
   * Utility function to create container log file and upload
   * it into remote file system.
   *
   * @param conf the configuration
   * @param fs the FileSystem
   * @param rootLogDir the root log directory
   * @param containerId the containerId
   * @param nodeId the nodeId
   * @param fileName the log file name
   * @param user the application user
   * @param content the log content
   * @param deleteRemoteLogDir whether to delete the remote log dir first
   *        if it already exists
   * @throws IOException if we can not create log files locally
   *         or we can not upload container logs into RemoteFS.
   */
  public static void createContainerLogFileInRemoteFS(Configuration conf,
      FileSystem fs, String rootLogDir, ContainerId containerId, NodeId nodeId,
      String fileName, String user, String content,
      boolean deleteRemoteLogDir) throws IOException {
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
    //prepare the logs for remote directory
    ApplicationId appId = containerId.getApplicationAttemptId()
        .getApplicationId();
    // create local logs
    List<String> rootLogDirList = new ArrayList<String>();
    rootLogDirList.add(rootLogDir);
    Path rootLogDirPath = new Path(rootLogDir);
    if (fs.exists(rootLogDirPath)) {
      fs.delete(rootLogDirPath, true);
    }
    assertTrue(fs.mkdirs(rootLogDirPath));
    Path appLogsDir = new Path(rootLogDirPath, appId.toString());
    if (fs.exists(appLogsDir)) {
      fs.delete(appLogsDir, true);
    }
    assertTrue(fs.mkdirs(appLogsDir));
    createContainerLogInLocalDir(appLogsDir, containerId, fs, fileName,
        content);
    // upload container logs to remote log dir
    Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR),
        user + "/logs/" + appId.toString());
    if (fs.exists(path) && deleteRemoteLogDir) {
      fs.delete(path, true);
    }
    assertTrue(fs.mkdirs(path));
    uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId,
        containerId, path, fs);
  }

  // Write a single local container log file with the given content.
  private static void createContainerLogInLocalDir(Path appLogsDir,
      ContainerId containerId, FileSystem fs, String fileName, String content)
      throws IOException{
    Path containerLogsDir = new Path(appLogsDir, containerId.toString());
    if (fs.exists(containerLogsDir)) {
      fs.delete(containerLogsDir, true);
    }
    assertTrue(fs.mkdirs(containerLogsDir));
    // try-with-resources so the file handle is released even if write fails
    try (Writer writer =
        new FileWriter(new File(containerLogsDir.toString(), fileName))) {
      writer.write(content);
    }
  }

  // Aggregate the local container logs into the remote application dir
  // using the AggregatedLogFormat writer.
  private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
      Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
      ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
    Path path =
        new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
    AggregatedLogFormat.LogWriter writer =
        new AggregatedLogFormat.LogWriter(configuration, path, ugi);
    try {
      writer.writeApplicationOwner(ugi.getUserName());
      writer.append(new AggregatedLogFormat.LogKey(containerId),
          new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
              ugi.getShortUserName()));
    } finally {
      // Always close so a failed append does not leak the writer.
      writer.close();
    }
  }
}

View File

@ -23,9 +23,9 @@
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -37,12 +37,12 @@
import javax.ws.rs.QueryParam; import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException; import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
import javax.ws.rs.core.GenericEntity;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.Response.Status;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -57,9 +57,11 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.server.webapp.WebServices; import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
@ -71,6 +73,7 @@
import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -234,7 +237,7 @@ public Response getContainerLogsInfo(
init(res); init(res);
try { try {
containerId = ContainerId.fromString(containerIdStr); containerId = ContainerId.fromString(containerIdStr);
} catch (Exception e) { } catch (IllegalArgumentException e) {
throw new BadRequestException("invalid container id, " + containerIdStr); throw new BadRequestException("invalid container id, " + containerIdStr);
} }
ApplicationId appId = containerId.getApplicationAttemptId() ApplicationId appId = containerId.getApplicationAttemptId()
@ -244,8 +247,15 @@ public Response getContainerLogsInfo(
appInfo = super.getApp(req, res, appId.toString()); appInfo = super.getApp(req, res, appId.toString());
} catch (Exception ex) { } catch (Exception ex) {
// directly find logs from HDFS. // directly find logs from HDFS.
return getContainerLogMeta(appId, null, null, containerIdStr); return getContainerLogMeta(appId, null, null, containerIdStr, false);
} }
// if the application finishes, directly find logs
// from HDFS.
if (isFinishedState(appInfo.getAppState())) {
return getContainerLogMeta(appId, null, null,
containerIdStr, false);
}
if (isRunningState(appInfo.getAppState())) {
String appOwner = appInfo.getUser(); String appOwner = appInfo.getUser();
ContainerInfo containerInfo; ContainerInfo containerInfo;
try { try {
@ -254,15 +264,11 @@ public Response getContainerLogsInfo(
containerId.getApplicationAttemptId().toString(), containerId.getApplicationAttemptId().toString(),
containerId.toString()); containerId.toString());
} catch (Exception ex) { } catch (Exception ex) {
if (isFinishedState(appInfo.getAppState())) { // return log meta for the aggregated logs if exists.
// directly find logs from HDFS. // It will also return empty log meta for the local logs.
return getContainerLogMeta(appId, appOwner, null, containerIdStr); return getContainerLogMeta(appId, appOwner, null,
containerIdStr, true);
} }
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get ContainerInfo for the container: " + containerId);
}
String nodeId = containerInfo.getNodeId();
if (isRunningState(appInfo.getAppState())) {
String nodeHttpAddress = containerInfo.getNodeHttpAddress(); String nodeHttpAddress = containerInfo.getNodeHttpAddress();
String uri = "/" + containerId.toString() + "/logs"; String uri = "/" + containerId.toString() + "/logs";
String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri); String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri);
@ -274,11 +280,8 @@ public Response getContainerLogsInfo(
HttpServletResponse.SC_TEMPORARY_REDIRECT); HttpServletResponse.SC_TEMPORARY_REDIRECT);
response.header("Location", resURI); response.header("Location", resURI);
return response.build(); return response.build();
} else if (isFinishedState(appInfo.getAppState())) {
return getContainerLogMeta(appId, appOwner, nodeId,
containerIdStr);
} else { } else {
return createBadResponse(Status.NOT_FOUND, throw new NotFoundException(
"The application is not at Running or Finished State."); "The application is not at Running or Finished State.");
} }
} }
@ -593,90 +596,39 @@ private long parseLongParam(String bytes) {
} }
private Response getContainerLogMeta(ApplicationId appId, String appOwner, private Response getContainerLogMeta(ApplicationId appId, String appOwner,
final String nodeId, final String containerIdStr) { final String nodeId, final String containerIdStr,
Map<String, String> containerLogMeta = new HashMap<>(); boolean emptyLocalContainerLogMeta) {
try { try {
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); List<ContainerLogMeta> containerLogMeta = LogToolUtils
org.apache.hadoop.fs.Path remoteRootLogDir = .getContainerLogMetaFromRemoteFS(conf, appId, containerIdStr,
new org.apache.hadoop.fs.Path(conf.get( nodeId, appOwner);
YarnConfiguration.NM_REMOTE_APP_LOG_DIR, if (containerLogMeta.isEmpty()) {
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
FileContext fc = FileContext.getFileContext(
qualifiedRemoteRootLogDir.toUri(), conf);
org.apache.hadoop.fs.Path remoteAppDir = null;
if (appOwner == null) {
org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
.getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
FileStatus[] matching = fc.util().globStatus(toMatch);
if (matching == null || matching.length != 1) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR, return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get log meta for container: " + containerIdStr); "Can not get log meta for container: " + containerIdStr);
} }
remoteAppDir = matching[0].getPath(); List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
} else { for (ContainerLogMeta meta : containerLogMeta) {
remoteAppDir = LogAggregationUtils.getRemoteAppLogDir( ContainerLogsInfo logInfo = new ContainerLogsInfo(meta,
remoteRootLogDir, appId, appOwner, suffix); ContainerLogType.AGGREGATED);
containersLogsInfo.add(logInfo);
} }
final RemoteIterator<FileStatus> nodeFiles; if (emptyLocalContainerLogMeta) {
nodeFiles = fc.listStatus(remoteAppDir); ContainerLogMeta emptyMeta = new ContainerLogMeta(
if (!nodeFiles.hasNext()) { containerIdStr, "N/A");
return createBadResponse(Status.INTERNAL_SERVER_ERROR, ContainerLogsInfo empty = new ContainerLogsInfo(emptyMeta,
"Can not get log meta for container: " + containerIdStr); ContainerLogType.LOCAL);
containersLogsInfo.add(empty);
} }
String nodeIdStr = (nodeId == null) ? null GenericEntity<List<ContainerLogsInfo>> meta = new GenericEntity<List<
: LogAggregationUtils.getNodeString(nodeId); ContainerLogsInfo>>(containersLogsInfo){};
while (nodeFiles.hasNext()) { ResponseBuilder response = Response.ok(meta);
FileStatus thisNodeFile = nodeFiles.next();
if (nodeIdStr != null) {
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
continue;
}
}
if (!thisNodeFile.getPath().getName().endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(conf,
thisNodeFile.getPath());
try {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null) {
if (key.toString().equals(containerIdStr)) {
while (true) {
try {
Pair<String, String> logMeta =
LogReader.readContainerMetaDataAndSkipData(
valueStream, null);
containerLogMeta.put(logMeta.getFirst(),
logMeta.getSecond());
} catch (EOFException eof) {
break;
}
}
break;
}
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
} finally {
reader.close();
}
}
}
ResponseBuilder response = Response.ok(new ContainerLogsInfo(
containerLogMeta));
// Sending the X-Content-Type-Options response header with the value // Sending the X-Content-Type-Options response header with the value
// nosniff will prevent Internet Explorer from MIME-sniffing a response // nosniff will prevent Internet Explorer from MIME-sniffing a response
// away from the declared content-type. // away from the declared content-type.
response.header("X-Content-Type-Options", "nosniff"); response.header("X-Content-Type-Options", "nosniff");
return response.build(); return response.build();
} catch (Exception ex) { } catch (Exception ex) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR, throw new WebApplicationException(ex);
ex.getMessage());
} }
} }
} }

View File

@ -22,13 +22,9 @@
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileWriter;
import java.io.Writer;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -42,7 +38,6 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter; import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler; import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol; import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
@ -55,8 +50,9 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.TestApplicationHistoryManagerOnTimelineStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.TestApplicationHistoryManagerOnTimelineStore;
@ -88,6 +84,7 @@
import com.google.inject.servlet.ServletModule; import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status; import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.UniformInterfaceException; import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
@ -503,51 +500,20 @@ public void testSingleContainer() throws Exception {
public void testContainerLogsForFinishedApps() throws Exception { public void testContainerLogsForFinishedApps() throws Exception {
String fileName = "syslog"; String fileName = "syslog";
String user = "user1"; String user = "user1";
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user1");
NodeId nodeId = NodeId.newInstance("test host", 100); NodeId nodeId = NodeId.newInstance("test host", 100);
NodeId nodeId2 = NodeId.newInstance("host2", 1234); NodeId nodeId2 = NodeId.newInstance("host2", 1234);
//prepare the logs for remote directory
ApplicationId appId = ApplicationId.newInstance(0, 1); ApplicationId appId = ApplicationId.newInstance(0, 1);
// create local logs ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
List<String> rootLogDirList = new ArrayList<String>(); appId, 1);
rootLogDirList.add(rootLogDir);
Path rootLogDirPath = new Path(rootLogDir);
if (fs.exists(rootLogDirPath)) {
fs.delete(rootLogDirPath, true);
}
assertTrue(fs.mkdirs(rootLogDirPath));
Path appLogsDir = new Path(rootLogDirPath, appId.toString());
if (fs.exists(appLogsDir)) {
fs.delete(appLogsDir, true);
}
assertTrue(fs.mkdirs(appLogsDir));
// create container logs in local log file dir
// create two container log files. We can get containerInfo
// for container1 from AHS, but can not get such info for
// container100
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
ContainerId containerId100 = ContainerId.newContainerId(appAttemptId, 100); ContainerId containerId100 = ContainerId.newContainerId(appAttemptId, 100);
createContainerLogInLocalDir(appLogsDir, containerId1, fs, fileName,
("Hello." + containerId1));
createContainerLogInLocalDir(appLogsDir, containerId100, fs, fileName,
("Hello." + containerId100));
// upload container logs to remote log dir
Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
user + "/logs/" + appId.toString());
if (fs.exists(path)) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId,
containerId1, path, fs);
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId2,
containerId100, path, fs);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1, nodeId, fileName, user,
("Hello." + containerId1), true);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId100, nodeId2, fileName, user,
("Hello." + containerId100), false);
// test whether we can find container log from remote diretory if // test whether we can find container log from remote diretory if
// the containerInfo for this container could be fetched from AHS. // the containerInfo for this container could be fetched from AHS.
WebResource r = resource(); WebResource r = resource();
@ -596,25 +562,14 @@ public void testContainerLogsForFinishedApps() throws Exception {
// create an application which can not be found from AHS // create an application which can not be found from AHS
ApplicationId appId100 = ApplicationId.newInstance(0, 100); ApplicationId appId100 = ApplicationId.newInstance(0, 100);
appLogsDir = new Path(rootLogDirPath, appId100.toString()); ApplicationAttemptId appAttemptId100 = ApplicationAttemptId.newInstance(
if (fs.exists(appLogsDir)) { appId100, 1);
fs.delete(appLogsDir, true); ContainerId containerId1ForApp100 = ContainerId.newContainerId(
} appAttemptId100, 1);
assertTrue(fs.mkdirs(appLogsDir));
ApplicationAttemptId appAttemptId100 = TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
ApplicationAttemptId.newInstance(appId100, 1); rootLogDir, containerId1ForApp100, nodeId, fileName, user,
ContainerId containerId1ForApp100 = ContainerId ("Hello." + containerId1ForApp100), true);
.newContainerId(appAttemptId100, 1);
createContainerLogInLocalDir(appLogsDir, containerId1ForApp100, fs,
fileName, ("Hello." + containerId1ForApp100));
path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
user + "/logs/" + appId100.toString());
if (fs.exists(path)) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId2,
containerId1ForApp100, path, fs);
r = resource(); r = resource();
response = r.path("ws").path("v1") response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs") .path("applicationhistory").path("containerlogs")
@ -692,35 +647,6 @@ public void testContainerLogsForFinishedApps() throws Exception {
assertEquals(responseText.getBytes().length, fullTextSize); assertEquals(responseText.getBytes().length, fullTextSize);
} }
private static void createContainerLogInLocalDir(Path appLogsDir,
ContainerId containerId, FileSystem fs, String fileName, String content)
throws Exception {
Path containerLogsDir = new Path(appLogsDir, containerId.toString());
if (fs.exists(containerLogsDir)) {
fs.delete(containerLogsDir, true);
}
assertTrue(fs.mkdirs(containerLogsDir));
Writer writer =
new FileWriter(new File(containerLogsDir.toString(), fileName));
writer.write(content);
writer.close();
}
private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter(configuration, path, ugi);
writer.writeApplicationOwner(ugi.getUserName());
writer.append(new AggregatedLogFormat.LogKey(containerId),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
ugi.getShortUserName()));
writer.close();
}
@Test(timeout = 10000) @Test(timeout = 10000)
public void testContainerLogsForRunningApps() throws Exception { public void testContainerLogsForRunningApps() throws Exception {
String fileName = "syslog"; String fileName = "syslog";
@ -766,6 +692,8 @@ public void testContainerLogsMetaForRunningApps() throws Exception {
ApplicationAttemptId.newInstance(appId, 1); ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
WebResource r = resource(); WebResource r = resource();
// If we can get Container information from ATS, we re-direct the request
// to the nodemamager who runs the container.
URI requestURI = r.path("ws").path("v1") URI requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containers") .path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs") .path(containerId1.toString()).path("logs")
@ -776,46 +704,52 @@ public void testContainerLogsMetaForRunningApps() throws Exception {
assertTrue(redirectURL.contains("ws/v1/node/containers")); assertTrue(redirectURL.contains("ws/v1/node/containers"));
assertTrue(redirectURL.contains(containerId1.toString())); assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs")); assertTrue(redirectURL.contains("/logs"));
// If we can not container information from ATS, we would try to
// get aggregated log meta from remote FileSystem.
ContainerId containerId1000 = ContainerId.newContainerId(
appAttemptId, 1000);
String fileName = "syslog";
String content = "Hello." + containerId1000;
NodeId nodeId = NodeId.newInstance("test host", 100);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1000, nodeId, fileName, user, content, true);
ClientResponse response = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1000.toString()).path("logs")
.queryParam("user.name", user)
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
List<ContainerLogsInfo> responseText = response.getEntity(new GenericType<
List<ContainerLogsInfo>>(){});
assertTrue(responseText.size() == 2);
for (ContainerLogsInfo logInfo : responseText) {
if(logInfo.getLogType().equals(ContainerLogType.AGGREGATED.toString())) {
List<PerContainerLogFileInfo> logMeta = logInfo
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertEquals(logMeta.get(0).getFileName(), fileName);
assertEquals(logMeta.get(0).getFileSize(), String.valueOf(
content.length()));
} else {
assertEquals(logInfo.getLogType(), ContainerLogType.LOCAL.toString());
}
}
} }
@Test(timeout = 10000) @Test(timeout = 10000)
public void testContainerLogsMetaForFinishedApps() throws Exception { public void testContainerLogsMetaForFinishedApps() throws Exception {
String fileName = "syslog";
String user = "user1";
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user1");
NodeId nodeId = NodeId.newInstance("test host", 100);
//prepare the logs for remote directory
ApplicationId appId = ApplicationId.newInstance(0, 1); ApplicationId appId = ApplicationId.newInstance(0, 1);
// create local logs
List<String> rootLogDirList = new ArrayList<String>();
rootLogDirList.add(rootLogDir);
Path rootLogDirPath = new Path(rootLogDir);
if (fs.exists(rootLogDirPath)) {
fs.delete(rootLogDirPath, true);
}
assertTrue(fs.mkdirs(rootLogDirPath));
Path appLogsDir = new Path(rootLogDirPath, appId.toString());
if (fs.exists(appLogsDir)) {
fs.delete(appLogsDir, true);
}
assertTrue(fs.mkdirs(appLogsDir));
ApplicationAttemptId appAttemptId = ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1); ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
String fileName = "syslog";
String user = "user1";
String content = "Hello." + containerId1; String content = "Hello." + containerId1;
createContainerLogInLocalDir(appLogsDir, containerId1, fs, fileName, NodeId nodeId = NodeId.newInstance("test host", 100);
content); TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1, nodeId, fileName, user, content, true);
// upload container logs to remote log dir
Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
user + "/logs/" + appId.toString());
if (fs.exists(path)) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId,
containerId1, path, fs);
WebResource r = resource(); WebResource r = resource();
ClientResponse response = r.path("ws").path("v1") ClientResponse response = r.path("ws").path("v1")
@ -824,12 +758,16 @@ public void testContainerLogsMetaForFinishedApps() throws Exception {
.queryParam("user.name", user) .queryParam("user.name", user)
.accept(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class); .get(ClientResponse.class);
ContainerLogsInfo responseText = response.getEntity( List<ContainerLogsInfo> responseText = response.getEntity(new GenericType<
ContainerLogsInfo.class); List<ContainerLogsInfo>>(){});
assertEquals(responseText.getContainerLogsInfo().size(), 1); assertTrue(responseText.size() == 1);
assertEquals(responseText.getContainerLogsInfo().get(0).getFileName(), assertEquals(responseText.get(0).getLogType(),
fileName); ContainerLogType.AGGREGATED.toString());
assertEquals(responseText.getContainerLogsInfo().get(0).getFileSize(), List<PerContainerLogFileInfo> logMeta = responseText.get(0)
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertEquals(logMeta.get(0).getFileName(), fileName);
assertEquals(logMeta.get(0).getFileSize(),
String.valueOf(content.length())); String.valueOf(content.length()));
} }

View File

@ -18,25 +18,26 @@
package org.apache.hadoop.yarn.server.webapp.dao; package org.apache.hadoop.yarn.server.webapp.dao;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
/** /**
* {@code ContainerLogsInfo} includes the log meta-data of containers. * {@code ContainerLogsInfo} includes the log meta-data of containers.
* <p> * <p>
* The container log meta-data includes details such as: * The container log meta-data includes details such as:
* <ul> * <ul>
* <li>The filename of the container log.</li> * <li>A list of {@link PerContainerLogFileInfo}.</li>
* <li>The size of the container log.</li> * <li>The container Id.</li>
* <li>The NodeManager Id.</li>
* <li>The logType: could be local or aggregated</li>
* </ul> * </ul>
*/ */
@ -45,57 +46,42 @@
public class ContainerLogsInfo { public class ContainerLogsInfo {
@XmlElement(name = "containerLogInfo") @XmlElement(name = "containerLogInfo")
protected List<ContainerLogInfo> containerLogsInfo; protected List<PerContainerLogFileInfo> containerLogsInfo;
@XmlElement(name = "logType")
protected String logType;
@XmlElement(name = "containerId")
protected String containerId;
@XmlElement(name = "nodeId")
protected String nodeId;
//JAXB needs this //JAXB needs this
public ContainerLogsInfo() {} public ContainerLogsInfo() {}
public ContainerLogsInfo(Map<String, String> containerLogMeta) public ContainerLogsInfo(ContainerLogMeta logMeta, ContainerLogType logType)
throws YarnException { throws YarnException {
this.containerLogsInfo = new ArrayList<ContainerLogInfo>(); this.containerLogsInfo = new ArrayList<PerContainerLogFileInfo>(
for (Entry<String, String> meta : containerLogMeta.entrySet()) { logMeta.getContainerLogMeta());
ContainerLogInfo info = new ContainerLogInfo(meta.getKey(), this.logType = logType.toString();
meta.getValue()); this.containerId = logMeta.getContainerId();
containerLogsInfo.add(info); this.nodeId = logMeta.getNodeId();
}
} }
public List<ContainerLogInfo> getContainerLogsInfo() { public List<PerContainerLogFileInfo> getContainerLogsInfo() {
return this.containerLogsInfo; return this.containerLogsInfo;
} }
/** public String getLogType() {
* It includes the log meta-data of a container. return this.logType;
*
*/
@Private
@VisibleForTesting
public static class ContainerLogInfo {
private String fileName;
private String fileSize;
//JAXB needs this
public ContainerLogInfo() {}
public ContainerLogInfo(String fileName, String fileSize) {
this.setFileName(fileName);
this.setFileSize(fileSize);
} }
public String getFileName() { public String getContainerId() {
return fileName; return this.containerId;
} }
public void setFileName(String fileName) { public String getNodeId() {
this.fileName = fileName; return this.nodeId;
}
public String getFileSize() {
return fileSize;
}
public void setFileSize(String fileSize) {
this.fileSize = fileSize;
}
} }
} }

View File

@ -21,6 +21,8 @@
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -31,6 +33,7 @@
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam; import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException; import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.GenericEntity;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.ResponseBuilder;
@ -45,6 +48,9 @@
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView; import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@ -53,9 +59,10 @@
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerLogsInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMContainerLogsInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NodeInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException; import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApp;
@ -200,6 +207,8 @@ public ContainerInfo getNodeContainer(@javax.ws.rs.core.Context
* *
* @param hsr * @param hsr
* HttpServletRequest * HttpServletRequest
* @param res
* HttpServletResponse
* @param containerIdStr * @param containerIdStr
* The container ID * The container ID
* @return * @return
@ -208,20 +217,52 @@ public ContainerInfo getNodeContainer(@javax.ws.rs.core.Context
@GET @GET
@Path("/containers/{containerid}/logs") @Path("/containers/{containerid}/logs")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public ContainerLogsInfo getContainerLogsInfo(@javax.ws.rs.core.Context public Response getContainerLogsInfo(
HttpServletRequest hsr, @javax.ws.rs.core.Context HttpServletRequest hsr,
@javax.ws.rs.core.Context HttpServletResponse res,
@PathParam("containerid") String containerIdStr) { @PathParam("containerid") String containerIdStr) {
ContainerId containerId = null; ContainerId containerId = null;
init(); init();
try { try {
containerId = ContainerId.fromString(containerIdStr); containerId = ContainerId.fromString(containerIdStr);
} catch (Exception e) { } catch (IllegalArgumentException ex) {
throw new BadRequestException("invalid container id, " + containerIdStr); throw new BadRequestException("invalid container id, " + containerIdStr);
} }
try { try {
return new ContainerLogsInfo(this.nmContext, containerId, List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
hsr.getRemoteUser()); containersLogsInfo.add(new NMContainerLogsInfo(
} catch (YarnException ex) { this.nmContext, containerId,
hsr.getRemoteUser(), ContainerLogType.LOCAL));
// check whether we have aggregated logs in RemoteFS. If exists, show the
// the log meta for the aggregated logs as well.
ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId();
Application app = this.nmContext.getApplications().get(appId);
String appOwner = app == null ? null : app.getUser();
try {
List<ContainerLogMeta> containerLogMeta = LogToolUtils
.getContainerLogMetaFromRemoteFS(this.nmContext.getConf(),
appId, containerIdStr,
this.nmContext.getNodeId().toString(), appOwner);
if (!containerLogMeta.isEmpty()) {
for (ContainerLogMeta logMeta : containerLogMeta) {
containersLogsInfo.add(new ContainerLogsInfo(logMeta,
ContainerLogType.AGGREGATED));
}
}
} catch (IOException ex) {
// Something wrong with we tries to access the remote fs for the logs.
// Skip it and do nothing
}
GenericEntity<List<ContainerLogsInfo>> meta = new GenericEntity<List<
ContainerLogsInfo>>(containersLogsInfo){};
ResponseBuilder resp = Response.ok(meta);
// Sending the X-Content-Type-Options response header with the value
// nosniff will prevent Internet Explorer from MIME-sniffing a response
// away from the declared content-type.
resp.header("X-Content-Type-Options", "nosniff");
return resp.build();
} catch (Exception ex) {
throw new WebApplicationException(ex); throw new WebApplicationException(ex);
} }
} }
@ -288,7 +329,7 @@ public Response getLogs(@PathParam("containerid") String containerIdStr,
try { try {
containerId = ContainerId.fromString(containerIdStr); containerId = ContainerId.fromString(containerIdStr);
} catch (IllegalArgumentException ex) { } catch (IllegalArgumentException ex) {
return Response.status(Status.BAD_REQUEST).build(); return Response.status(Status.BAD_REQUEST).entity(ex.getMessage()).build();
} }
File logFile = null; File logFile = null;

View File

@ -23,47 +23,42 @@
import java.util.List; import java.util.List;
import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsUtils; import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsUtils;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.util.Times;
/** /**
* {@code ContainerLogsInfo} includes the log meta-data of containers. * NMContainerLogsInfo represents the meta data for container logs
* <p> * which exist in NM local log directory.
* The container log meta-data includes details such as: * This class extends {@link ContainerLogsInfo}.
* <ul>
* <li>The filename of the container log.</li>
* <li>The size of the container log.</li>
* </ul>
*/ */
@XmlRootElement(name = "containerLogsInfo") @XmlRootElement(name = "containerLogsInfo")
@XmlAccessorType(XmlAccessType.FIELD) @XmlAccessorType(XmlAccessType.FIELD)
public class ContainerLogsInfo { public class NMContainerLogsInfo extends ContainerLogsInfo {
@XmlElement(name = "containerLogInfo")
protected List<ContainerLogInfo> containerLogsInfo;
//JAXB needs this //JAXB needs this
public ContainerLogsInfo() {} public NMContainerLogsInfo() {}
public ContainerLogsInfo(final Context nmContext, public NMContainerLogsInfo(final Context nmContext,
final ContainerId containerId, String remoteUser) final ContainerId containerId, String remoteUser,
throws YarnException { ContainerLogType logType) throws YarnException {
this.logType = logType.toString();
this.containerId = containerId.toString();
this.nodeId = nmContext.getNodeId().toString();
this.containerLogsInfo = getContainerLogsInfo( this.containerLogsInfo = getContainerLogsInfo(
containerId, remoteUser, nmContext); containerId, remoteUser, nmContext);
} }
public List<ContainerLogInfo> getContainerLogsInfo() { private static List<PerContainerLogFileInfo> getContainerLogsInfo(
return this.containerLogsInfo; ContainerId id, String remoteUser, Context nmContext)
} throws YarnException {
List<PerContainerLogFileInfo> logFiles = new ArrayList<>();
private static List<ContainerLogInfo> getContainerLogsInfo(ContainerId id,
String remoteUser, Context nmContext) throws YarnException {
List<ContainerLogInfo> logFiles = new ArrayList<ContainerLogInfo>();
List<File> logDirs = ContainerLogsUtils.getContainerLogDirs( List<File> logDirs = ContainerLogsUtils.getContainerLogDirs(
id, remoteUser, nmContext); id, remoteUser, nmContext);
for (File containerLogsDir : logDirs) { for (File containerLogsDir : logDirs) {
@ -71,8 +66,9 @@ private static List<ContainerLogInfo> getContainerLogsInfo(ContainerId id,
if (logs != null) { if (logs != null) {
for (File log : logs) { for (File log : logs) {
if (log.isFile()) { if (log.isFile()) {
ContainerLogInfo logMeta = new ContainerLogInfo( PerContainerLogFileInfo logMeta = new PerContainerLogFileInfo(
log.getName(), log.length()); log.getName(), Long.toString(log.length()),
Times.format(log.lastModified()));
logFiles.add(logMeta); logFiles.add(logMeta);
} }
} }
@ -80,33 +76,4 @@ private static List<ContainerLogInfo> getContainerLogsInfo(ContainerId id,
} }
return logFiles; return logFiles;
} }
private static class ContainerLogInfo {
private String fileName;
private long fileSize;
//JAXB needs this
public ContainerLogInfo() {}
public ContainerLogInfo(String fileName, long fileSize) {
this.setFileName(fileName);
this.setFileSize(fileSize);
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public long getFileSize() {
return fileSize;
}
public void setFileSize(long fileSize) {
this.fileSize = fileSize;
}
}
} }

View File

@ -26,12 +26,14 @@
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringReader; import java.io.StringReader;
import java.util.List;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.DocumentBuilderFactory;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
@ -41,6 +43,9 @@
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
@ -52,6 +57,7 @@
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.JerseyTestBase; import org.apache.hadoop.yarn.webapp.JerseyTestBase;
@ -74,6 +80,7 @@
import com.google.inject.servlet.ServletModule; import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status; import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.UniformInterfaceException; import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
@ -94,6 +101,8 @@ public class TestNMWebServices extends JerseyTestBase {
TestNMWebServices.class.getSimpleName()); TestNMWebServices.class.getSimpleName());
private static File testLogDir = new File("target", private static File testLogDir = new File("target",
TestNMWebServices.class.getSimpleName() + "LogDir"); TestNMWebServices.class.getSimpleName() + "LogDir");
private static File testRemoteLogDir = new File("target",
TestNMWebServices.class.getSimpleName() + "remote-log-dir");
private Injector injector = Guice.createInjector(new ServletModule() { private Injector injector = Guice.createInjector(new ServletModule() {
@ -102,6 +111,9 @@ protected void configureServlets() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
testRemoteLogDir.getAbsolutePath());
dirsHandler = new LocalDirsHandlerService(); dirsHandler = new LocalDirsHandlerService();
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService( NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler); NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
@ -164,12 +176,14 @@ public void setUp() throws Exception {
super.setUp(); super.setUp();
testRootDir.mkdirs(); testRootDir.mkdirs();
testLogDir.mkdir(); testLogDir.mkdir();
testRemoteLogDir.mkdir();
} }
@AfterClass @AfterClass
static public void stop() { static public void stop() {
FileUtil.fullyDelete(testRootDir); FileUtil.fullyDelete(testRootDir);
FileUtil.fullyDelete(testLogDir); FileUtil.fullyDelete(testLogDir);
FileUtil.fullyDelete(testRemoteLogDir);
} }
public TestNMWebServices() { public TestNMWebServices() {
@ -331,7 +345,7 @@ public void testContainerLogsWithOldAPI() throws IOException, JSONException{
} }
private void testContainerLogs(WebResource r, ContainerId containerId) private void testContainerLogs(WebResource r, ContainerId containerId)
throws IOException, JSONException { throws IOException {
final String containerIdStr = containerId.toString(); final String containerIdStr = containerId.toString();
final ApplicationAttemptId appAttemptId = containerId final ApplicationAttemptId appAttemptId = containerId
.getApplicationAttemptId(); .getApplicationAttemptId();
@ -437,10 +451,50 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
.path("logs").accept(MediaType.APPLICATION_JSON) .path("logs").accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class); .get(ClientResponse.class);
assertEquals(200, response.getStatus()); assertEquals(200, response.getStatus());
JSONObject json = response.getEntity(JSONObject.class); List<ContainerLogsInfo> responseList = response.getEntity(new GenericType<
assertEquals(json.getJSONObject("containerLogInfo") List<ContainerLogsInfo>>(){});
.getString("fileName"), filename); assertTrue(responseList.size() == 1);
assertEquals(responseList.get(0).getLogType(),
ContainerLogType.LOCAL.toString());
List<PerContainerLogFileInfo> logMeta = responseList.get(0)
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertEquals(logMeta.get(0).getFileName(), filename);
// now create an aggregated log in Remote File system
File tempLogDir = new File("target",
TestNMWebServices.class.getSimpleName() + "temp-log-dir");
try {
String aggregatedLogFile = filename + "-aggregated";
TestContainerLogsUtils.createContainerLogFileInRemoteFS(
nmContext.getConf(), FileSystem.get(nmContext.getConf()),
tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(),
aggregatedLogFile, "user", logMessage, true);
r1 = resource();
response = r1.path("ws").path("v1").path("node")
.path("containers").path(containerIdStr)
.path("logs").accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(200, response.getStatus());
responseList = response.getEntity(new GenericType<
List<ContainerLogsInfo>>(){});
assertEquals(responseList.size(), 2);
for (ContainerLogsInfo logInfo : responseList) {
if(logInfo.getLogType().equals(
ContainerLogType.AGGREGATED.toString())) {
List<PerContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
assertTrue(meta.size() == 1);
assertEquals(meta.get(0).getFileName(), aggregatedLogFile);
} else {
assertEquals(logInfo.getLogType(), ContainerLogType.LOCAL.toString());
List<PerContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
assertTrue(meta.size() == 1);
assertEquals(meta.get(0).getFileName(), filename);
}
}
} finally {
FileUtil.fullyDelete(tempLogDir);
}
// After container is completed, it is removed from nmContext // After container is completed, it is removed from nmContext
nmContext.getContainers().remove(containerId); nmContext.getContainers().remove(containerId);
Assert.assertNull(nmContext.getContainers().get(containerId)); Assert.assertNull(nmContext.getContainers().get(containerId));