YARN-6099. Improve webservice to list aggregated log files. Contributed by Xuan Gong.
This commit is contained in:
parent a33ce45e35
commit 8528d85a68
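For orientation, the webservice improved here is the log-listing endpoint exercised by the tests below, ws/v1/applicationhistory/containers/{containerId}/logs, which now returns per-file meta data (fileName, fileSize, lastModifiedTime) grouped by container and log type. A minimal sketch of querying it with the same Jersey 1.x client API the patch itself uses; the timeline-server address and container id below are placeholder assumptions, not values from the patch:

import javax.ws.rs.core.MediaType;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;

public class ListAggregatedLogsExample {
  public static void main(String[] args) {
    // Placeholder AHS/timeline web address and container id -- adjust for a real cluster.
    String ahsAddress = "http://localhost:8188";
    String containerId = "container_1490000000000_0001_01_000001";
    Client client = Client.create();
    WebResource resource = client.resource(ahsAddress);
    // GET .../ws/v1/applicationhistory/containers/{containerId}/logs returns the
    // per-container log meta (fileName, fileSize, lastModifiedTime) as JSON.
    ClientResponse response = resource.path("ws").path("v1")
        .path("applicationhistory").path("containers")
        .path(containerId).path("logs")
        .accept(MediaType.APPLICATION_JSON)
        .get(ClientResponse.class);
    System.out.println(response.getEntity(String.class));
  }
}

The JSON body is a list of containerLogsInfo entries, each carrying a containerLogInfo array plus the logType (LOCAL or AGGREGATED), containerId and nodeId, as parsed by the LogsCLI changes in the first hunks below.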
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.codehaus.jettison.json.JSONArray;
@@ -409,9 +410,10 @@ public class LogsCLI extends Configured implements Tool {
return false;
}

private List<PerLogFileInfo> getContainerLogFiles(Configuration conf,
String containerIdStr, String nodeHttpAddress) throws IOException {
List<PerLogFileInfo> logFileInfos = new ArrayList<>();
private List<PerContainerLogFileInfo> getContainerLogFiles(
Configuration conf, String containerIdStr, String nodeHttpAddress)
throws IOException {
List<PerContainerLogFileInfo> logFileInfos = new ArrayList<>();
Client webServiceClient = Client.create();
try {
WebResource webResource = webServiceClient

@@ -425,11 +427,20 @@ public class LogsCLI extends Configured implements Tool {
ClientResponse.Status.OK.getStatusCode()) {
try {
JSONObject json = response.getEntity(JSONObject.class);
JSONArray array = json.getJSONArray("containerLogInfo");
JSONArray array = json.getJSONArray("containerLogsInfo");
for (int i = 0; i < array.length(); i++) {
String fileName = array.getJSONObject(i).getString("fileName");
String fileSize = array.getJSONObject(i).getString("fileSize");
logFileInfos.add(new PerLogFileInfo(fileName, fileSize));
JSONObject log = array.getJSONObject(i);
Object ob = log.get("containerLogInfo");
if (ob instanceof JSONArray) {
JSONArray obArray = (JSONArray)ob;
for (int j = 0; j < obArray.length(); j++) {
logFileInfos.add(generatePerContainerLogFileInfoFromJSON(
obArray.getJSONObject(j)));
}
} else if (ob instanceof JSONObject) {
logFileInfos.add(generatePerContainerLogFileInfoFromJSON(
(JSONObject)ob));
}
}
} catch (Exception e) {
System.err.println("Unable to parse json from webservice. Error:");

@@ -445,6 +456,19 @@ public class LogsCLI extends Configured implements Tool {
return logFileInfos;
}

private PerContainerLogFileInfo generatePerContainerLogFileInfoFromJSON(
JSONObject meta) throws JSONException {
String fileName = meta.has("fileName") ?
meta.getString("fileName") : "N/A";
String fileSize = meta.has("fileSize") ?
meta.getString("fileSize") : "N/A";
String lastModificationTime = meta.has("lastModifiedTime") ?
meta.getString("lastModifiedTime") : "N/A";
return new PerContainerLogFileInfo(fileName, fileSize,
lastModificationTime);
}

@Private
@VisibleForTesting
public int printContainerLogsFromRunningApplication(Configuration conf,
@@ -1160,53 +1184,29 @@ public class LogsCLI extends Configured implements Tool {
outStream.println(containerString);
outStream.println(StringUtils.repeat("=", containerString.length()));
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
"LogType", "LogLength");
"LogFile", "LogLength", "LastModificationTime");
outStream.println(StringUtils.repeat("=", containerString.length()));
List<PerLogFileInfo> infos = getContainerLogFiles(
List<PerContainerLogFileInfo> infos = getContainerLogFiles(
getConf(), containerId, nodeHttpAddress);
for (PerLogFileInfo info : infos) {
for (PerContainerLogFileInfo info : infos) {
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
info.getFileName(), info.getFileLength());
info.getFileName(), info.getFileSize(),
info.getLastModifiedTime());
}
}
return 0;
}

private static class PerLogFileInfo {
private String fileName;
private String fileLength;
public PerLogFileInfo(String fileName, String fileLength) {
setFileName(fileName);
setFileLength(fileLength);
}

public String getFileName() {
return fileName;
}

public void setFileName(String fileName) {
this.fileName = fileName;
}

public String getFileLength() {
return fileLength;
}

public void setFileLength(String fileLength) {
this.fileLength = fileLength;
}
}

@VisibleForTesting
public Set<String> getMatchedContainerLogFiles(ContainerLogsRequest request,
boolean useRegex) throws IOException {
// fetch all the log files for the container
// filter the log files based on the given -log_files pattern
List<PerLogFileInfo> allLogFileInfos=
List<PerContainerLogFileInfo> allLogFileInfos=
getContainerLogFiles(getConf(), request.getContainerId(),
request.getNodeHttpAddress());
List<String> fileNames = new ArrayList<String>();
for (PerLogFileInfo fileInfo : allLogFileInfos) {
for (PerContainerLogFileInfo fileInfo : allLogFileInfos) {
fileNames.add(fileInfo.getFileName());
}
return getMatchedLogFiles(request, fileNames,
@@ -962,7 +962,7 @@ public class AggregatedLogFormat {

@Private
public static Pair<String, String> readContainerMetaDataAndSkipData(
DataInputStream valueStream, PrintStream out) throws IOException {
DataInputStream valueStream) throws IOException {

String fileType = valueStream.readUTF();
String fileLengthStr = valueStream.readUTF();
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.logaggregation;

import java.util.ArrayList;
import java.util.List;

/**
* The ContainerLogMeta includes:
* <ul>
* <li>The Container Id.</li>
* <li>The NodeManager Id.</li>
* <li>A list of {@link PerContainerLogFileInfo}.</li>
* </ul>
*
*/
public class ContainerLogMeta {
private String containerId;
private String nodeId;
private List<PerContainerLogFileInfo> logMeta;

public ContainerLogMeta(String containerId, String nodeId) {
this.containerId = containerId;
this.nodeId = nodeId;
logMeta = new ArrayList<>();
}

public String getNodeId() {
return this.nodeId;
}

public String getContainerId() {
return this.containerId;
}

public void addLogMeta(String fileName, String fileSize,
String lastModificationTime) {
logMeta.add(new PerContainerLogFileInfo(fileName, fileSize,
lastModificationTime));
}

public List<PerContainerLogFileInfo> getContainerLogMeta() {
return this.logMeta;
}
}
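A short illustrative sketch (not part of the patch, and assuming the imports of the file above) of how ContainerLogMeta is populated and read; the container id, node id, and timestamp values are made up:

ContainerLogMeta meta = new ContainerLogMeta(
    "container_1490000000000_0001_01_000001", "host1_1234");
meta.addLogMeta("syslog", "1024", "Tue Mar 07 12:00:00 +0000 2017");
for (PerContainerLogFileInfo info : meta.getContainerLogMeta()) {
  // Each entry carries the file name, size and last-modified time as strings.
  System.out.println(info.getFileName() + "\t" + info.getFileSize()
      + "\t" + info.getLastModifiedTime());
}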
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.logaggregation;

/**
* Enumeration of the various types of a container log.
*/
public enum ContainerLogType {

/** The log is from NodeManager local log directory. */
LOCAL,

/** The log is from Remote FileSystem application log directory. */
AGGREGATED
}
@@ -20,12 +20,16 @@ package org.apache.hadoop.yarn.logaggregation;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;

@Private
public class LogAggregationUtils {

@@ -77,9 +81,6 @@ public class LogAggregationUtils {
return new Path(getRemoteLogUserDir(remoteRootLogDir, user), suffix);
}

// TODO Add a utility method to list available log files. Ignore the
// temporary ones.

/**
* Gets the remote log user dir.
* @param remoteRootLogDir

@@ -115,4 +116,61 @@ public class LogAggregationUtils {
public static String getNodeString(String nodeId) {
return nodeId.toString().replace(":", "_");
}

/**
* Return the remote application log directory.
* @param conf the configuration
* @param appId the application
* @param appOwner the application owner
* @return the remote application log directory path
* @throws IOException if we can not find remote application log directory
*/
public static org.apache.hadoop.fs.Path getRemoteAppLogDir(
Configuration conf, ApplicationId appId, String appOwner)
throws IOException {
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
org.apache.hadoop.fs.Path remoteRootLogDir =
new org.apache.hadoop.fs.Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
org.apache.hadoop.fs.Path remoteAppDir = null;
if (appOwner == null) {
org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
FileContext fc = FileContext.getFileContext(
qualifiedRemoteRootLogDir.toUri(), conf);
org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
.getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
FileStatus[] matching = fc.util().globStatus(toMatch);
if (matching == null || matching.length != 1) {
throw new IOException("Can not find remote application directory for "
+ "the application:" + appId);
}
remoteAppDir = matching[0].getPath();
} else {
remoteAppDir = LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogDir, appId, appOwner, suffix);
}
return remoteAppDir;
}

/**
* Get all available log files under remote app log directory.
* @param conf the configuration
* @param appId the applicationId
* @param appOwner the application owner
* @return the iterator of available log files
* @throws IOException if there is no log file available
*/
public static RemoteIterator<FileStatus> getRemoteNodeFileDir(
Configuration conf, ApplicationId appId, String appOwner)
throws IOException {
Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner);
RemoteIterator<FileStatus> nodeFiles = null;
Path qualifiedLogDir =
FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
conf).listStatus(remoteAppLogDir);
return nodeFiles;
}
}
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

@@ -52,7 +51,7 @@ import com.google.common.annotations.VisibleForTesting;
public class LogCLIHelpers implements Configurable {

public static final String PER_LOG_FILE_INFO_PATTERN =
"%20s\t%20s" + System.getProperty("line.separator");
"%30s\t%30s\t%30s" + System.getProperty("line.separator");
public static final String CONTAINER_ON_NODE_PATTERN =
"Container: %s on %s";

@@ -412,7 +411,8 @@ public class LogCLIHelpers implements Configurable {
}
}
if (!foundAnyLogs) {
emptyLogDir(getRemoteAppLogDir(appId, appOwner).toString());
emptyLogDir(LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner)
.toString());
return -1;
}
return 0;

@@ -426,67 +426,15 @@ public class LogCLIHelpers implements Configurable {
String appOwner = options.getAppOwner();
String nodeId = options.getNodeId();
String containerIdStr = options.getContainerId();
boolean getAllContainers = (containerIdStr == null);
String nodeIdStr = (nodeId == null) ? null
: LogAggregationUtils.getNodeString(nodeId);
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
appId, appOwner);
if (nodeFiles == null) {
List<ContainerLogMeta> containersLogMeta;
try {
containersLogMeta = LogToolUtils.getContainerLogMetaFromRemoteFS(
conf, appId, containerIdStr, nodeId, appOwner);
} catch (Exception ex) {
err.println(ex.getMessage());
return -1;
}
boolean foundAnyLogs = false;
while (nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
if (nodeIdStr != null) {
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
continue;
}
}
if (!thisNodeFile.getPath().getName()
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath());
try {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null) {
if (getAllContainers || (key.toString().equals(containerIdStr))) {
String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
key, thisNodeFile.getPath().getName());
out.println(containerString);
out.println("Log Upload Time:"
+ thisNodeFile.getModificationTime());
out.println(StringUtils.repeat("=", containerString.length()));
out.printf(PER_LOG_FILE_INFO_PATTERN, "LogType", "LogLength");
out.println(StringUtils.repeat("=", containerString.length()));
while (true) {
try {
Pair<String, String> logMeta =
LogReader.readContainerMetaDataAndSkipData(
valueStream, out);
out.printf(PER_LOG_FILE_INFO_PATTERN,
logMeta.getFirst(), logMeta.getSecond());
} catch (EOFException eof) {
break;
}
}
foundAnyLogs = true;
if (!getAllContainers) {
break;
}
}
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
} finally {
reader.close();
}
}
}
if (!foundAnyLogs) {
if (containersLogMeta.isEmpty()) {
if (containerIdStr != null && nodeId != null) {
err.println("The container " + containerIdStr + " couldn't be found "
+ "on the node specified: " + nodeId);

@@ -499,6 +447,21 @@ public class LogCLIHelpers implements Configurable {
}
return -1;
}

for (ContainerLogMeta containerLogMeta : containersLogMeta) {
String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
containerLogMeta.getContainerId(), containerLogMeta.getNodeId());
out.println(containerString);
out.println(StringUtils.repeat("=", containerString.length()));
out.printf(PER_LOG_FILE_INFO_PATTERN, "LogFile", "LogLength",
"LastModificationTime");
out.println(StringUtils.repeat("=", containerString.length()));
for (PerContainerLogFileInfo logMeta : containerLogMeta
.getContainerLogMeta()) {
out.printf(PER_LOG_FILE_INFO_PATTERN, logMeta.getFileName(),
logMeta.getFileSize(), logMeta.getLastModifiedTime());
}
}
return 0;
}

@@ -583,36 +546,24 @@ public class LogCLIHelpers implements Configurable {

private RemoteIterator<FileStatus> getRemoteNodeFileDir(ApplicationId appId,
String appOwner) throws IOException {
Path remoteAppLogDir = getRemoteAppLogDir(appId, appOwner);
RemoteIterator<FileStatus> nodeFiles = null;
try {
Path qualifiedLogDir =
FileContext.getFileContext(getConf()).makeQualified(remoteAppLogDir);
nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
getConf()).listStatus(remoteAppLogDir);
nodeFiles = LogAggregationUtils.getRemoteNodeFileDir(
conf, appId, appOwner);
} catch (FileNotFoundException fnf) {
logDirNotExist(remoteAppLogDir.toString());
logDirNotExist(LogAggregationUtils.getRemoteAppLogDir(
conf, appId, appOwner).toString());
} catch (AccessControlException | AccessDeniedException ace) {
logDirNoAccessPermission(remoteAppLogDir.toString(), appOwner,
ace.getMessage());
logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir(
conf, appId, appOwner).toString(), appOwner,
ace.getMessage());
} catch (IOException ioe) {
logDirIOError(remoteAppLogDir.toString(), ioe.getMessage());
logDirIOError(LogAggregationUtils.getRemoteAppLogDir(
conf, appId, appOwner).toString(), ioe.getMessage());
}
return nodeFiles;
}

private Path getRemoteAppLogDir(ApplicationId appId, String appOwner) {
Path remoteRootLogDir = new Path(getConf().get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String user = appOwner;
String logDirSuffix = LogAggregationUtils
.getRemoteNodeLogDirSuffix(getConf());
// TODO Change this to get a list of files from the LAS.
return LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogDir, appId, user, logDirSuffix);
}

@Override
public void setConf(Configuration conf) {
this.conf = conf;

@@ -708,7 +659,7 @@ public class LogCLIHelpers implements Configurable {
while (true) {
try {
String logFile = LogReader.readContainerMetaDataAndSkipData(
valueStream, null).getFirst();
valueStream).getFirst();
logTypes.add(logFile);
} catch (EOFException eof) {
break;
@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.util.Times;

/**
* This class contains several utility functions which could be used in
* different log tools.
*
*/
public final class LogToolUtils {

private LogToolUtils() {}

/**
* Return a list of {@link ContainerLogMeta} for a container
* from Remote FileSystem.
*
* @param conf the configuration
* @param appId the applicationId
* @param containerIdStr the containerId
* @param nodeId the nodeId
* @param appOwner the application owner
* @return a list of {@link ContainerLogMeta}
* @throws IOException if there is no available log file
*/
public static List<ContainerLogMeta> getContainerLogMetaFromRemoteFS(
Configuration conf, ApplicationId appId, String containerIdStr,
String nodeId, String appOwner) throws IOException {
List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
boolean getAllContainers = (containerIdStr == null);
String nodeIdStr = (nodeId == null) ? null
: LogAggregationUtils.getNodeString(nodeId);
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileDir(conf, appId, appOwner);
if (nodeFiles == null) {
throw new IOException("There is no available log file for "
+ "application:" + appId);
}
while (nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
if (nodeIdStr != null) {
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
continue;
}
}
if (!thisNodeFile.getPath().getName()
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(conf,
thisNodeFile.getPath());
try {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null) {
if (getAllContainers || (key.toString().equals(containerIdStr))) {
ContainerLogMeta containerLogMeta = new ContainerLogMeta(
key.toString(), thisNodeFile.getPath().getName());
while (true) {
try {
Pair<String, String> logMeta =
LogReader.readContainerMetaDataAndSkipData(
valueStream);
containerLogMeta.addLogMeta(
logMeta.getFirst(),
logMeta.getSecond(),
Times.format(thisNodeFile.getModificationTime()));
} catch (EOFException eof) {
break;
}
}
containersLogMeta.add(containerLogMeta);
if (!getAllContainers) {
break;
}
}
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
} finally {
reader.close();
}
}
}
return containersLogMeta;
}
}
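An illustrative caller sketch for getContainerLogMetaFromRemoteFS above (a snippet, assuming the imports of the file); the configuration, application id, and owner are placeholders, and passing null for containerIdStr/nodeId asks for every container on every node, mirroring the getAllContainers logic in the method:

Configuration conf = new YarnConfiguration();
ApplicationId appId = ApplicationId.newInstance(0, 1);
List<ContainerLogMeta> metas = LogToolUtils.getContainerLogMetaFromRemoteFS(
    conf, appId, null, null, "user1");
for (ContainerLogMeta meta : metas) {
  System.out.println(meta.getContainerId() + " on " + meta.getNodeId()
      + ": " + meta.getContainerLogMeta().size() + " aggregated log file(s)");
}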
@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.logaggregation;

/**
* PerContainerLogFileInfo represents the meta data for a container log file,
* which includes:
* <ul>
* <li>The filename of the container log.</li>
* <li>The size of the container log.</li>
* <li>The last modification time of the container log.</li>
* </ul>
*
*/
public class PerContainerLogFileInfo {
private String fileName;
private String fileSize;
private String lastModifiedTime;

//JAXB needs this
public PerContainerLogFileInfo() {}

public PerContainerLogFileInfo(String fileName, String fileSize,
String lastModifiedTime) {
this.setFileName(fileName);
this.setFileSize(fileSize);
this.setLastModifiedTime(lastModifiedTime);
}

public String getFileName() {
return fileName;
}

public void setFileName(String fileName) {
this.fileName = fileName;
}

public String getFileSize() {
return fileSize;
}

public void setFileSize(String fileSize) {
this.fileSize = fileSize;
}

public String getLastModifiedTime() {
return lastModifiedTime;
}

public void setLastModifiedTime(String lastModifiedTime) {
this.lastModifiedTime = lastModifiedTime;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((fileName == null) ? 0 : fileName.hashCode());
result = prime * result + ((fileSize == null) ? 0 : fileSize.hashCode());
result = prime * result + ((lastModifiedTime == null) ?
0 : lastModifiedTime.hashCode());
return result;
}

@Override
public boolean equals(Object otherObj) {
if (otherObj == this) {
return true;
}
if (!(otherObj instanceof PerContainerLogFileInfo)) {
return false;
}
PerContainerLogFileInfo other = (PerContainerLogFileInfo)otherObj;
return other.fileName.equals(fileName) && other.fileSize.equals(fileSize)
&& other.lastModifiedTime.equals(lastModifiedTime);
}
}
@@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;

import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

/**
* This class contains several utility functions for log aggregation tests.
*/
public final class TestContainerLogsUtils {

private TestContainerLogsUtils() {}

/**
* Utility function to create container log file and upload
* it into remote file system.
* @param conf the configuration
* @param fs the FileSystem
* @param rootLogDir the root log directory
* @param containerId the containerId
* @param nodeId the nodeId
* @param fileName the log file name
* @param user the application user
* @param content the log content
* @param deleteRemoteLogDir whether to delete remote log dir.
* @throws IOException if we can not create log files locally
* or we can not upload container logs into RemoteFS.
*/
public static void createContainerLogFileInRemoteFS(Configuration conf,
FileSystem fs, String rootLogDir, ContainerId containerId, NodeId nodeId,
String fileName, String user, String content,
boolean deleteRemoteLogDir) throws IOException {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
//prepare the logs for remote directory
ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId();
// create local logs
List<String> rootLogDirList = new ArrayList<String>();
rootLogDirList.add(rootLogDir);
Path rootLogDirPath = new Path(rootLogDir);
if (fs.exists(rootLogDirPath)) {
fs.delete(rootLogDirPath, true);
}
assertTrue(fs.mkdirs(rootLogDirPath));
Path appLogsDir = new Path(rootLogDirPath, appId.toString());
if (fs.exists(appLogsDir)) {
fs.delete(appLogsDir, true);
}
assertTrue(fs.mkdirs(appLogsDir));

createContainerLogInLocalDir(appLogsDir, containerId, fs, fileName,
content);
// upload container logs to remote log dir
Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR),
user + "/logs/" + appId.toString());
if (fs.exists(path) && deleteRemoteLogDir) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId,
containerId, path, fs);
}

private static void createContainerLogInLocalDir(Path appLogsDir,
ContainerId containerId, FileSystem fs, String fileName, String content)
throws IOException{
Path containerLogsDir = new Path(appLogsDir, containerId.toString());
if (fs.exists(containerLogsDir)) {
fs.delete(containerLogsDir, true);
}
assertTrue(fs.mkdirs(containerLogsDir));
Writer writer =
new FileWriter(new File(containerLogsDir.toString(), fileName));
writer.write(content);
writer.close();
}

private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter(configuration, path, ugi);
writer.writeApplicationOwner(ugi.getUserName());

writer.append(new AggregatedLogFormat.LogKey(containerId),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
ugi.getShortUserName()));
writer.close();
}
}
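An illustrative sketch of how a test might drive the helper above (a snippet, assuming the imports of the file); conf, fs, rootLogDir, and the ids mirror the usage in TestAHSWebServices further below and are assumptions here:

ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newContainerId(attemptId, 1);
NodeId nodeId = NodeId.newInstance("test host", 100);
// Writes "Hello.<containerId>" into a local syslog file and aggregates it
// into the remote log directory configured via NM_REMOTE_APP_LOG_DIR.
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
    rootLogDir, containerId, nodeId, "syslog", "user1",
    "Hello." + containerId, true);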
@@ -23,9 +23,9 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Set;

import javax.servlet.http.HttpServletRequest;

@@ -37,12 +37,12 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.GenericEntity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;

@@ -58,9 +58,11 @@ import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;

@@ -72,6 +74,7 @@ import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.base.Joiner;
import com.google.inject.Inject;

@@ -243,7 +246,7 @@ public class AHSWebServices extends WebServices {
init(res);
try {
containerId = ContainerId.fromString(containerIdStr);
} catch (Exception e) {
} catch (IllegalArgumentException e) {
throw new BadRequestException("invalid container id, " + containerIdStr);
}
ApplicationId appId = containerId.getApplicationAttemptId()

@@ -253,25 +256,28 @@ public class AHSWebServices extends WebServices {
appInfo = super.getApp(req, res, appId.toString());
} catch (Exception ex) {
// directly find logs from HDFS.
return getContainerLogMeta(appId, null, null, containerIdStr);
return getContainerLogMeta(appId, null, null, containerIdStr, false);
}
String appOwner = appInfo.getUser();
ContainerInfo containerInfo;
try {
containerInfo = super.getContainer(
req, res, appId.toString(),
containerId.getApplicationAttemptId().toString(),
containerId.toString());
} catch (Exception ex) {
if (isFinishedState(appInfo.getAppState())) {
// directly find logs from HDFS.
return getContainerLogMeta(appId, appOwner, null, containerIdStr);
}
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get ContainerInfo for the container: " + containerId);
// if the application finishes, directly find logs
// from HDFS.
if (isFinishedState(appInfo.getAppState())) {
return getContainerLogMeta(appId, null, null,
containerIdStr, false);
}
String nodeId = containerInfo.getNodeId();
if (isRunningState(appInfo.getAppState())) {
String appOwner = appInfo.getUser();
ContainerInfo containerInfo;
try {
containerInfo = super.getContainer(
req, res, appId.toString(),
containerId.getApplicationAttemptId().toString(),
containerId.toString());
} catch (Exception ex) {
// return log meta for the aggregated logs if exists.
// It will also return empty log meta for the local logs.
return getContainerLogMeta(appId, appOwner, null,
containerIdStr, true);
}
String nodeHttpAddress = containerInfo.getNodeHttpAddress();
String uri = "/" + containerId.toString() + "/logs";
String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri);

@@ -283,11 +289,8 @@ public class AHSWebServices extends WebServices {
HttpServletResponse.SC_TEMPORARY_REDIRECT);
response.header("Location", resURI);
return response.build();
} else if (isFinishedState(appInfo.getAppState())) {
return getContainerLogMeta(appId, appOwner, nodeId,
containerIdStr);
} else {
return createBadResponse(Status.NOT_FOUND,
throw new NotFoundException(
"The application is not at Running or Finished State.");
}
}

@@ -602,90 +605,39 @@ public class AHSWebServices extends WebServices {
}

private Response getContainerLogMeta(ApplicationId appId, String appOwner,
final String nodeId, final String containerIdStr) {
Map<String, String> containerLogMeta = new HashMap<>();
final String nodeId, final String containerIdStr,
boolean emptyLocalContainerLogMeta) {
try {
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
org.apache.hadoop.fs.Path remoteRootLogDir =
new org.apache.hadoop.fs.Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
FileContext fc = FileContext.getFileContext(
qualifiedRemoteRootLogDir.toUri(), conf);
org.apache.hadoop.fs.Path remoteAppDir = null;
if (appOwner == null) {
org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
.getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
FileStatus[] matching = fc.util().globStatus(toMatch);
if (matching == null || matching.length != 1) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get log meta for container: " + containerIdStr);
}
remoteAppDir = matching[0].getPath();
} else {
remoteAppDir = LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogDir, appId, appOwner, suffix);
}
final RemoteIterator<FileStatus> nodeFiles;
nodeFiles = fc.listStatus(remoteAppDir);
if (!nodeFiles.hasNext()) {
List<ContainerLogMeta> containerLogMeta = LogToolUtils
.getContainerLogMetaFromRemoteFS(conf, appId, containerIdStr,
nodeId, appOwner);
if (containerLogMeta.isEmpty()) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get log meta for container: " + containerIdStr);
}
String nodeIdStr = (nodeId == null) ? null
: LogAggregationUtils.getNodeString(nodeId);
while (nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
if (nodeIdStr != null) {
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
continue;
}
}
if (!thisNodeFile.getPath().getName().endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(conf,
thisNodeFile.getPath());
try {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null) {
if (key.toString().equals(containerIdStr)) {
while (true) {
try {
Pair<String, String> logMeta =
LogReader.readContainerMetaDataAndSkipData(
valueStream, null);
containerLogMeta.put(logMeta.getFirst(),
logMeta.getSecond());
} catch (EOFException eof) {
break;
}
}
break;
}
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
} finally {
reader.close();
}
}
List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
for (ContainerLogMeta meta : containerLogMeta) {
ContainerLogsInfo logInfo = new ContainerLogsInfo(meta,
ContainerLogType.AGGREGATED);
containersLogsInfo.add(logInfo);
}
ResponseBuilder response = Response.ok(new ContainerLogsInfo(
containerLogMeta));
if (emptyLocalContainerLogMeta) {
ContainerLogMeta emptyMeta = new ContainerLogMeta(
containerIdStr, "N/A");
ContainerLogsInfo empty = new ContainerLogsInfo(emptyMeta,
ContainerLogType.LOCAL);
containersLogsInfo.add(empty);
}
GenericEntity<List<ContainerLogsInfo>> meta = new GenericEntity<List<
ContainerLogsInfo>>(containersLogsInfo){};
ResponseBuilder response = Response.ok(meta);
// Sending the X-Content-Type-Options response header with the value
// nosniff will prevent Internet Explorer from MIME-sniffing a response
// away from the declared content-type.
response.header("X-Content-Type-Options", "nosniff");
return response.build();
} catch (Exception ex) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
ex.getMessage());
throw new WebApplicationException(ex);
}
}
}
@@ -23,13 +23,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.FileWriter;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

@@ -44,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;

@@ -57,8 +52,9 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.TestApplicationHistoryManagerOnTimelineStore;

@@ -90,6 +86,7 @@ import com.google.inject.Singleton;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;

@@ -507,51 +504,20 @@ public class TestAHSWebServices extends JerseyTestBase {
public void testContainerLogsForFinishedApps() throws Exception {
String fileName = "syslog";
String user = "user1";
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user1");
NodeId nodeId = NodeId.newInstance("test host", 100);
NodeId nodeId2 = NodeId.newInstance("host2", 1234);
//prepare the logs for remote directory
ApplicationId appId = ApplicationId.newInstance(0, 1);
// create local logs
List<String> rootLogDirList = new ArrayList<String>();
rootLogDirList.add(rootLogDir);
Path rootLogDirPath = new Path(rootLogDir);
if (fs.exists(rootLogDirPath)) {
fs.delete(rootLogDirPath, true);
}
assertTrue(fs.mkdirs(rootLogDirPath));

Path appLogsDir = new Path(rootLogDirPath, appId.toString());
if (fs.exists(appLogsDir)) {
fs.delete(appLogsDir, true);
}
assertTrue(fs.mkdirs(appLogsDir));

// create container logs in local log file dir
// create two container log files. We can get containerInfo
// for container1 from AHS, but can not get such info for
// container100
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
ContainerId containerId100 = ContainerId.newContainerId(appAttemptId, 100);
createContainerLogInLocalDir(appLogsDir, containerId1, fs, fileName,
("Hello." + containerId1));
createContainerLogInLocalDir(appLogsDir, containerId100, fs, fileName,
("Hello." + containerId100));

// upload container logs to remote log dir
Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
user + "/logs/" + appId.toString());
if (fs.exists(path)) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId,
containerId1, path, fs);
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId2,
containerId100, path, fs);

TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1, nodeId, fileName, user,
("Hello." + containerId1), true);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId100, nodeId2, fileName, user,
("Hello." + containerId100), false);
// test whether we can find container log from remote directory if
// the containerInfo for this container could be fetched from AHS.
WebResource r = resource();

@@ -600,25 +566,14 @@ public class TestAHSWebServices extends JerseyTestBase {

// create an application which can not be found from AHS
ApplicationId appId100 = ApplicationId.newInstance(0, 100);
appLogsDir = new Path(rootLogDirPath, appId100.toString());
if (fs.exists(appLogsDir)) {
fs.delete(appLogsDir, true);
}
assertTrue(fs.mkdirs(appLogsDir));
ApplicationAttemptId appAttemptId100 =
ApplicationAttemptId.newInstance(appId100, 1);
ContainerId containerId1ForApp100 = ContainerId
.newContainerId(appAttemptId100, 1);
createContainerLogInLocalDir(appLogsDir, containerId1ForApp100, fs,
fileName, ("Hello." + containerId1ForApp100));
path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
user + "/logs/" + appId100.toString());
if (fs.exists(path)) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId2,
containerId1ForApp100, path, fs);
ApplicationAttemptId appAttemptId100 = ApplicationAttemptId.newInstance(
appId100, 1);
ContainerId containerId1ForApp100 = ContainerId.newContainerId(
appAttemptId100, 1);

TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1ForApp100, nodeId, fileName, user,
("Hello." + containerId1ForApp100), true);
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")

@@ -696,35 +651,6 @@ public class TestAHSWebServices extends JerseyTestBase {
assertEquals(responseText.getBytes().length, fullTextSize);
}

private static void createContainerLogInLocalDir(Path appLogsDir,
ContainerId containerId, FileSystem fs, String fileName, String content)
throws Exception {
Path containerLogsDir = new Path(appLogsDir, containerId.toString());
if (fs.exists(containerLogsDir)) {
fs.delete(containerLogsDir, true);
}
assertTrue(fs.mkdirs(containerLogsDir));
Writer writer =
new FileWriter(new File(containerLogsDir.toString(), fileName));
writer.write(content);
writer.close();
}

private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter(configuration, path, ugi);
writer.writeApplicationOwner(ugi.getUserName());

writer.append(new AggregatedLogFormat.LogKey(containerId),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
ugi.getShortUserName()));
writer.close();
}

@Test(timeout = 10000)
public void testContainerLogsForRunningApps() throws Exception {
String fileName = "syslog";

@@ -770,6 +696,8 @@ public class TestAHSWebServices extends JerseyTestBase {
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
WebResource r = resource();
// If we can get Container information from ATS, we re-direct the request
// to the nodemanager who runs the container.
URI requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs")

@@ -780,46 +708,52 @@ public class TestAHSWebServices extends JerseyTestBase {
assertTrue(redirectURL.contains("ws/v1/node/containers"));
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs"));

// If we can not get container information from ATS, we would try to
// get aggregated log meta from remote FileSystem.
ContainerId containerId1000 = ContainerId.newContainerId(
appAttemptId, 1000);
String fileName = "syslog";
String content = "Hello." + containerId1000;
NodeId nodeId = NodeId.newInstance("test host", 100);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1000, nodeId, fileName, user, content, true);
ClientResponse response = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1000.toString()).path("logs")
.queryParam("user.name", user)
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);

List<ContainerLogsInfo> responseText = response.getEntity(new GenericType<
List<ContainerLogsInfo>>(){});
assertTrue(responseText.size() == 2);
for (ContainerLogsInfo logInfo : responseText) {
if(logInfo.getLogType().equals(ContainerLogType.AGGREGATED.toString())) {
List<PerContainerLogFileInfo> logMeta = logInfo
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertEquals(logMeta.get(0).getFileName(), fileName);
assertEquals(logMeta.get(0).getFileSize(), String.valueOf(
content.length()));
} else {
assertEquals(logInfo.getLogType(), ContainerLogType.LOCAL.toString());
}
}
}

@Test(timeout = 10000)
public void testContainerLogsMetaForFinishedApps() throws Exception {
String fileName = "syslog";
String user = "user1";
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user1");
NodeId nodeId = NodeId.newInstance("test host", 100);
//prepare the logs for remote directory
ApplicationId appId = ApplicationId.newInstance(0, 1);
// create local logs
List<String> rootLogDirList = new ArrayList<String>();
rootLogDirList.add(rootLogDir);
Path rootLogDirPath = new Path(rootLogDir);
if (fs.exists(rootLogDirPath)) {
fs.delete(rootLogDirPath, true);
}
assertTrue(fs.mkdirs(rootLogDirPath));
Path appLogsDir = new Path(rootLogDirPath, appId.toString());
if (fs.exists(appLogsDir)) {
fs.delete(appLogsDir, true);
}
assertTrue(fs.mkdirs(appLogsDir));

ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
String fileName = "syslog";
String user = "user1";
String content = "Hello." + containerId1;
createContainerLogInLocalDir(appLogsDir, containerId1, fs, fileName,
content);

// upload container logs to remote log dir
Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
user + "/logs/" + appId.toString());
if (fs.exists(path)) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId,
containerId1, path, fs);
NodeId nodeId = NodeId.newInstance("test host", 100);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1, nodeId, fileName, user, content, true);

WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")

@@ -828,12 +762,16 @@ public class TestAHSWebServices extends JerseyTestBase {
.queryParam("user.name", user)
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
ContainerLogsInfo responseText = response.getEntity(
ContainerLogsInfo.class);
assertEquals(responseText.getContainerLogsInfo().size(), 1);
assertEquals(responseText.getContainerLogsInfo().get(0).getFileName(),
fileName);
assertEquals(responseText.getContainerLogsInfo().get(0).getFileSize(),
List<ContainerLogsInfo> responseText = response.getEntity(new GenericType<
List<ContainerLogsInfo>>(){});
assertTrue(responseText.size() == 1);
assertEquals(responseText.get(0).getLogType(),
ContainerLogType.AGGREGATED.toString());
List<PerContainerLogFileInfo> logMeta = responseText.get(0)
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertEquals(logMeta.get(0).getFileName(), fileName);
assertEquals(logMeta.get(0).getFileSize(),
String.valueOf(content.length()));
}
@@ -18,25 +18,26 @@

package org.apache.hadoop.yarn.server.webapp.dao;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;

/**
 * {@code ContainerLogsInfo} includes the log meta-data of containers.
 * <p>
 * The container log meta-data includes details such as:
 * <ul>
 * <li>The filename of the container log.</li>
 * <li>The size of the container log.</li>
 * <li>A list of {@link PerContainerLogFileInfo}.</li>
 * <li>The container Id.</li>
 * <li>The NodeManager Id.</li>
 * <li>The logType: could be local or aggregated</li>
 * </ul>
 */

@@ -45,57 +46,42 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
public class ContainerLogsInfo {

  @XmlElement(name = "containerLogInfo")
  protected List<ContainerLogInfo> containerLogsInfo;
  protected List<PerContainerLogFileInfo> containerLogsInfo;

  @XmlElement(name = "logType")
  protected String logType;

  @XmlElement(name = "containerId")
  protected String containerId;

  @XmlElement(name = "nodeId")
  protected String nodeId;

  //JAXB needs this
  public ContainerLogsInfo() {}

  public ContainerLogsInfo(Map<String, String> containerLogMeta)
  public ContainerLogsInfo(ContainerLogMeta logMeta, ContainerLogType logType)
      throws YarnException {
    this.containerLogsInfo = new ArrayList<ContainerLogInfo>();
    for (Entry<String, String> meta : containerLogMeta.entrySet()) {
      ContainerLogInfo info = new ContainerLogInfo(meta.getKey(),
          meta.getValue());
      containerLogsInfo.add(info);
    }
    this.containerLogsInfo = new ArrayList<PerContainerLogFileInfo>(
        logMeta.getContainerLogMeta());
    this.logType = logType.toString();
    this.containerId = logMeta.getContainerId();
    this.nodeId = logMeta.getNodeId();
  }

  public List<ContainerLogInfo> getContainerLogsInfo() {
  public List<PerContainerLogFileInfo> getContainerLogsInfo() {
    return this.containerLogsInfo;
  }

  /**
   * It includes the log meta-data of a container.
   *
   */
  @Private
  @VisibleForTesting
  public static class ContainerLogInfo {
    private String fileName;
    private String fileSize;
  public String getLogType() {
    return this.logType;
  }

    //JAXB needs this
    public ContainerLogInfo() {}
  public String getContainerId() {
    return this.containerId;
  }

    public ContainerLogInfo(String fileName, String fileSize) {
      this.setFileName(fileName);
      this.setFileSize(fileSize);
    }

    public String getFileName() {
      return fileName;
    }

    public void setFileName(String fileName) {
      this.fileName = fileName;
    }

    public String getFileSize() {
      return fileSize;
    }

    public void setFileSize(String fileSize) {
      this.fileSize = fileSize;
    }
  public String getNodeId() {
    return this.nodeId;
  }
}

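As a hedged illustration only (not part of the patch): a minimal sketch of how a caller that has already obtained a List<ContainerLogsInfo> from one of the webservices touched here could read the fields this DAO now exposes. The helper class and method names below are invented for the example.

import java.util.List;

import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;

public class ContainerLogsInfoPrinter {
  // Walks each ContainerLogsInfo entry and prints the log type, container id,
  // node id and the per-file meta data (file name and size as strings).
  public static void print(List<ContainerLogsInfo> entries) {
    for (ContainerLogsInfo info : entries) {
      System.out.println(info.getLogType() + " logs for container "
          + info.getContainerId() + " on node " + info.getNodeId());
      for (PerContainerLogFileInfo file : info.getContainerLogsInfo()) {
        System.out.println("  " + file.getFileName()
            + " (size: " + file.getFileSize() + ")");
      }
    }
  }
}
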
@@ -21,6 +21,8 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;

import javax.servlet.http.HttpServletRequest;

@@ -31,6 +33,7 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.GenericEntity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;

@@ -46,6 +49,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;

@@ -54,9 +60,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMContainerLogsInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.WebApp;

@@ -207,6 +214,8 @@ public class NMWebServices {
   *
   * @param hsr
   *        HttpServletRequest
   * @param res
   *        HttpServletResponse
   * @param containerIdStr
   *        The container ID
   * @return

@@ -216,20 +225,53 @@ public class NMWebServices {
  @Path("/containers/{containerid}/logs")
  @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
      MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
  public ContainerLogsInfo getContainerLogsInfo(@javax.ws.rs.core.Context
      HttpServletRequest hsr,
  public Response getContainerLogsInfo(
      @javax.ws.rs.core.Context HttpServletRequest hsr,
      @javax.ws.rs.core.Context HttpServletResponse res,
      @PathParam("containerid") String containerIdStr) {
    ContainerId containerId = null;
    init();
    try {
      containerId = ContainerId.fromString(containerIdStr);
    } catch (Exception e) {
    } catch (IllegalArgumentException ex) {
      throw new BadRequestException("invalid container id, " + containerIdStr);
    }

    try {
      return new ContainerLogsInfo(this.nmContext, containerId,
          hsr.getRemoteUser());
    } catch (YarnException ex) {
      List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
      containersLogsInfo.add(new NMContainerLogsInfo(
          this.nmContext, containerId,
          hsr.getRemoteUser(), ContainerLogType.LOCAL));
      // check whether we have aggregated logs in RemoteFS. If they exist, show
      // the log meta for the aggregated logs as well.
      ApplicationId appId = containerId.getApplicationAttemptId()
          .getApplicationId();
      Application app = this.nmContext.getApplications().get(appId);
      String appOwner = app == null ? null : app.getUser();
      try {
        List<ContainerLogMeta> containerLogMeta = LogToolUtils
            .getContainerLogMetaFromRemoteFS(this.nmContext.getConf(),
                appId, containerIdStr,
                this.nmContext.getNodeId().toString(), appOwner);
        if (!containerLogMeta.isEmpty()) {
          for (ContainerLogMeta logMeta : containerLogMeta) {
            containersLogsInfo.add(new ContainerLogsInfo(logMeta,
                ContainerLogType.AGGREGATED));
          }
        }
      } catch (IOException ex) {
        // Something went wrong while trying to access the remote fs for the
        // logs. Skip it and do nothing.
      }
      GenericEntity<List<ContainerLogsInfo>> meta = new GenericEntity<List<
          ContainerLogsInfo>>(containersLogsInfo){};
      ResponseBuilder resp = Response.ok(meta);
      // Sending the X-Content-Type-Options response header with the value
      // nosniff will prevent Internet Explorer from MIME-sniffing a response
      // away from the declared content-type.
      resp.header("X-Content-Type-Options", "nosniff");
      return resp.build();
    } catch (Exception ex) {
      throw new WebApplicationException(ex);
    }
  }

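For context, a minimal Jersey 1.x client sketch against the reworked endpoint, modeled on the test code further below; the NodeManager web address and container id are placeholders, and it assumes the client side has a JSON provider able to bind the ContainerLogsInfo DAO.

import java.util.List;

import javax.ws.rs.core.MediaType;

import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.WebResource;

public class NMContainerLogsMetaClient {
  // Fetches the list of log meta entries (local plus, when present,
  // aggregated) that GET /ws/v1/node/containers/{containerid}/logs returns.
  public static List<ContainerLogsInfo> fetchLogsMeta(String nmWebAddress,
      String containerIdStr) {
    Client client = Client.create();
    WebResource r = client.resource(nmWebAddress); // e.g. "http://<nm-host>:8042"
    ClientResponse response = r.path("ws").path("v1").path("node")
        .path("containers").path(containerIdStr).path("logs")
        .accept(MediaType.APPLICATION_JSON)
        .get(ClientResponse.class);
    return response.getEntity(new GenericType<List<ContainerLogsInfo>>() { });
  }
}
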
@@ -23,47 +23,42 @@ import java.util.ArrayList;
import java.util.List;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsUtils;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.util.Times;

/**
 * {@code ContainerLogsInfo} includes the log meta-data of containers.
 * <p>
 * The container log meta-data includes details such as:
 * <ul>
 * <li>The filename of the container log.</li>
 * <li>The size of the container log.</li>
 * </ul>
 * NMContainerLogsInfo represents the meta data for container logs
 * which exist in the NM local log directory.
 * This class extends {@link ContainerLogsInfo}.
 */

@XmlRootElement(name = "containerLogsInfo")
@XmlAccessorType(XmlAccessType.FIELD)
public class ContainerLogsInfo {

  @XmlElement(name = "containerLogInfo")
  protected List<ContainerLogInfo> containerLogsInfo;
public class NMContainerLogsInfo extends ContainerLogsInfo {

  //JAXB needs this
  public ContainerLogsInfo() {}
  public NMContainerLogsInfo() {}

  public ContainerLogsInfo(final Context nmContext,
      final ContainerId containerId, String remoteUser)
      throws YarnException {
  public NMContainerLogsInfo(final Context nmContext,
      final ContainerId containerId, String remoteUser,
      ContainerLogType logType) throws YarnException {
    this.logType = logType.toString();
    this.containerId = containerId.toString();
    this.nodeId = nmContext.getNodeId().toString();
    this.containerLogsInfo = getContainerLogsInfo(
        containerId, remoteUser, nmContext);
  }

  public List<ContainerLogInfo> getContainerLogsInfo() {
    return this.containerLogsInfo;
  }

  private static List<ContainerLogInfo> getContainerLogsInfo(ContainerId id,
      String remoteUser, Context nmContext) throws YarnException {
    List<ContainerLogInfo> logFiles = new ArrayList<ContainerLogInfo>();
  private static List<PerContainerLogFileInfo> getContainerLogsInfo(
      ContainerId id, String remoteUser, Context nmContext)
      throws YarnException {
    List<PerContainerLogFileInfo> logFiles = new ArrayList<>();
    List<File> logDirs = ContainerLogsUtils.getContainerLogDirs(
        id, remoteUser, nmContext);
    for (File containerLogsDir : logDirs) {

@@ -71,8 +66,9 @@ public class ContainerLogsInfo {
      if (logs != null) {
        for (File log : logs) {
          if (log.isFile()) {
            ContainerLogInfo logMeta = new ContainerLogInfo(
                log.getName(), log.length());
            PerContainerLogFileInfo logMeta = new PerContainerLogFileInfo(
                log.getName(), Long.toString(log.length()),
                Times.format(log.lastModified()));
            logFiles.add(logMeta);
          }
        }

@@ -80,33 +76,4 @@ public class ContainerLogsInfo {
    }
    return logFiles;
  }

  private static class ContainerLogInfo {
    private String fileName;
    private long fileSize;

    //JAXB needs this
    public ContainerLogInfo() {}

    public ContainerLogInfo(String fileName, long fileSize) {
      this.setFileName(fileName);
      this.setFileSize(fileSize);
    }

    public String getFileName() {
      return fileName;
    }

    public void setFileName(String fileName) {
      this.fileName = fileName;
    }

    public long getFileSize() {
      return fileSize;
    }

    public void setFileSize(long fileSize) {
      this.fileSize = fileSize;
    }
  }
}

@@ -27,6 +27,7 @@ import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.util.List;
import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

@@ -34,6 +35,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.VersionInfo;

@@ -43,6 +45,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;

@@ -54,6 +59,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;

@@ -75,6 +81,7 @@ import com.google.inject.Guice;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;

@@ -95,6 +102,8 @@ public class TestNMWebServices extends JerseyTestBase {
      TestNMWebServices.class.getSimpleName());
  private static File testLogDir = new File("target",
      TestNMWebServices.class.getSimpleName() + "LogDir");
  private static File testRemoteLogDir = new File("target",
      TestNMWebServices.class.getSimpleName() + "remote-log-dir");

  private static class WebServletModule extends ServletModule {

@@ -103,6 +112,9 @@ TestNMWebServices extends JerseyTestBase {
      Configuration conf = new Configuration();
      conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
      conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
      conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
      conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
          testRemoteLogDir.getAbsolutePath());
      dirsHandler = new LocalDirsHandlerService();
      NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(
          NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);

@@ -160,6 +172,7 @@ TestNMWebServices extends JerseyTestBase {
  @Override
  public void setUp() throws Exception {
    super.setUp();
    testRemoteLogDir.mkdir();
    testRootDir.mkdirs();
    testLogDir.mkdir();
    GuiceServletConfig.setInjector(

@@ -170,6 +183,7 @@ TestNMWebServices extends JerseyTestBase {
  static public void stop() {
    FileUtil.fullyDelete(testRootDir);
    FileUtil.fullyDelete(testLogDir);
    FileUtil.fullyDelete(testRemoteLogDir);
  }

  public TestNMWebServices() {

@@ -338,7 +352,7 @@ TestNMWebServices extends JerseyTestBase {
  }

  private void testContainerLogs(WebResource r, ContainerId containerId)
      throws IOException, JSONException {
      throws IOException {
    final String containerIdStr = containerId.toString();
    final ApplicationAttemptId appAttemptId = containerId
        .getApplicationAttemptId();

@@ -446,10 +460,50 @@ TestNMWebServices extends JerseyTestBase {
        .path("logs").accept(MediaType.APPLICATION_JSON)
        .get(ClientResponse.class);
    assertEquals(200, response.getStatus());
    JSONObject json = response.getEntity(JSONObject.class);
    assertEquals(json.getJSONObject("containerLogInfo")
        .getString("fileName"), filename);
    List<ContainerLogsInfo> responseList = response.getEntity(new GenericType<
        List<ContainerLogsInfo>>(){});
    assertTrue(responseList.size() == 1);
    assertEquals(responseList.get(0).getLogType(),
        ContainerLogType.LOCAL.toString());
    List<PerContainerLogFileInfo> logMeta = responseList.get(0)
        .getContainerLogsInfo();
    assertTrue(logMeta.size() == 1);
    assertEquals(logMeta.get(0).getFileName(), filename);

    // now create an aggregated log in Remote File system
    File tempLogDir = new File("target",
        TestNMWebServices.class.getSimpleName() + "temp-log-dir");
    try {
      String aggregatedLogFile = filename + "-aggregated";
      TestContainerLogsUtils.createContainerLogFileInRemoteFS(
          nmContext.getConf(), FileSystem.get(nmContext.getConf()),
          tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(),
          aggregatedLogFile, "user", logMessage, true);
      r1 = resource();
      response = r1.path("ws").path("v1").path("node")
          .path("containers").path(containerIdStr)
          .path("logs").accept(MediaType.APPLICATION_JSON)
          .get(ClientResponse.class);
      assertEquals(200, response.getStatus());
      responseList = response.getEntity(new GenericType<
          List<ContainerLogsInfo>>(){});
      assertEquals(responseList.size(), 2);
      for (ContainerLogsInfo logInfo : responseList) {
        if (logInfo.getLogType().equals(
            ContainerLogType.AGGREGATED.toString())) {
          List<PerContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
          assertTrue(meta.size() == 1);
          assertEquals(meta.get(0).getFileName(), aggregatedLogFile);
        } else {
          assertEquals(logInfo.getLogType(), ContainerLogType.LOCAL.toString());
          List<PerContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
          assertTrue(meta.size() == 1);
          assertEquals(meta.get(0).getFileName(), filename);
        }
      }
    } finally {
      FileUtil.fullyDelete(tempLogDir);
    }
    // After container is completed, it is removed from nmContext
    nmContext.getContainers().remove(containerId);
    Assert.assertNull(nmContext.getContainers().get(containerId));