YARN-3505. Node's Log Aggregation Report with SUCCEED should not cached in RMApps. Contributed by Xuan Gong.

(cherry picked from commit 15ccd967ee)
This commit is contained in:
Junping Du 2015-05-14 10:57:36 -07:00
parent 454236ec19
commit bc13c7d84b
19 changed files with 471 additions and 281 deletions

View File

@ -63,6 +63,9 @@ Release 2.8.0 - UNRELEASED
YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation. YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation.
(Jonathan Eagles via zjshen) (Jonathan Eagles via zjshen)
YARN-3505. Node's Log Aggregation Report with SUCCEED should not cached in
RMApps. (Xuan Gong via junping_du)
IMPROVEMENTS IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -34,6 +34,8 @@ public enum LogAggregationStatus {
/** Log Aggregation is Running. */ /** Log Aggregation is Running. */
RUNNING, RUNNING,
/** Log Aggregation is Running, but has failures in previous cycles. */
RUNNING_WITH_FAILURE,
/** /**
* Log Aggregation is Succeeded. All of the logs have been aggregated * Log Aggregation is Succeeded. All of the logs have been aggregated
* successfully. * successfully.

View File

@ -718,6 +718,16 @@ public class YarnConfiguration extends Configuration {
+ "proxy-user-privileges.enabled"; + "proxy-user-privileges.enabled";
public static boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false; public static boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
/**
* The maximum number of diagnostics/failure messages that can be saved in
* the RM for log aggregation. It also defines the number of
* diagnostics/failure messages that can be shown in the log aggregation
* web UI.
*/
public static final String RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY =
RM_PREFIX + "max-log-aggregation-diagnostics-in-memory";
public static final int DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY =
10;
/** Whether to enable log aggregation */ /** Whether to enable log aggregation */
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable"; + "log-aggregation-enable";

View File

@ -204,6 +204,7 @@ enum LogAggregationStatusProto {
LOG_SUCCEEDED = 4; LOG_SUCCEEDED = 4;
LOG_FAILED = 5; LOG_FAILED = 5;
LOG_TIME_OUT = 6; LOG_TIME_OUT = 6;
LOG_RUNNING_WITH_FAILURE = 7;
} }
message ApplicationAttemptReportProto { message ApplicationAttemptReportProto {

View File

@ -674,6 +674,14 @@
<value>10</value> <value>10</value>
</property> </property>
<property>
<description>Number of diagnostics/failure messages that can be saved in RM
for log aggregation. It also defines the number of diagnostics/failure
messages that can be shown in log aggregation web ui.</description>
<name>yarn.resourcemanager.max-log-aggregation-diagnostics-in-memory</name>
<value>10</value>
</property>
<!-- Node Manager Configs --> <!-- Node Manager Configs -->
<property> <property>
<description>The hostname of the NM.</description> <description>The hostname of the NM.</description>

View File

@ -22,7 +22,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
/** /**
@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.util.Records;
* It includes details such as: * It includes details such as:
* <ul> * <ul>
* <li>{@link ApplicationId} of the application.</li> * <li>{@link ApplicationId} of the application.</li>
* <li>{@link NodeId} of the NodeManager.</li>
* <li>{@link LogAggregationStatus}</li> * <li>{@link LogAggregationStatus}</li>
* <li>Diagnostic information</li> * <li>Diagnostic information</li>
* </ul> * </ul>
@ -45,7 +43,7 @@ public abstract class LogAggregationReport {
@Public @Public
@Unstable @Unstable
public static LogAggregationReport newInstance(ApplicationId appId, public static LogAggregationReport newInstance(ApplicationId appId,
NodeId nodeId, LogAggregationStatus status, String diagnosticMessage) { LogAggregationStatus status, String diagnosticMessage) {
LogAggregationReport report = Records.newRecord(LogAggregationReport.class); LogAggregationReport report = Records.newRecord(LogAggregationReport.class);
report.setApplicationId(appId); report.setApplicationId(appId);
report.setLogAggregationStatus(status); report.setLogAggregationStatus(status);
@ -65,18 +63,6 @@ public abstract class LogAggregationReport {
@Unstable @Unstable
public abstract void setApplicationId(ApplicationId appId); public abstract void setApplicationId(ApplicationId appId);
/**
* Get the <code>NodeId</code>.
* @return <code>NodeId</code>
*/
@Public
@Unstable
public abstract NodeId getNodeId();
@Public
@Unstable
public abstract void setNodeId(NodeId nodeId);
/** /**
* Get the <code>LogAggregationStatus</code>. * Get the <code>LogAggregationStatus</code>.
* @return <code>LogAggregationStatus</code> * @return <code>LogAggregationStatus</code>

View File

@ -18,10 +18,9 @@
package org.apache.hadoop.yarn.server.api.protocolrecords; package org.apache.hadoop.yarn.server.api.protocolrecords;
import java.util.Map; import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -54,9 +53,9 @@ public abstract class NodeHeartbeatRequest {
public abstract Set<String> getNodeLabels(); public abstract Set<String> getNodeLabels();
public abstract void setNodeLabels(Set<String> nodeLabels); public abstract void setNodeLabels(Set<String> nodeLabels);
public abstract Map<ApplicationId, LogAggregationReport> public abstract List<LogAggregationReport>
getLogAggregationReportsForApps(); getLogAggregationReportsForApps();
public abstract void setLogAggregationReportsForApps( public abstract void setLogAggregationReportsForApps(
Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps); List<LogAggregationReport> logAggregationReportsForApps);
} }

View File

@ -22,13 +22,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
@ -45,7 +42,6 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
boolean viaProto = false; boolean viaProto = false;
private ApplicationId applicationId; private ApplicationId applicationId;
private NodeId nodeId;
public LogAggregationReportPBImpl() { public LogAggregationReportPBImpl() {
builder = LogAggregationReportProto.newBuilder(); builder = LogAggregationReportProto.newBuilder();
@ -89,12 +85,6 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
builder.getApplicationId())) { builder.getApplicationId())) {
builder.setApplicationId(convertToProtoFormat(this.applicationId)); builder.setApplicationId(convertToProtoFormat(this.applicationId));
} }
if (this.nodeId != null
&& !((NodeIdPBImpl) this.nodeId).getProto().equals(
builder.getNodeId())) {
builder.setNodeId(convertToProtoFormat(this.nodeId));
}
} }
private void mergeLocalToProto() { private void mergeLocalToProto() {
@ -191,34 +181,4 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
} }
builder.setDiagnostics(diagnosticMessage); builder.setDiagnostics(diagnosticMessage);
} }
@Override
public NodeId getNodeId() {
if (this.nodeId != null) {
return this.nodeId;
}
LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasNodeId()) {
return null;
}
this.nodeId = convertFromProtoFormat(p.getNodeId());
return this.nodeId;
}
@Override
public void setNodeId(NodeId nodeId) {
maybeInitBuilder();
if (nodeId == null)
builder.clearNodeId();
this.nodeId = nodeId;
}
private NodeIdProto convertToProtoFormat(NodeId t) {
return ((NodeIdPBImpl) t).getProto();
}
private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) {
return new NodeIdPBImpl(nodeId);
}
} }

View File

@ -18,21 +18,16 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.util.HashMap; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportsForAppsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
@ -51,9 +46,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownContainerTokenMasterKey = null;
private MasterKey lastKnownNMTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null;
private Set<String> labels = null; private Set<String> labels = null;
private Map<ApplicationId, LogAggregationReport> private List<LogAggregationReport> logAggregationReportsForApps = null;
logAggregationReportsForApps = null;
public NodeHeartbeatRequestPBImpl() { public NodeHeartbeatRequestPBImpl() {
builder = NodeHeartbeatRequestProto.newBuilder(); builder = NodeHeartbeatRequestProto.newBuilder();
} }
@ -110,12 +104,35 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private void addLogAggregationStatusForAppsToProto() { private void addLogAggregationStatusForAppsToProto() {
maybeInitBuilder(); maybeInitBuilder();
builder.clearLogAggregationReportsForApps(); builder.clearLogAggregationReportsForApps();
for (Entry<ApplicationId, LogAggregationReport> entry : logAggregationReportsForApps if (this.logAggregationReportsForApps == null) {
.entrySet()) { return;
builder.addLogAggregationReportsForApps(LogAggregationReportsForAppsProto
.newBuilder().setAppId(convertToProtoFormat(entry.getKey()))
.setLogAggregationReport(convertToProtoFormat(entry.getValue())));
} }
Iterable<LogAggregationReportProto> it =
new Iterable<LogAggregationReportProto>() {
@Override
public Iterator<LogAggregationReportProto> iterator() {
return new Iterator<LogAggregationReportProto>() {
private Iterator<LogAggregationReport> iter =
logAggregationReportsForApps.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public LogAggregationReportProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllLogAggregationReportsForApps(it);
} }
private LogAggregationReportProto convertToProtoFormat( private LogAggregationReportProto convertToProtoFormat(
@ -246,17 +263,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
labels = new HashSet<String>(nodeLabels.getElementsList()); labels = new HashSet<String>(nodeLabels.getElementsList());
} }
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl) t).getProto();
}
@Override @Override
public Map<ApplicationId, LogAggregationReport> public List<LogAggregationReport> getLogAggregationReportsForApps() {
getLogAggregationReportsForApps() {
if (this.logAggregationReportsForApps != null) { if (this.logAggregationReportsForApps != null) {
return this.logAggregationReportsForApps; return this.logAggregationReportsForApps;
} }
@ -266,15 +274,11 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private void initLogAggregationReportsForApps() { private void initLogAggregationReportsForApps() {
NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
List<LogAggregationReportsForAppsProto> list = List<LogAggregationReportProto> list =
p.getLogAggregationReportsForAppsList(); p.getLogAggregationReportsForAppsList();
this.logAggregationReportsForApps = this.logAggregationReportsForApps = new ArrayList<LogAggregationReport>();
new HashMap<ApplicationId, LogAggregationReport>(); for (LogAggregationReportProto c : list) {
for (LogAggregationReportsForAppsProto c : list) { this.logAggregationReportsForApps.add(convertFromProtoFormat(c));
ApplicationId appId = convertFromProtoFormat(c.getAppId());
LogAggregationReport report =
convertFromProtoFormat(c.getLogAggregationReport());
this.logAggregationReportsForApps.put(appId, report);
} }
} }
@ -285,14 +289,10 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
@Override @Override
public void setLogAggregationReportsForApps( public void setLogAggregationReportsForApps(
Map<ApplicationId, LogAggregationReport> logAggregationStatusForApps) { List<LogAggregationReport> logAggregationStatusForApps) {
if (logAggregationStatusForApps == null if(logAggregationStatusForApps == null) {
|| logAggregationStatusForApps.isEmpty()) { builder.clearLogAggregationReportsForApps();
return;
} }
maybeInitBuilder(); this.logAggregationReportsForApps = logAggregationStatusForApps;
this.logAggregationReportsForApps =
new HashMap<ApplicationId, LogAggregationReport>();
this.logAggregationReportsForApps.putAll(logAggregationStatusForApps);
} }
} }

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
@ -192,8 +193,17 @@ public class AppBlock extends HtmlBlock {
: "ApplicationMaster"); : "ApplicationMaster");
if (webUiType != null if (webUiType != null
&& webUiType.equals(YarnWebParams.RM_WEB_UI)) { && webUiType.equals(YarnWebParams.RM_WEB_UI)) {
overviewTable._("Log Aggregation Status", LogAggregationStatus status = getLogAggregationStatus();
root_url("logaggregationstatus", app.getAppId()), "Status"); if (status == null) {
overviewTable._("Log Aggregation Status", "N/A");
} else if (status == LogAggregationStatus.DISABLED
|| status == LogAggregationStatus.NOT_START
|| status == LogAggregationStatus.SUCCEEDED) {
overviewTable._("Log Aggregation Status", status.name());
} else {
overviewTable._("Log Aggregation Status",
root_url("logaggregationstatus", app.getAppId()), status.name());
}
} }
overviewTable._("Diagnostics:", overviewTable._("Diagnostics:",
app.getDiagnosticsInfo() == null ? "" : app.getDiagnosticsInfo()); app.getDiagnosticsInfo() == null ? "" : app.getDiagnosticsInfo());
@ -342,4 +352,9 @@ public class AppBlock extends HtmlBlock {
protected void createApplicationMetricsTable(Block html) { protected void createApplicationMetricsTable(Block html) {
} }
// This will be overridden in RMAppBlock
protected LogAggregationStatus getLogAggregationStatus() {
return null;
}
} }

View File

@ -50,19 +50,13 @@ message NodeHeartbeatRequestProto {
optional MasterKeyProto last_known_container_token_master_key = 2; optional MasterKeyProto last_known_container_token_master_key = 2;
optional MasterKeyProto last_known_nm_token_master_key = 3; optional MasterKeyProto last_known_nm_token_master_key = 3;
optional StringArrayProto nodeLabels = 4; optional StringArrayProto nodeLabels = 4;
repeated LogAggregationReportsForAppsProto log_aggregation_reports_for_apps = 5; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
}
message LogAggregationReportsForAppsProto {
optional ApplicationIdProto appId = 1;
optional LogAggregationReportProto log_aggregation_report = 2;
} }
message LogAggregationReportProto { message LogAggregationReportProto {
optional ApplicationIdProto application_id = 1; optional ApplicationIdProto application_id = 1;
optional NodeIdProto node_id = 2; optional LogAggregationStatusProto log_aggregation_status = 2;
optional LogAggregationStatusProto log_aggregation_status = 3; optional string diagnostics = 3 [default = "N/A"];
optional string diagnostics = 4 [default = "N/A"];
} }
message NodeHeartbeatResponseProto { message NodeHeartbeatResponseProto {

View File

@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -666,7 +665,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
if (logAggregationEnabled) { if (logAggregationEnabled) {
// pull log aggregation status for application running in this NM // pull log aggregation status for application running in this NM
Map<ApplicationId, LogAggregationReport> logAggregationReports = List<LogAggregationReport> logAggregationReports =
getLogAggregationReportsForApps(context getLogAggregationReportsForApps(context
.getLogAggregationStatusForApps()); .getLogAggregationStatusForApps());
if (logAggregationReports != null if (logAggregationReports != null
@ -810,47 +809,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
statusUpdater.start(); statusUpdater.start();
} }
private Map<ApplicationId, LogAggregationReport> private List<LogAggregationReport> getLogAggregationReportsForApps(
getLogAggregationReportsForApps( ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) {
ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) {
Map<ApplicationId, LogAggregationReport> latestLogAggregationReports =
new HashMap<ApplicationId, LogAggregationReport>();
LogAggregationReport status; LogAggregationReport status;
while ((status = lastestLogAggregationStatus.poll()) != null) { while ((status = lastestLogAggregationStatus.poll()) != null) {
this.logAggregationReportForAppsTempList.add(status); this.logAggregationReportForAppsTempList.add(status);
} }
for (LogAggregationReport logAggregationReport List<LogAggregationReport> reports = new ArrayList<LogAggregationReport>();
: this.logAggregationReportForAppsTempList) { reports.addAll(logAggregationReportForAppsTempList);
LogAggregationReport report = null; return reports;
if (latestLogAggregationReports.containsKey(logAggregationReport
.getApplicationId())) {
report =
latestLogAggregationReports.get(logAggregationReport
.getApplicationId());
report.setLogAggregationStatus(logAggregationReport
.getLogAggregationStatus());
String message = report.getDiagnosticMessage();
if (logAggregationReport.getDiagnosticMessage() != null
&& !logAggregationReport.getDiagnosticMessage().isEmpty()) {
if (message != null) {
message += logAggregationReport.getDiagnosticMessage();
} else {
message = logAggregationReport.getDiagnosticMessage();
}
report.setDiagnosticMessage(message);
}
} else {
report = Records.newRecord(LogAggregationReport.class);
report.setApplicationId(logAggregationReport.getApplicationId());
report.setNodeId(this.nodeId);
report.setLogAggregationStatus(logAggregationReport
.getLogAggregationStatus());
report
.setDiagnosticMessage(logAggregationReport.getDiagnosticMessage());
}
latestLogAggregationReports.put(logAggregationReport.getApplicationId(),
report);
}
return latestLogAggregationReports;
} }
} }

View File

@ -306,6 +306,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
+ currentTime); + currentTime);
String diagnosticMessage = ""; String diagnosticMessage = "";
boolean logAggregationSucceedInThisCycle = true;
final boolean rename = uploadedLogsInThisCycle; final boolean rename = uploadedLogsInThisCycle;
try { try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() { userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@ -338,20 +339,28 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
+ LogAggregationUtils.getNodeString(nodeId) + " at " + LogAggregationUtils.getNodeString(nodeId) + " at "
+ Times.format(currentTime) + "\n"; + Times.format(currentTime) + "\n";
renameTemporaryLogFileFailed = true; renameTemporaryLogFileFailed = true;
logAggregationSucceedInThisCycle = false;
} }
LogAggregationReport report = LogAggregationReport report =
Records.newRecord(LogAggregationReport.class); Records.newRecord(LogAggregationReport.class);
report.setApplicationId(appId); report.setApplicationId(appId);
report.setNodeId(nodeId);
report.setDiagnosticMessage(diagnosticMessage); report.setDiagnosticMessage(diagnosticMessage);
if (appFinished) { report.setLogAggregationStatus(logAggregationSucceedInThisCycle
report.setLogAggregationStatus(renameTemporaryLogFileFailed ? LogAggregationStatus.RUNNING
? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED); : LogAggregationStatus.RUNNING_WITH_FAILURE);
} else {
report.setLogAggregationStatus(LogAggregationStatus.RUNNING);
}
this.context.getLogAggregationStatusForApps().add(report); this.context.getLogAggregationStatusForApps().add(report);
if (appFinished) {
  // If the app is finished, one extra final report with log aggregation
  // status SUCCEEDED/FAILED will be sent to RM to inform the RM
  // that the log aggregation in this NM is completed.
  LogAggregationReport finalReport =
      Records.newRecord(LogAggregationReport.class);
  finalReport.setApplicationId(appId);
  finalReport.setLogAggregationStatus(renameTemporaryLogFileFailed
      ? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED);
  // Enqueue the final report itself. The per-cycle `report` has already
  // been added to this queue just before this block; adding it a second
  // time would duplicate it and drop the terminal SUCCEEDED/FAILED status.
  this.context.getLogAggregationStatusForApps().add(finalReport);
}
} finally { } finally {
if (writer != null) { if (writer != null) {
writer.close(); writer.close();

View File

@ -22,12 +22,15 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
@ -36,6 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -152,6 +156,13 @@ public class RMAppImpl implements RMApp, Recoverable {
private final Map<NodeId, LogAggregationReport> logAggregationStatus = private final Map<NodeId, LogAggregationReport> logAggregationStatus =
new HashMap<NodeId, LogAggregationReport>(); new HashMap<NodeId, LogAggregationReport>();
private LogAggregationStatus logAggregationStatusForAppReport; private LogAggregationStatus logAggregationStatusForAppReport;
private int logAggregationSucceed = 0;
private int logAggregationFailed = 0;
private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
new HashMap<NodeId, List<String>>();
private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs =
new HashMap<NodeId, List<String>>();
private final int maxLogAggregationDiagnosticsInMemory;
// These states stored are only valid when app is at killing or final_saving. // These states stored are only valid when app is at killing or final_saving.
private RMAppState stateBeforeKilling; private RMAppState stateBeforeKilling;
@ -437,6 +448,14 @@ public class RMAppImpl implements RMApp, Recoverable {
this.logAggregationEnabled = this.logAggregationEnabled =
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
if (this.logAggregationEnabled) {
this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START;
} else {
this.logAggregationStatusForAppReport = LogAggregationStatus.DISABLED;
}
maxLogAggregationDiagnosticsInMemory = conf.getInt(
YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
} }
@Override @Override
@ -834,10 +853,9 @@ public class RMAppImpl implements RMApp, Recoverable {
if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) { if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) {
app.logAggregationStatus.put(nodeAddedEvent.getNodeId(), app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent LogAggregationReport.newInstance(app.applicationId,
.getNodeId(), app.logAggregationEnabled app.logAggregationEnabled ? LogAggregationStatus.NOT_START
? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED, : LogAggregationStatus.DISABLED, ""));
""));
} }
}; };
} }
@ -1401,18 +1419,20 @@ public class RMAppImpl implements RMApp, Recoverable {
Map<NodeId, LogAggregationReport> outputs = Map<NodeId, LogAggregationReport> outputs =
new HashMap<NodeId, LogAggregationReport>(); new HashMap<NodeId, LogAggregationReport>();
outputs.putAll(logAggregationStatus); outputs.putAll(logAggregationStatus);
for (Entry<NodeId, LogAggregationReport> output : outputs.entrySet()) { if (!isLogAggregationFinished()) {
if (!output.getValue().getLogAggregationStatus() for (Entry<NodeId, LogAggregationReport> output : outputs.entrySet()) {
.equals(LogAggregationStatus.TIME_OUT) if (!output.getValue().getLogAggregationStatus()
&& !output.getValue().getLogAggregationStatus() .equals(LogAggregationStatus.TIME_OUT)
.equals(LogAggregationStatus.SUCCEEDED) && !output.getValue().getLogAggregationStatus()
&& !output.getValue().getLogAggregationStatus() .equals(LogAggregationStatus.SUCCEEDED)
.equals(LogAggregationStatus.FAILED) && !output.getValue().getLogAggregationStatus()
&& isAppInFinalState(this) .equals(LogAggregationStatus.FAILED)
&& System.currentTimeMillis() > this.logAggregationStartTime && isAppInFinalState(this)
+ this.logAggregationStatusTimeout) { && System.currentTimeMillis() > this.logAggregationStartTime
output.getValue().setLogAggregationStatus( + this.logAggregationStatusTimeout) {
LogAggregationStatus.TIME_OUT); output.getValue().setLogAggregationStatus(
LogAggregationStatus.TIME_OUT);
}
} }
} }
return outputs; return outputs;
@ -1424,32 +1444,46 @@ public class RMAppImpl implements RMApp, Recoverable {
public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
try { try {
this.writeLock.lock(); this.writeLock.lock();
if (this.logAggregationEnabled) { if (this.logAggregationEnabled && !isLogAggregationFinished()) {
LogAggregationReport curReport = this.logAggregationStatus.get(nodeId); LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
boolean stateChangedToFinal = false;
if (curReport == null) { if (curReport == null) {
this.logAggregationStatus.put(nodeId, report); this.logAggregationStatus.put(nodeId, report);
if (isLogAggregationFinishedForNM(report)) {
stateChangedToFinal = true;
}
} else { } else {
if (curReport.getLogAggregationStatus().equals( if (isLogAggregationFinishedForNM(report)) {
LogAggregationStatus.TIME_OUT)) { if (!isLogAggregationFinishedForNM(curReport)) {
if (report.getLogAggregationStatus().equals( stateChangedToFinal = true;
LogAggregationStatus.SUCCEEDED)
|| report.getLogAggregationStatus().equals(
LogAggregationStatus.FAILED)) {
curReport.setLogAggregationStatus(report
.getLogAggregationStatus());
} }
} else {
curReport.setLogAggregationStatus(report.getLogAggregationStatus());
} }
if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING
if (report.getDiagnosticMessage() != null || curReport.getLogAggregationStatus() !=
&& !report.getDiagnosticMessage().isEmpty()) { LogAggregationStatus.RUNNING_WITH_FAILURE) {
curReport if (curReport.getLogAggregationStatus()
.setDiagnosticMessage(curReport.getDiagnosticMessage() == null == LogAggregationStatus.TIME_OUT
? report.getDiagnosticMessage() : curReport && report.getLogAggregationStatus()
.getDiagnosticMessage() + report.getDiagnosticMessage()); == LogAggregationStatus.RUNNING) {
// If the log aggregation status got from latest nm heartbeat
// is Running, and current log aggregation status is TimeOut,
// based on whether there are any failure messages for this NM,
// we will reset the log aggregation status as RUNNING or
// RUNNING_WITH_FAILURE
if (logAggregationFailureMessagesForNMs.get(nodeId) != null &&
!logAggregationFailureMessagesForNMs.get(nodeId).isEmpty()) {
report.setLogAggregationStatus(
LogAggregationStatus.RUNNING_WITH_FAILURE);
}
}
curReport.setLogAggregationStatus(report
.getLogAggregationStatus());
} }
} }
updateLogAggregationDiagnosticMessages(nodeId, report);
if (isAppInFinalState(this) && stateChangedToFinal) {
updateLogAggregationStatus(nodeId);
}
} }
} finally { } finally {
this.writeLock.unlock(); this.writeLock.unlock();
@ -1458,29 +1492,32 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override @Override
public LogAggregationStatus getLogAggregationStatusForAppReport() { public LogAggregationStatus getLogAggregationStatusForAppReport() {
if (!logAggregationEnabled) {
return LogAggregationStatus.DISABLED;
}
if (this.logAggregationStatusForAppReport == LogAggregationStatus.FAILED
|| this.logAggregationStatusForAppReport == LogAggregationStatus.SUCCEEDED) {
return this.logAggregationStatusForAppReport;
}
try { try {
this.readLock.lock(); this.readLock.lock();
if (! logAggregationEnabled) {
return LogAggregationStatus.DISABLED;
}
if (isLogAggregationFinished()) {
return this.logAggregationStatusForAppReport;
}
Map<NodeId, LogAggregationReport> reports = Map<NodeId, LogAggregationReport> reports =
getLogAggregationReportsForApp(); getLogAggregationReportsForApp();
if (reports.size() == 0) { if (reports.size() == 0) {
return null; return this.logAggregationStatusForAppReport;
} }
int logNotStartCount = 0; int logNotStartCount = 0;
int logCompletedCount = 0; int logCompletedCount = 0;
int logTimeOutCount = 0; int logTimeOutCount = 0;
int logFailedCount = 0; int logFailedCount = 0;
int logRunningWithFailure = 0;
for (Entry<NodeId, LogAggregationReport> report : reports.entrySet()) { for (Entry<NodeId, LogAggregationReport> report : reports.entrySet()) {
switch (report.getValue().getLogAggregationStatus()) { switch (report.getValue().getLogAggregationStatus()) {
case NOT_START: case NOT_START:
logNotStartCount++; logNotStartCount++;
break; break;
case RUNNING_WITH_FAILURE:
logRunningWithFailure ++;
break;
case SUCCEEDED: case SUCCEEDED:
logCompletedCount++; logCompletedCount++;
break; break;
@ -1506,19 +1543,122 @@ public class RMAppImpl implements RMApp, Recoverable {
// the log aggregation is finished. And the log aggregation status will // the log aggregation is finished. And the log aggregation status will
// not be updated anymore. // not be updated anymore.
if (logFailedCount > 0 && isAppInFinalState(this)) { if (logFailedCount > 0 && isAppInFinalState(this)) {
this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
return LogAggregationStatus.FAILED; return LogAggregationStatus.FAILED;
} else if (logTimeOutCount > 0) { } else if (logTimeOutCount > 0) {
return LogAggregationStatus.TIME_OUT; return LogAggregationStatus.TIME_OUT;
} }
if (isAppInFinalState(this)) { if (isAppInFinalState(this)) {
this.logAggregationStatusForAppReport = LogAggregationStatus.SUCCEEDED;
return LogAggregationStatus.SUCCEEDED; return LogAggregationStatus.SUCCEEDED;
} }
} else if (logRunningWithFailure > 0) {
return LogAggregationStatus.RUNNING_WITH_FAILURE;
} }
return LogAggregationStatus.RUNNING; return LogAggregationStatus.RUNNING;
} finally { } finally {
this.readLock.unlock(); this.readLock.unlock();
} }
} }
/**
 * Tells whether the application-level log aggregation status has reached
 * one of its two terminal values.
 *
 * @return {@code true} when the cached application status is SUCCEEDED or
 *         FAILED, {@code false} for every other status
 */
private boolean isLogAggregationFinished() {
  switch (this.logAggregationStatusForAppReport) {
  case SUCCEEDED:
  case FAILED:
    return true;
  default:
    return false;
  }
}
/**
 * Tells whether a single node's report carries a terminal log aggregation
 * status.
 *
 * @param report the log aggregation report sent for one node
 * @return {@code true} when the report's status is SUCCEEDED or FAILED
 */
private boolean isLogAggregationFinishedForNM(LogAggregationReport report) {
  final LogAggregationStatus nodeStatus = report.getLogAggregationStatus();
  return nodeStatus == LogAggregationStatus.SUCCEEDED
      || nodeStatus == LogAggregationStatus.FAILED;
}
/**
 * Records the diagnostic message carried by a node's report.
 *
 * <p>Reports without a message are ignored. A message from a RUNNING report
 * is appended to the node's diagnostics list, and the report cached in
 * {@code logAggregationStatus} for that node gets the newline-joined list as
 * its diagnostic message. A message from a RUNNING_WITH_FAILURE report is
 * appended to the node's failure-message list. Messages carried by reports
 * in any other status are not stored.
 *
 * @param nodeId the node the report came from
 * @param report the report whose diagnostic message should be recorded
 */
private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
    LogAggregationReport report) {
  final String message = report.getDiagnosticMessage();
  if (message == null || message.isEmpty()) {
    return;
  }
  final LogAggregationStatus status = report.getLogAggregationStatus();
  if (status == LogAggregationStatus.RUNNING) {
    List<String> diagnostics = logAggregationDiagnosticsForNMs.get(nodeId);
    if (diagnostics == null) {
      diagnostics = new ArrayList<String>();
      logAggregationDiagnosticsForNMs.put(nodeId, diagnostics);
    }
    addBoundedLogAggregationMessage(diagnostics, message,
        maxLogAggregationDiagnosticsInMemory);
    this.logAggregationStatus.get(nodeId).setDiagnosticMessage(
        StringUtils.join(diagnostics, "\n"));
  } else if (status == LogAggregationStatus.RUNNING_WITH_FAILURE) {
    List<String> failureMessages =
        logAggregationFailureMessagesForNMs.get(nodeId);
    if (failureMessages == null) {
      failureMessages = new ArrayList<String>();
      logAggregationFailureMessagesForNMs.put(nodeId, failureMessages);
    }
    addBoundedLogAggregationMessage(failureMessages, message,
        maxLogAggregationDiagnosticsInMemory);
  }
}

/**
 * Appends a message to a list, dropping the oldest entries first so the
 * list never holds more than {@code limit} messages afterwards.
 *
 * <p>The trim uses {@code >=} rather than {@code ==}, so a list is still
 * bounded when the limit is zero or negative; in that case only the most
 * recent message is kept.
 *
 * @param messages the list to append to, oldest entry first
 * @param message the message to append
 * @param limit the maximum number of messages to keep
 */
private static void addBoundedLogAggregationMessage(List<String> messages,
    String message, int limit) {
  while (!messages.isEmpty() && messages.size() >= limit) {
    messages.remove(0);
  }
  messages.add(message);
}
/**
 * Counts the terminal status now cached for the given node and, once every
 * cached node report is accounted for, fixes the application-level status.
 *
 * <p>If every node succeeded, the application status becomes SUCCEEDED and
 * all cached reports, diagnostics and failure messages are dropped. If the
 * succeeded and failed counts together cover every node, the application
 * status becomes FAILED; reports of succeeded nodes and all diagnostics are
 * dropped, while the remaining reports and the failure messages are kept.
 *
 * @param nodeId the node whose cached report has just reached a final status
 */
private void updateLogAggregationStatus(NodeId nodeId) {
  switch (this.logAggregationStatus.get(nodeId).getLogAggregationStatus()) {
  case SUCCEEDED:
    this.logAggregationSucceed++;
    break;
  case FAILED:
    this.logAggregationFailed++;
    break;
  default:
    break;
  }

  final int trackedNodes = this.logAggregationStatus.size();
  if (this.logAggregationSucceed == trackedNodes) {
    // Every node aggregated its logs successfully, so none of the cached
    // per-node details are needed any longer.
    this.logAggregationStatusForAppReport = LogAggregationStatus.SUCCEEDED;
    this.logAggregationStatus.clear();
    this.logAggregationDiagnosticsForNMs.clear();
    this.logAggregationFailureMessagesForNMs.clear();
    return;
  }
  if (this.logAggregationSucceed + this.logAggregationFailed
      != trackedNodes) {
    // Some nodes have not reported a final status yet.
    return;
  }

  // All nodes have reported and at least one of them failed. Only the
  // reports of the nodes that did not succeed remain interesting.
  this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
  Iterator<LogAggregationReport> cachedReports =
      this.logAggregationStatus.values().iterator();
  while (cachedReports.hasNext()) {
    LogAggregationReport cached = cachedReports.next();
    if (cached.getLogAggregationStatus()
        .equals(LogAggregationStatus.SUCCEEDED)) {
      cachedReports.remove();
    }
  }
  // The status will not be updated anymore, so the running diagnostics
  // can be released as well.
  this.logAggregationDiagnosticsForNMs.clear();
}
/**
 * Returns the log aggregation failure messages recorded for a node.
 *
 * @param nodeId the node whose failure messages are requested
 * @return the recorded failure messages joined with newlines, or an empty
 *         string when none are recorded for the node
 */
public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
  // Acquire the lock before entering the try block so that the finally
  // clause can never call unlock() on a lock that was not obtained.
  this.readLock.lock();
  try {
    List<String> failureMessages =
        this.logAggregationFailureMessagesForNMs.get(nodeId);
    if (failureMessages == null || failureMessages.isEmpty()) {
      return StringUtils.EMPTY;
    }
    return StringUtils.join(failureMessages, "\n");
  } finally {
    this.readLock.unlock();
  }
}
} }

