> table =
div.h3(
"Log Aggregation: "
- + (logAggregationEnabled ? "Enabled" : "Disabled")).table(
+ + (rmApp == null ? "N/A" : rmApp
+ .getLogAggregationStatusForAppReport() == null ? "N/A" : rmApp
+ .getLogAggregationStatusForAppReport().name())).table(
"#LogAggregationStatus");
- table.
- tr().
- th(_TH, "NodeId").
- th(_TH, "Log Aggregation Status").
- th(_TH, "Diagnostis Message").
- _();
- RMApp rmApp = rm.getRMContext().getRMApps().get(appId);
+ int maxLogAggregationDiagnosticsInMemory = conf.getInt(
+ YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
+ YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
+ table
+ .tr()
+ .th(_TH, "NodeId")
+ .th(_TH, "Log Aggregation Status")
+ .th(_TH, "Last "
+ + maxLogAggregationDiagnosticsInMemory + " Diagnostic Messages")
+ .th(_TH, "Last "
+ + maxLogAggregationDiagnosticsInMemory + " Failure Messages")._();
+
if (rmApp != null) {
Map
logAggregationReports =
rmApp.getLogAggregationReportsForApp();
@@ -136,10 +145,14 @@ protected void render(Block html) {
String message =
report.getValue() == null ? null : report.getValue()
.getDiagnosticMessage();
+ String failureMessage =
+ report.getValue() == null ? null : ((RMAppImpl)rmApp)
+ .getLogAggregationFailureMessagesForNM(report.getKey());
table.tr()
.td(report.getKey().toString())
.td(status == null ? "N/A" : status.toString())
- .td(message == null ? "N/A" : message)._();
+ .td(message == null ? "N/A" : message)
+ .td(failureMessage == null ? "N/A" : failureMessage)._();
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index 4eec63f679a..9af4290f5db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -23,7 +23,7 @@
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -155,26 +155,26 @@ public void testLogAggregationStatus() throws Exception {
.getLogAggregationStatus());
}
- Map node1ReportForApp =
- new HashMap();
+ List node1ReportForApp =
+ new ArrayList();
String messageForNode1_1 =
"node1 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report1 =
- LogAggregationReport.newInstance(appId, nodeId1,
- LogAggregationStatus.RUNNING, messageForNode1_1);
- node1ReportForApp.put(appId, report1);
+ LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING,
+ messageForNode1_1);
+ node1ReportForApp.add(report1);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList(), null,
null, node1ReportForApp));
- Map node2ReportForApp =
- new HashMap();
+ List node2ReportForApp =
+ new ArrayList();
String messageForNode2_1 =
"node2 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report2 =
- LogAggregationReport.newInstance(appId, nodeId2,
+ LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING, messageForNode2_1);
- node2ReportForApp.put(appId, report2);
+ node2ReportForApp.add(report2);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList(), null,
null, node2ReportForApp));
@@ -205,14 +205,14 @@ public void testLogAggregationStatus() throws Exception {
}
// node1 updates its log aggregation status again
- Map node1ReportForApp2 =
- new HashMap();
+ List node1ReportForApp2 =
+ new ArrayList();
String messageForNode1_2 =
"node1 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report1_2 =
- LogAggregationReport.newInstance(appId, nodeId1,
+ LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING, messageForNode1_2);
- node1ReportForApp2.put(appId, report1_2);
+ node1ReportForApp2.add(report1_2);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList(), null,
null, node1ReportForApp2));
@@ -230,8 +230,9 @@ public void testLogAggregationStatus() throws Exception {
if (report.getKey().equals(node1.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
.getLogAggregationStatus());
- Assert.assertEquals(messageForNode1_1 + messageForNode1_2, report
- .getValue().getDiagnosticMessage());
+ Assert.assertEquals(
+ messageForNode1_1 + "\n" + messageForNode1_2, report
+ .getValue().getDiagnosticMessage());
} else if (report.getKey().equals(node2.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
.getLogAggregationStatus());
@@ -268,15 +269,19 @@ public void testLogAggregationStatus() throws Exception {
// Finally, node1 finished its log aggregation and sent out its final
// log aggregation status. The log aggregation status for node1 should
// be changed from TIME_OUT to SUCCEEDED
- Map node1ReportForApp3 =
- new HashMap();
- String messageForNode1_3 =
- "node1 final logAggregation status updated at "
- + System.currentTimeMillis();
- LogAggregationReport report1_3 =
- LogAggregationReport.newInstance(appId, nodeId1,
- LogAggregationStatus.SUCCEEDED, messageForNode1_3);
- node1ReportForApp3.put(appId, report1_3);
+ List node1ReportForApp3 =
+ new ArrayList();
+ LogAggregationReport report1_3;
+ for (int i = 0; i < 10 ; i ++) {
+ report1_3 =
+ LogAggregationReport.newInstance(appId,
+ LogAggregationStatus.RUNNING, "test_message_" + i);
+ node1ReportForApp3.add(report1_3);
+ }
+ node1ReportForApp3.add(LogAggregationReport.newInstance(appId,
+ LogAggregationStatus.SUCCEEDED, ""));
+ // For every logAggregationReport cached in memory, we can only save at most
+ // 10 diagnostic messages/failure messages
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList(), null,
null, node1ReportForApp3));
@@ -290,8 +295,14 @@ public void testLogAggregationStatus() throws Exception {
if (report.getKey().equals(node1.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.SUCCEEDED, report.getValue()
.getLogAggregationStatus());
- Assert.assertEquals(messageForNode1_1 + messageForNode1_2
- + messageForNode1_3, report.getValue().getDiagnosticMessage());
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < 9; i ++) {
+ builder.append("test_message_" + i);
+ builder.append("\n");
+ }
+ builder.append("test_message_" + 9);
+ Assert.assertEquals(builder.toString(), report.getValue()
+ .getDiagnosticMessage());
} else if (report.getKey().equals(node2.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
.getLogAggregationStatus());
@@ -301,6 +312,32 @@ public void testLogAggregationStatus() throws Exception {
.fail("should not contain log aggregation report for other nodes");
}
}
+
+ // Update the log aggregation status for node2 to FAILED,
+ // so the log aggregation status for the app becomes FAILED.
+ // Only the log aggregation reports whose status is FAILED are kept,
+ // so the log aggregation report for node1 will be removed.
+ List node2ReportForApp2 =
+ new ArrayList();
+ LogAggregationReport report2_2 =
+ LogAggregationReport.newInstance(appId,
+ LogAggregationStatus.RUNNING_WITH_FAILURE, "Fail_Message");
+ LogAggregationReport report2_3 =
+ LogAggregationReport.newInstance(appId,
+ LogAggregationStatus.FAILED, "");
+ node2ReportForApp2.add(report2_2);
+ node2ReportForApp2.add(report2_3);
+ node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
+ .newInstance(true, null, 0), new ArrayList(), null,
+ null, node2ReportForApp2));
+ Assert.assertEquals(LogAggregationStatus.FAILED,
+ rmApp.getLogAggregationStatusForAppReport());
+ logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+ Assert.assertTrue(logAggregationStatus.size() == 1);
+ Assert.assertTrue(logAggregationStatus.containsKey(node2.getNodeID()));
+ Assert.assertTrue(!logAggregationStatus.containsKey(node1.getNodeID()));
+ Assert.assertEquals("Fail_Message",
+ ((RMAppImpl)rmApp).getLogAggregationFailureMessagesForNM(nodeId2));
}
@Test (timeout = 10000)
@@ -317,9 +354,11 @@ public void testGetLogAggregationStatusForAppReport() {
// Enable the log aggregation
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
rmApp = (RMAppImpl)createRMApp(conf);
- // If we do not know any NodeManagers for this application ,
- // the log aggregation status will return null
- Assert.assertNull(rmApp.getLogAggregationStatusForAppReport());
+ // If we do not know of any NodeManagers for this application and
+ // log aggregation is enabled, the log aggregation status will
+ // be NOT_START
+ Assert.assertEquals(LogAggregationStatus.NOT_START,
+ rmApp.getLogAggregationStatusForAppReport());
NodeId nodeId1 = NodeId.newInstance("localhost", 1111);
NodeId nodeId2 = NodeId.newInstance("localhost", 2222);
@@ -329,24 +368,24 @@ public void testGetLogAggregationStatusForAppReport() {
// If the log aggregation status for all NMs are NOT_START,
// the log aggregation status for this app will return NOT_START
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
Assert.assertEquals(LogAggregationStatus.NOT_START,
rmApp.getLogAggregationStatusForAppReport());
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.RUNNING, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.RUNNING,
rmApp.getLogAggregationStatusForAppReport());
@@ -357,13 +396,13 @@ public void testGetLogAggregationStatusForAppReport() {
// others are SUCCEEDED, the log aggregation status for this app will
// return TIME_OUT
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.TIME_OUT,
rmApp.getLogAggregationStatusForAppReport());
@@ -371,17 +410,59 @@ public void testGetLogAggregationStatusForAppReport() {
// is at the final state, the log aggregation status for this app will
// return SUCCEEDED
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.SUCCEEDED,
rmApp.getLogAggregationStatusForAppReport());
rmApp = (RMAppImpl)createRMApp(conf);
+ // If the log aggregation status for at least one of the NMs is RUNNING,
+ // the log aggregation status for this app will be RUNNING
+ rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+ rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+ rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+ rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+ Assert.assertEquals(LogAggregationStatus.RUNNING,
+ rmApp.getLogAggregationStatusForAppReport());
+
+ // If the log aggregation status for at least one of the NMs
+ // is RUNNING_WITH_FAILURE, the log aggregation status
+ // for this app will be RUNNING_WITH_FAILURE
+ rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+ rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+ rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+ rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.RUNNING_WITH_FAILURE,
+ ""));
+ Assert.assertEquals(LogAggregationStatus.RUNNING_WITH_FAILURE,
+ rmApp.getLogAggregationStatusForAppReport());
+
+ // For node4, the previous log aggregation status is RUNNING_WITH_FAILURE;
+ // it will not be changed even if it gets a new log aggregation status
+ // of RUNNING
+ rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+ rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+ rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+ rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
+ rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+ Assert.assertEquals(LogAggregationStatus.RUNNING_WITH_FAILURE,
+ rmApp.getLogAggregationStatusForAppReport());
+
rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp));
// If at least of one log aggregation status for one NM is FAILED,
@@ -389,13 +470,13 @@ public void testGetLogAggregationStatusForAppReport() {
// at the final state, the log aggregation status for this app
// will return FAILED
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.FAILED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
- rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+ rmApp.getApplicationId(), LogAggregationStatus.FAILED, ""));
Assert.assertEquals(LogAggregationStatus.FAILED,
rmApp.getLogAggregationStatusForAppReport());