YARN-6174. Log files pattern should be same for both running and finished container. Contributed by Xuan Gong.

(cherry picked from commit ce2d5bfa5f)
This commit is contained in:
Junping Du 2017-02-15 09:05:14 -08:00
parent a8531d5d52
commit 2aa6f317f6
8 changed files with 82 additions and 41 deletions

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;
/**
* Enumeration of various aggregation type of a container log.
*/
public enum ContainerLogAggregationType {
/** The log is from NodeManager local log directory. */
LOCAL,
/** The log is from Remote FileSystem application log directory. */
AGGREGATED
}

View File

@ -141,7 +141,7 @@ public static List<ContainerLogMeta> getContainerLogMetaFromRemoteFS(
public static void outputContainerLog(String containerId, String nodeId,
String fileName, long fileLength, long outputSize,
String lastModifiedTime, InputStream fis, OutputStream os,
byte[] buf, ContainerLogType logType) throws IOException {
byte[] buf, ContainerLogAggregationType logType) throws IOException {
long toSkip = 0;
long totalBytesToRead = fileLength;
long skipAfterRead = 0;
@ -171,9 +171,9 @@ public static void outputContainerLog(String containerId, String nodeId,
LogToolUtils.CONTAINER_ON_NODE_PATTERN,
containerId, nodeId);
sb.append(containerStr + "\n");
sb.append("LogType: " + logType + "\n");
sb.append("LogAggregationType: " + logType + "\n");
sb.append(StringUtils.repeat("=", containerStr.length()) + "\n");
sb.append("FileName:" + fileName + "\n");
sb.append("LogType:" + fileName + "\n");
sb.append("LogLastModifiedTime:" + lastModifiedTime + "\n");
sb.append("LogLength:" + Long.toString(fileLength) + "\n");
sb.append("LogContents:\n");
@ -240,9 +240,10 @@ public static boolean outputAggregatedContainerLog(Configuration conf,
LogToolUtils.outputContainerLog(containerId,
nodeId, fileType, fileLength, outputSize,
Times.format(thisNodeFile.getModificationTime()),
valueStream, os, buf, ContainerLogType.AGGREGATED);
valueStream, os, buf,
ContainerLogAggregationType.AGGREGATED);
StringBuilder sb = new StringBuilder();
String endOfFile = "End of LogFile:" + fileType;
String endOfFile = "End of LogType:" + fileType;
sb.append("\n" + endOfFile + "\n");
sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ "\n\n");

View File

@ -54,7 +54,7 @@
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
@ -501,13 +501,14 @@ public void write(OutputStream os) throws IOException,
boolean findLogs = LogToolUtils.outputAggregatedContainerLog(conf,
appId, appOwner, containerIdStr, nodeId, logFile, bytes, os, buf);
if (!findLogs) {
throw new IOException("Can not find logs for container:"
+ containerIdStr);
os.write(("Can not find logs for container:"
+ containerIdStr).getBytes(Charset.forName("UTF-8")));
} else {
if (printEmptyLocalContainerLog) {
StringBuilder sb = new StringBuilder();
sb.append(containerIdStr + "\n");
sb.append("LogType: " + ContainerLogType.LOCAL + "\n");
sb.append("LogAggregationType: "
+ ContainerLogAggregationType.LOCAL + "\n");
sb.append("LogContents:\n");
sb.append(getNoRedirectWarning() + "\n");
os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
@ -539,14 +540,14 @@ private Response getContainerLogMeta(ApplicationId appId, String appOwner,
List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
for (ContainerLogMeta meta : containerLogMeta) {
ContainerLogsInfo logInfo = new ContainerLogsInfo(meta,
ContainerLogType.AGGREGATED);
ContainerLogAggregationType.AGGREGATED);
containersLogsInfo.add(logInfo);
}
if (emptyLocalContainerLogMeta) {
ContainerLogMeta emptyMeta = new ContainerLogMeta(
containerIdStr, "N/A");
ContainerLogsInfo empty = new ContainerLogsInfo(emptyMeta,
ContainerLogType.LOCAL);
ContainerLogAggregationType.LOCAL);
containersLogsInfo.add(empty);
}
GenericEntity<List<ContainerLogsInfo>> meta = new GenericEntity<List<

View File

@ -50,7 +50,7 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService;
@ -595,8 +595,8 @@ public void testContainerLogsForFinishedApps() throws Exception {
assertTrue(responseText.contains("Hello." + containerId1ForApp100));
int fullTextSize = responseText.getBytes().length;
String tailEndSeparator = StringUtils.repeat("*",
"End of LogFile:syslog".length() + 50) + "\n\n";
int tailTextSize = "\nEnd of LogFile:syslog\n".getBytes().length
"End of LogType:syslog".length() + 50) + "\n\n";
int tailTextSize = "\nEnd of LogType:syslog\n".getBytes().length
+ tailEndSeparator.getBytes().length;
String logMessage = "Hello." + containerId1ForApp100;
@ -748,7 +748,8 @@ public void testContainerLogsForRunningApps() throws Exception {
assertTrue(responseText.contains(content));
// Also test whether we output the empty local container log, and give
// the warning message.
assertTrue(responseText.contains("LogType: " + ContainerLogType.LOCAL));
assertTrue(responseText.contains("LogAggregationType: "
+ ContainerLogAggregationType.LOCAL));
assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning()));
// If we can not container information from ATS, and we specify the NM id,
@ -763,7 +764,8 @@ public void testContainerLogsForRunningApps() throws Exception {
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains(content));
assertTrue(responseText.contains("LogType: " + ContainerLogType.LOCAL));
assertTrue(responseText.contains("LogAggregationType: "
+ ContainerLogAggregationType.LOCAL));
assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning()));
}
@ -826,7 +828,8 @@ public void testContainerLogsMetaForRunningApps() throws Exception {
List<ContainerLogsInfo>>(){});
assertTrue(responseText.size() == 2);
for (ContainerLogsInfo logInfo : responseText) {
if(logInfo.getLogType().equals(ContainerLogType.AGGREGATED.toString())) {
if(logInfo.getLogType().equals(
ContainerLogAggregationType.AGGREGATED.toString())) {
List<PerContainerLogFileInfo> logMeta = logInfo
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
@ -834,7 +837,8 @@ public void testContainerLogsMetaForRunningApps() throws Exception {
assertEquals(logMeta.get(0).getFileSize(), String.valueOf(
content.length()));
} else {
assertEquals(logInfo.getLogType(), ContainerLogType.LOCAL.toString());
assertEquals(logInfo.getLogType(),
ContainerLogAggregationType.LOCAL.toString());
}
}
@ -852,7 +856,8 @@ public void testContainerLogsMetaForRunningApps() throws Exception {
List<ContainerLogsInfo>>(){});
assertTrue(responseText.size() == 2);
for (ContainerLogsInfo logInfo : responseText) {
if(logInfo.getLogType().equals(ContainerLogType.AGGREGATED.toString())) {
if(logInfo.getLogType().equals(
ContainerLogAggregationType.AGGREGATED.toString())) {
List<PerContainerLogFileInfo> logMeta = logInfo
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
@ -860,7 +865,8 @@ public void testContainerLogsMetaForRunningApps() throws Exception {
assertEquals(logMeta.get(0).getFileSize(), String.valueOf(
content.length()));
} else {
assertEquals(logInfo.getLogType(), ContainerLogType.LOCAL.toString());
assertEquals(logInfo.getLogType(),
ContainerLogAggregationType.LOCAL.toString());
}
}
}
@ -889,7 +895,7 @@ public void testContainerLogsMetaForFinishedApps() throws Exception {
List<ContainerLogsInfo>>(){});
assertTrue(responseText.size() == 1);
assertEquals(responseText.get(0).getLogType(),
ContainerLogType.AGGREGATED.toString());
ContainerLogAggregationType.AGGREGATED.toString());
List<PerContainerLogFileInfo> logMeta = responseText.get(0)
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);

View File

@ -26,7 +26,7 @@
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
/**
@ -48,7 +48,7 @@ public class ContainerLogsInfo {
@XmlElement(name = "containerLogInfo")
protected List<PerContainerLogFileInfo> containerLogsInfo;
@XmlElement(name = "logType")
@XmlElement(name = "logAggregationType")
protected String logType;
@XmlElement(name = "containerId")
@ -60,8 +60,8 @@ public class ContainerLogsInfo {
//JAXB needs this
public ContainerLogsInfo() {}
public ContainerLogsInfo(ContainerLogMeta logMeta, ContainerLogType logType)
throws YarnException {
public ContainerLogsInfo(ContainerLogMeta logMeta,
ContainerLogAggregationType logType) throws YarnException {
this.containerLogsInfo = new ArrayList<PerContainerLogFileInfo>(
logMeta.getContainerLogMeta());
this.logType = logType.toString();

View File

@ -54,7 +54,7 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
@ -244,7 +244,7 @@ public Response getContainerLogsInfo(
List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
containersLogsInfo.add(new NMContainerLogsInfo(
this.nmContext, containerId,
hsr.getRemoteUser(), ContainerLogType.LOCAL));
hsr.getRemoteUser(), ContainerLogAggregationType.LOCAL));
// check whether we have aggregated logs in RemoteFS. If exists, show the
// the log meta for the aggregated logs as well.
ApplicationId appId = containerId.getApplicationAttemptId()
@ -259,7 +259,7 @@ public Response getContainerLogsInfo(
if (!containerLogMeta.isEmpty()) {
for (ContainerLogMeta logMeta : containerLogMeta) {
containersLogsInfo.add(new ContainerLogsInfo(logMeta,
ContainerLogType.AGGREGATED));
ContainerLogAggregationType.AGGREGATED));
}
}
} catch (IOException ex) {
@ -421,9 +421,10 @@ public void write(OutputStream os) throws IOException,
byte[] buf = new byte[bufferSize];
LogToolUtils.outputContainerLog(containerId.toString(),
nmContext.getNodeId().toString(), outputFileName, fileLength,
bytes, lastModifiedTime, fis, os, buf, ContainerLogType.LOCAL);
bytes, lastModifiedTime, fis, os, buf,
ContainerLogAggregationType.LOCAL);
StringBuilder sb = new StringBuilder();
String endOfFile = "End of LogFile:" + outputFileName;
String endOfFile = "End of LogType:" + outputFileName;
sb.append(endOfFile + ".");
if (isRunning) {
sb.append("This log file belongs to a running container ("

View File

@ -26,7 +26,7 @@
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsUtils;
@ -47,7 +47,7 @@ public NMContainerLogsInfo() {}
public NMContainerLogsInfo(final Context nmContext,
final ContainerId containerId, String remoteUser,
ContainerLogType logType) throws YarnException {
ContainerLogAggregationType logType) throws YarnException {
this.logType = logType.toString();
this.containerId = containerId.toString();
this.nodeId = nmContext.getNodeId().toString();

View File

@ -48,7 +48,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -522,7 +522,7 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
List<ContainerLogsInfo>>(){});
assertTrue(responseList.size() == 1);
assertEquals(responseList.get(0).getLogType(),
ContainerLogType.LOCAL.toString());
ContainerLogAggregationType.LOCAL.toString());
List<PerContainerLogFileInfo> logMeta = responseList.get(0)
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
@ -549,12 +549,13 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
assertEquals(responseList.size(), 2);
for (ContainerLogsInfo logInfo : responseList) {
if(logInfo.getLogType().equals(
ContainerLogType.AGGREGATED.toString())) {
ContainerLogAggregationType.AGGREGATED.toString())) {
List<PerContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
assertTrue(meta.size() == 1);
assertEquals(meta.get(0).getFileName(), aggregatedLogFile);
} else {
assertEquals(logInfo.getLogType(), ContainerLogType.LOCAL.toString());
assertEquals(logInfo.getLogType(),
ContainerLogAggregationType.LOCAL.toString());
List<PerContainerLogFileInfo> meta = logInfo.getContainerLogsInfo();
assertTrue(meta.size() == 1);
assertEquals(meta.get(0).getFileName(), filename);
@ -569,11 +570,11 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
response = r.path(filename)
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("LogType: "
+ ContainerLogType.AGGREGATED));
assertTrue(responseText.contains("LogAggregationType: "
+ ContainerLogAggregationType.AGGREGATED));
assertTrue(responseText.contains(aggregatedLogMessage));
assertTrue(responseText.contains("LogType: "
+ ContainerLogType.LOCAL));
assertTrue(responseText.contains("LogAggregationType: "
+ ContainerLogAggregationType.LOCAL));
assertTrue(responseText.contains(logMessage));
} finally {
FileUtil.fullyDelete(tempLogDir);
@ -676,7 +677,7 @@ public void verifyNodeInfoGeneric(String id, String healthReport,
private String getLogContext(String fullMessage) {
String prefix = "LogContents:\n";
String postfix = "End of LogFile:";
String postfix = "End of LogType:";
int prefixIndex = fullMessage.indexOf(prefix) + prefix.length();
int postfixIndex = fullMessage.indexOf(postfix);
return fullMessage.substring(prefixIndex, postfixIndex);