View File

@ -22,8 +22,6 @@ import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
@ -777,7 +775,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
rmNode.handleContainerStatus(statusEvent.getContainers()); rmNode.handleContainerStatus(statusEvent.getContainers());
Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps = List<LogAggregationReport> logAggregationReportsForApps =
statusEvent.getLogAggregationReportsForApps(); statusEvent.getLogAggregationReportsForApps();
if (logAggregationReportsForApps != null if (logAggregationReportsForApps != null
&& !logAggregationReportsForApps.isEmpty()) { && !logAggregationReportsForApps.isEmpty()) {
@ -915,12 +913,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} }
private void handleLogAggregationStatus( private void handleLogAggregationStatus(
Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) { List<LogAggregationReport> logAggregationReportsForApps) {
for (Entry<ApplicationId, LogAggregationReport> report : for (LogAggregationReport report : logAggregationReportsForApps) {
logAggregationReportsForApps.entrySet()) { RMApp rmApp = this.context.getRMApps().get(report.getApplicationId());
RMApp rmApp = this.context.getRMApps().get(report.getKey());
if (rmApp != null) { if (rmApp != null) {
((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report.getValue()); ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report);
} }
} }
} }

View File

@ -19,8 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode; package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
@ -34,7 +32,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
private final List<ContainerStatus> containersCollection; private final List<ContainerStatus> containersCollection;
private final NodeHeartbeatResponse latestResponse; private final NodeHeartbeatResponse latestResponse;
private final List<ApplicationId> keepAliveAppIds; private final List<ApplicationId> keepAliveAppIds;
private Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps; private List<LogAggregationReport> logAggregationReportsForApps;
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds, List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
@ -50,7 +48,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds, List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
NodeHeartbeatResponse latestResponse, NodeHeartbeatResponse latestResponse,
Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) { List<LogAggregationReport> logAggregationReportsForApps) {
super(nodeId, RMNodeEventType.STATUS_UPDATE); super(nodeId, RMNodeEventType.STATUS_UPDATE);
this.nodeHealthStatus = nodeHealthStatus; this.nodeHealthStatus = nodeHealthStatus;
this.containersCollection = collection; this.containersCollection = collection;
@ -75,13 +73,12 @@ public class RMNodeStatusEvent extends RMNodeEvent {
return this.keepAliveAppIds; return this.keepAliveAppIds;
} }
public Map<ApplicationId, LogAggregationReport> public List<LogAggregationReport> getLogAggregationReportsForApps() {
getLogAggregationReportsForApps() {
return this.logAggregationReportsForApps; return this.logAggregationReportsForApps;
} }
public void setLogAggregationReportsForApps( public void setLogAggregationReportsForApps(
Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) { List<LogAggregationReport> logAggregationReportsForApps) {
this.logAggregationReportsForApps = logAggregationReportsForApps; this.logAggregationReportsForApps = logAggregationReportsForApps;
} }
} }

View File

@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -34,7 +35,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.AppBlock; import org.apache.hadoop.yarn.server.webapp.AppBlock;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
@ -170,4 +170,13 @@ public class RMAppBlock extends AppBlock{
tbody._()._(); tbody._()._();
} }
/**
 * Looks up this block's application in the ResourceManager context and
 * returns its application-level log aggregation status.
 *
 * @return the application's log aggregation status, or {@code null} when
 *         the application is not found in the RM context
 */
@Override
protected LogAggregationStatus getLogAggregationStatus() {
  final RMApp app = this.rm.getRMContext().getRMApps().get(appID);
  return app == null ? null : app.getLogAggregationStatusForAppReport();
}
} }

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
@ -93,6 +94,9 @@ public class RMAppLogAggregationStatusBlock extends HtmlBlock {
.td("Log Aggregation does not Start.")._(); .td("Log Aggregation does not Start.")._();
table_description.tr().td(LogAggregationStatus.RUNNING.name()) table_description.tr().td(LogAggregationStatus.RUNNING.name())
.td("Log Aggregation is Running.")._(); .td("Log Aggregation is Running.")._();
table_description.tr().td(LogAggregationStatus.RUNNING_WITH_FAILURE.name())
.td("Log Aggregation is Running, but has failures "
+ "in previous cycles")._();
table_description.tr().td(LogAggregationStatus.SUCCEEDED.name()) table_description.tr().td(LogAggregationStatus.SUCCEEDED.name())
.td("Log Aggregation is Succeeded. All of the logs have been " .td("Log Aggregation is Succeeded. All of the logs have been "
+ "aggregated successfully.")._(); + "aggregated successfully.")._();
@ -106,24 +110,29 @@ public class RMAppLogAggregationStatusBlock extends HtmlBlock {
table_description._(); table_description._();
div_description._(); div_description._();
boolean logAggregationEnabled = RMApp rmApp = rm.getRMContext().getRMApps().get(appId);
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
// Application Log aggregation status Table // Application Log aggregation status Table
DIV<Hamlet> div = html.div(_INFO_WRAP); DIV<Hamlet> div = html.div(_INFO_WRAP);
TABLE<DIV<Hamlet>> table = TABLE<DIV<Hamlet>> table =
div.h3( div.h3(
"Log Aggregation: " "Log Aggregation: "
+ (logAggregationEnabled ? "Enabled" : "Disabled")).table( + (rmApp == null ? "N/A" : rmApp
.getLogAggregationStatusForAppReport() == null ? "N/A" : rmApp
.getLogAggregationStatusForAppReport().name())).table(
"#LogAggregationStatus"); "#LogAggregationStatus");
table.
tr().
th(_TH, "NodeId").
th(_TH, "Log Aggregation Status").
th(_TH, "Diagnostis Message").
_();
RMApp rmApp = rm.getRMContext().getRMApps().get(appId); int maxLogAggregationDiagnosticsInMemory = conf.getInt(
YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
table
.tr()
.th(_TH, "NodeId")
.th(_TH, "Log Aggregation Status")
.th(_TH, "Last "
+ maxLogAggregationDiagnosticsInMemory + " Diagnostic Messages")
.th(_TH, "Last "
+ maxLogAggregationDiagnosticsInMemory + " Failure Messages")._();
if (rmApp != null) { if (rmApp != null) {
Map<NodeId, LogAggregationReport> logAggregationReports = Map<NodeId, LogAggregationReport> logAggregationReports =
rmApp.getLogAggregationReportsForApp(); rmApp.getLogAggregationReportsForApp();
@ -136,10 +145,14 @@ public class RMAppLogAggregationStatusBlock extends HtmlBlock {
String message = String message =
report.getValue() == null ? null : report.getValue() report.getValue() == null ? null : report.getValue()
.getDiagnosticMessage(); .getDiagnosticMessage();
String failureMessage =
report.getValue() == null ? null : ((RMAppImpl)rmApp)
.getLogAggregationFailureMessagesForNM(report.getKey());
table.tr() table.tr()
.td(report.getKey().toString()) .td(report.getKey().toString())
.td(status == null ? "N/A" : status.toString()) .td(status == null ? "N/A" : status.toString())
.td(message == null ? "N/A" : message)._(); .td(message == null ? "N/A" : message)
.td(failureMessage == null ? "N/A" : failureMessage)._();
} }
} }
} }

View File

@ -23,7 +23,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -155,26 +155,26 @@ public class TestRMAppLogAggregationStatus {
.getLogAggregationStatus()); .getLogAggregationStatus());
} }
Map<ApplicationId, LogAggregationReport> node1ReportForApp = List<LogAggregationReport> node1ReportForApp =
new HashMap<ApplicationId, LogAggregationReport>(); new ArrayList<LogAggregationReport>();
String messageForNode1_1 = String messageForNode1_1 =
"node1 logAggregation status updated at " + System.currentTimeMillis(); "node1 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report1 = LogAggregationReport report1 =
LogAggregationReport.newInstance(appId, nodeId1, LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING,
LogAggregationStatus.RUNNING, messageForNode1_1); messageForNode1_1);
node1ReportForApp.put(appId, report1); node1ReportForApp.add(report1);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null, .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
null, node1ReportForApp)); null, node1ReportForApp));
Map<ApplicationId, LogAggregationReport> node2ReportForApp = List<LogAggregationReport> node2ReportForApp =
new HashMap<ApplicationId, LogAggregationReport>(); new ArrayList<LogAggregationReport>();
String messageForNode2_1 = String messageForNode2_1 =
"node2 logAggregation status updated at " + System.currentTimeMillis(); "node2 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report2 = LogAggregationReport report2 =
LogAggregationReport.newInstance(appId, nodeId2, LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING, messageForNode2_1); LogAggregationStatus.RUNNING, messageForNode2_1);
node2ReportForApp.put(appId, report2); node2ReportForApp.add(report2);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null, .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
null, node2ReportForApp)); null, node2ReportForApp));
@ -205,14 +205,14 @@ public class TestRMAppLogAggregationStatus {
} }
// node1 updates its log aggregation status again // node1 updates its log aggregation status again
Map<ApplicationId, LogAggregationReport> node1ReportForApp2 = List<LogAggregationReport> node1ReportForApp2 =
new HashMap<ApplicationId, LogAggregationReport>(); new ArrayList<LogAggregationReport>();
String messageForNode1_2 = String messageForNode1_2 =
"node1 logAggregation status updated at " + System.currentTimeMillis(); "node1 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report1_2 = LogAggregationReport report1_2 =
LogAggregationReport.newInstance(appId, nodeId1, LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING, messageForNode1_2); LogAggregationStatus.RUNNING, messageForNode1_2);
node1ReportForApp2.put(appId, report1_2); node1ReportForApp2.add(report1_2);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null, .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
null, node1ReportForApp2)); null, node1ReportForApp2));
@ -230,8 +230,9 @@ public class TestRMAppLogAggregationStatus {
if (report.getKey().equals(node1.getNodeID())) { if (report.getKey().equals(node1.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue() Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
.getLogAggregationStatus()); .getLogAggregationStatus());
Assert.assertEquals(messageForNode1_1 + messageForNode1_2, report Assert.assertEquals(
.getValue().getDiagnosticMessage()); messageForNode1_1 + "\n" + messageForNode1_2, report
.getValue().getDiagnosticMessage());
} else if (report.getKey().equals(node2.getNodeID())) { } else if (report.getKey().equals(node2.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue() Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
.getLogAggregationStatus()); .getLogAggregationStatus());
@ -268,15 +269,19 @@ public class TestRMAppLogAggregationStatus {
// Finally, node1 finished its log aggregation and sent out its final // Finally, node1 finished its log aggregation and sent out its final
// log aggregation status. The log aggregation status for node1 should // log aggregation status. The log aggregation status for node1 should
// be changed from TIME_OUT to SUCCEEDED // be changed from TIME_OUT to SUCCEEDED
Map<ApplicationId, LogAggregationReport> node1ReportForApp3 = List<LogAggregationReport> node1ReportForApp3 =
new HashMap<ApplicationId, LogAggregationReport>(); new ArrayList<LogAggregationReport>();
String messageForNode1_3 = LogAggregationReport report1_3;
"node1 final logAggregation status updated at " for (int i = 0; i < 10 ; i ++) {
+ System.currentTimeMillis(); report1_3 =
LogAggregationReport report1_3 = LogAggregationReport.newInstance(appId,
LogAggregationReport.newInstance(appId, nodeId1, LogAggregationStatus.RUNNING, "test_message_" + i);
LogAggregationStatus.SUCCEEDED, messageForNode1_3); node1ReportForApp3.add(report1_3);
node1ReportForApp3.put(appId, report1_3); }
node1ReportForApp3.add(LogAggregationReport.newInstance(appId,
LogAggregationStatus.SUCCEEDED, ""));
// For every logAggregationReport cached in memory, we can only save at most
// 10 diagnostic messages/failure messages
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null, .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
null, node1ReportForApp3)); null, node1ReportForApp3));
@ -290,8 +295,14 @@ public class TestRMAppLogAggregationStatus {
if (report.getKey().equals(node1.getNodeID())) { if (report.getKey().equals(node1.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.SUCCEEDED, report.getValue() Assert.assertEquals(LogAggregationStatus.SUCCEEDED, report.getValue()
.getLogAggregationStatus()); .getLogAggregationStatus());
Assert.assertEquals(messageForNode1_1 + messageForNode1_2 StringBuilder builder = new StringBuilder();
+ messageForNode1_3, report.getValue().getDiagnosticMessage()); for (int i = 0; i < 9; i ++) {
builder.append("test_message_" + i);
builder.append("\n");
}
builder.append("test_message_" + 9);
Assert.assertEquals(builder.toString(), report.getValue()
.getDiagnosticMessage());
} else if (report.getKey().equals(node2.getNodeID())) { } else if (report.getKey().equals(node2.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue() Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
.getLogAggregationStatus()); .getLogAggregationStatus());
@ -301,6 +312,32 @@ public class TestRMAppLogAggregationStatus {
.fail("should not contain log aggregation report for other nodes"); .fail("should not contain log aggregation report for other nodes");
} }
} }
// update log aggregationStatus for node2 as FAILED,
// so the log aggregation status for the App will become FAILED,
// and we only keep the log aggregation reports whose status is FAILED,
// so the log aggregation report for node1 will be removed.
List<LogAggregationReport> node2ReportForApp2 =
new ArrayList<LogAggregationReport>();
LogAggregationReport report2_2 =
LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING_WITH_FAILURE, "Fail_Message");
LogAggregationReport report2_3 =
LogAggregationReport.newInstance(appId,
LogAggregationStatus.FAILED, "");
node2ReportForApp2.add(report2_2);
node2ReportForApp2.add(report2_3);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
null, node2ReportForApp2));
Assert.assertEquals(LogAggregationStatus.FAILED,
rmApp.getLogAggregationStatusForAppReport());
logAggregationStatus = rmApp.getLogAggregationReportsForApp();
Assert.assertTrue(logAggregationStatus.size() == 1);
Assert.assertTrue(logAggregationStatus.containsKey(node2.getNodeID()));
Assert.assertTrue(!logAggregationStatus.containsKey(node1.getNodeID()));
Assert.assertEquals("Fail_Message",
((RMAppImpl)rmApp).getLogAggregationFailureMessagesForNM(nodeId2));
} }
@Test (timeout = 10000) @Test (timeout = 10000)
@ -317,9 +354,11 @@ public class TestRMAppLogAggregationStatus {
// Enable the log aggregation // Enable the log aggregation
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
rmApp = (RMAppImpl)createRMApp(conf); rmApp = (RMAppImpl)createRMApp(conf);
// If we do not know any NodeManagers for this application , // If we do not know any NodeManagers for this application , and
// the log aggregation status will return null // the log aggregation is enabled, the log aggregation status will
Assert.assertNull(rmApp.getLogAggregationStatusForAppReport()); // return NOT_START
Assert.assertEquals(LogAggregationStatus.NOT_START,
rmApp.getLogAggregationStatusForAppReport());
NodeId nodeId1 = NodeId.newInstance("localhost", 1111); NodeId nodeId1 = NodeId.newInstance("localhost", 1111);
NodeId nodeId2 = NodeId.newInstance("localhost", 2222); NodeId nodeId2 = NodeId.newInstance("localhost", 2222);
@ -329,24 +368,24 @@ public class TestRMAppLogAggregationStatus {
// If the log aggregation status for all NMs are NOT_START, // If the log aggregation status for all NMs are NOT_START,
// the log aggregation status for this app will return NOT_START // the log aggregation status for this app will return NOT_START
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
Assert.assertEquals(LogAggregationStatus.NOT_START, Assert.assertEquals(LogAggregationStatus.NOT_START,
rmApp.getLogAggregationStatusForAppReport()); rmApp.getLogAggregationStatusForAppReport());
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.RUNNING, "")); rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, "")); rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.RUNNING, Assert.assertEquals(LogAggregationStatus.RUNNING,
rmApp.getLogAggregationStatusForAppReport()); rmApp.getLogAggregationStatusForAppReport());
@ -357,13 +396,13 @@ public class TestRMAppLogAggregationStatus {
// others are SUCCEEDED, the log aggregation status for this app will // others are SUCCEEDED, the log aggregation status for this app will
// return TIME_OUT // return TIME_OUT
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, "")); rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.TIME_OUT, Assert.assertEquals(LogAggregationStatus.TIME_OUT,
rmApp.getLogAggregationStatusForAppReport()); rmApp.getLogAggregationStatusForAppReport());
@ -371,17 +410,59 @@ public class TestRMAppLogAggregationStatus {
// is at the final state, the log aggregation status for this app will // is at the final state, the log aggregation status for this app will
// return SUCCEEDED // return SUCCEEDED
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.SUCCEEDED, Assert.assertEquals(LogAggregationStatus.SUCCEEDED,
rmApp.getLogAggregationStatusForAppReport()); rmApp.getLogAggregationStatusForAppReport());
rmApp = (RMAppImpl)createRMApp(conf); rmApp = (RMAppImpl)createRMApp(conf);
// If the log aggregation status for at least one of the NMs is RUNNING,
// the log aggregation status for this app will return RUNNING
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
Assert.assertEquals(LogAggregationStatus.RUNNING,
rmApp.getLogAggregationStatusForAppReport());
// If the log aggregation status for at least one of the NMs
// is RUNNING_WITH_FAILURE, the log aggregation status
// for this app will return RUNNING_WITH_FAILURE
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.RUNNING_WITH_FAILURE,
""));
Assert.assertEquals(LogAggregationStatus.RUNNING_WITH_FAILURE,
rmApp.getLogAggregationStatusForAppReport());
// For node4, the previous log aggregation status is RUNNING_WITH_FAILURE,
// so it will not be changed even if it gets a new log aggregation status
// of RUNNING
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
Assert.assertEquals(LogAggregationStatus.RUNNING_WITH_FAILURE,
rmApp.getLogAggregationStatusForAppReport());
rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL)); rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp)); Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp));
// If at least one log aggregation status for one NM is FAILED, // If at least one log aggregation status for one NM is FAILED,
@ -389,13 +470,13 @@ public class TestRMAppLogAggregationStatus {
// at the final state, the log aggregation status for this app // at the final state, the log aggregation status for this app
// will return FAILED // will return FAILED
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, "")); rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, "")); rmApp.getApplicationId(), LogAggregationStatus.FAILED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); rmApp.getApplicationId(), LogAggregationStatus.FAILED, ""));
Assert.assertEquals(LogAggregationStatus.FAILED, Assert.assertEquals(LogAggregationStatus.FAILED,
rmApp.getLogAggregationStatusForAppReport()); rmApp.getLogAggregationStatusForAppReport());