diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 86277d3ff43..493b69eded4 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -12,6 +12,9 @@ Release 2.8.0 - UNRELEASED
container-executor for outbound network traffic control. (Sidharta Seethana
via vinodkv)
+ YARN-1376. NM need to notify the log aggregation status to RM through
+ heartbeat. (Xuan Gong via junping_du)
+
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a25cfe98446..81bcb9b89b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -741,6 +741,17 @@ public class YarnConfiguration extends Configuration {
YARN_PREFIX + "log-aggregation.retain-check-interval-seconds";
public static final long DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS = -1;
+ /**
+ * How long for ResourceManager to wait for NodeManager to report its
+ * log aggregation status. If waiting time of which the log aggregation status
+ * is reported from NodeManager exceeds the configured value, RM will report
+ * log aggregation status for this NodeManager as TIME_OUT
+ */
+ public static final String LOG_AGGREGATION_STATUS_TIME_OUT_MS =
+ YARN_PREFIX + "log-aggregation-status.time-out.ms";
+ public static final long DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS
+ = 10 * 60 * 1000;
+
/**
* Number of seconds to retain logs on the NodeManager. Only applicable if Log
* aggregation is disabled
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
new file mode 100644
index 00000000000..808804b687d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * {@code LogAggregationReport} is a report for log aggregation status
+ * in one NodeManager of an application.
+ *
+ * It includes details such as:
+ *
+ *
{@link ApplicationId} of the application.
+ *
{@link NodeId} of the NodeManager.
+ *
{@link LogAggregationStatus}
+ *
Diagnostic information
+ *
+ *
+ */
+@Public
+@Unstable
+public abstract class LogAggregationReport {
+
+ @Public
+ @Unstable
+ public static LogAggregationReport newInstance(ApplicationId appId,
+ NodeId nodeId, LogAggregationStatus status, String diagnosticMessage) {
+ LogAggregationReport report = Records.newRecord(LogAggregationReport.class);
+ report.setApplicationId(appId);
+ report.setLogAggregationStatus(status);
+ report.setDiagnosticMessage(diagnosticMessage);
+ return report;
+ }
+
+ /**
+ * Get the ApplicationId of the application.
+ * @return ApplicationId of the application
+ */
+ @Public
+ @Unstable
+ public abstract ApplicationId getApplicationId();
+
+ @Public
+ @Unstable
+ public abstract void setApplicationId(ApplicationId appId);
+
+ /**
+ * Get the NodeId.
+ * @return NodeId
+ */
+ @Public
+ @Unstable
+ public abstract NodeId getNodeId();
+
+ @Public
+ @Unstable
+ public abstract void setNodeId(NodeId nodeId);
+
+ /**
+ * Get the LogAggregationStatus.
+ * @return LogAggregationStatus
+ */
+ @Public
+ @Unstable
+ public abstract LogAggregationStatus getLogAggregationStatus();
+
+ @Public
+ @Unstable
+ public abstract void setLogAggregationStatus(
+ LogAggregationStatus logAggregationStatus);
+
+ /**
+ * Get the diagnositic information of this log aggregation
+ * @return diagnositic information of this log aggregation
+ */
+ @Public
+ @Unstable
+ public abstract String getDiagnosticMessage();
+
+ @Public
+ @Unstable
+ public abstract void setDiagnosticMessage(String diagnosticMessage);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index b80d9cedd02..227363fec7a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
+import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records;
@@ -51,4 +53,10 @@ public abstract class NodeHeartbeatRequest {
public abstract Set getNodeLabels();
public abstract void setNodeLabels(Set nodeLabels);
+
+ public abstract Map
+ getLogAggregationReportsForApps();
+
+ public abstract void setLogAggregationReportsForApps(
+ Map logAggregationReportsForApps);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
new file mode 100644
index 00000000000..7999fa7aade
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.LogAggregationStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class LogAggregationReportPBImpl extends LogAggregationReport {
+
+ LogAggregationReportProto proto = LogAggregationReportProto
+ .getDefaultInstance();
+ LogAggregationReportProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private static final String LOGAGGREGATION_STATUS_PREFIX = "LOG_";
+
+ private ApplicationId applicationId;
+ private NodeId nodeId;
+
+ public LogAggregationReportPBImpl() {
+ builder = LogAggregationReportProto.newBuilder();
+ }
+
+ public LogAggregationReportPBImpl(LogAggregationReportProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public LogAggregationReportProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.applicationId != null
+ && !((ApplicationIdPBImpl) this.applicationId).getProto().equals(
+ builder.getApplicationId())) {
+ builder.setApplicationId(convertToProtoFormat(this.applicationId));
+ }
+
+ if (this.nodeId != null
+ && !((NodeIdPBImpl) this.nodeId).getProto().equals(
+ builder.getNodeId())) {
+ builder.setNodeId(convertToProtoFormat(this.nodeId));
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = LogAggregationReportProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public ApplicationId getApplicationId() {
+ if (this.applicationId != null) {
+ return this.applicationId;
+ }
+
+ LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasApplicationId()) {
+ return null;
+ }
+ this.applicationId = convertFromProtoFormat(p.getApplicationId());
+ return this.applicationId;
+ }
+
+ @Override
+ public void setApplicationId(ApplicationId appId) {
+ maybeInitBuilder();
+ if (appId == null)
+ builder.clearApplicationId();
+ this.applicationId = appId;
+ }
+
+ private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+ return ((ApplicationIdPBImpl) t).getProto();
+ }
+
+ private ApplicationIdPBImpl convertFromProtoFormat(
+ ApplicationIdProto applicationId) {
+ return new ApplicationIdPBImpl(applicationId);
+ }
+
+ @Override
+ public LogAggregationStatus getLogAggregationStatus() {
+ LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasLogAggregationStatus()) {
+ return null;
+ }
+ return convertFromProtoFormat(p.getLogAggregationStatus());
+ }
+
+ @Override
+ public void
+ setLogAggregationStatus(LogAggregationStatus logAggregationStatus) {
+ maybeInitBuilder();
+ if (logAggregationStatus == null) {
+ builder.clearLogAggregationStatus();
+ return;
+ }
+ builder.setLogAggregationStatus(convertToProtoFormat(logAggregationStatus));
+ }
+
+ private LogAggregationStatus convertFromProtoFormat(
+ LogAggregationStatusProto s) {
+ return LogAggregationStatus.valueOf(s.name().replace(
+ LOGAGGREGATION_STATUS_PREFIX, ""));
+ }
+
+ private LogAggregationStatusProto
+ convertToProtoFormat(LogAggregationStatus s) {
+ return LogAggregationStatusProto.valueOf(LOGAGGREGATION_STATUS_PREFIX
+ + s.name());
+ }
+
+ @Override
+ public String getDiagnosticMessage() {
+ LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasDiagnostics()) {
+ return null;
+ }
+ return p.getDiagnostics();
+ }
+
+ @Override
+ public void setDiagnosticMessage(String diagnosticMessage) {
+ maybeInitBuilder();
+ if (diagnosticMessage == null) {
+ builder.clearDiagnostics();
+ return;
+ }
+ builder.setDiagnostics(diagnosticMessage);
+ }
+
+ @Override
+ public NodeId getNodeId() {
+ if (this.nodeId != null) {
+ return this.nodeId;
+ }
+
+ LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasNodeId()) {
+ return null;
+ }
+ this.nodeId = convertFromProtoFormat(p.getNodeId());
+ return this.nodeId;
+ }
+
+ @Override
+ public void setNodeId(NodeId nodeId) {
+ maybeInitBuilder();
+ if (nodeId == null)
+ builder.clearNodeId();
+ this.nodeId = nodeId;
+ }
+
+ private NodeIdProto convertToProtoFormat(NodeId t) {
+ return ((NodeIdPBImpl) t).getProto();
+ }
+
+ private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) {
+ return new NodeIdPBImpl(nodeId);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 16d47f9df1c..03db39ce631 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -18,15 +18,24 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
-import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportsForAppsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -42,6 +51,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private MasterKey lastKnownContainerTokenMasterKey = null;
private MasterKey lastKnownNMTokenMasterKey = null;
private Set labels = null;
+ private Map
+ logAggregationReportsForApps = null;
public NodeHeartbeatRequestPBImpl() {
builder = NodeHeartbeatRequestProto.newBuilder();
@@ -91,6 +102,25 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
builder.setNodeLabels(StringArrayProto.newBuilder()
.addAllElements(this.labels).build());
}
+ if (this.logAggregationReportsForApps != null) {
+ addLogAggregationStatusForAppsToProto();
+ }
+ }
+
+ private void addLogAggregationStatusForAppsToProto() {
+ maybeInitBuilder();
+ builder.clearLogAggregationReportsForApps();
+ for (Entry entry : logAggregationReportsForApps
+ .entrySet()) {
+ builder.addLogAggregationReportsForApps(LogAggregationReportsForAppsProto
+ .newBuilder().setAppId(convertToProtoFormat(entry.getKey()))
+ .setLogAggregationReport(convertToProtoFormat(entry.getValue())));
+ }
+ }
+
+ private LogAggregationReportProto convertToProtoFormat(
+ LogAggregationReport value) {
+ return ((LogAggregationReportPBImpl) value).getProto();
}
private void mergeLocalToProto() {
@@ -215,4 +245,54 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
StringArrayProto nodeLabels = p.getNodeLabels();
labels = new HashSet(nodeLabels.getElementsList());
}
+
+ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+ return new ApplicationIdPBImpl(p);
+ }
+
+ private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+ return ((ApplicationIdPBImpl) t).getProto();
+ }
+
+ @Override
+ public Map
+ getLogAggregationReportsForApps() {
+ if (this.logAggregationReportsForApps != null) {
+ return this.logAggregationReportsForApps;
+ }
+ initLogAggregationReportsForApps();
+ return logAggregationReportsForApps;
+ }
+
+ private void initLogAggregationReportsForApps() {
+ NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
+ List list =
+ p.getLogAggregationReportsForAppsList();
+ this.logAggregationReportsForApps =
+ new HashMap();
+ for (LogAggregationReportsForAppsProto c : list) {
+ ApplicationId appId = convertFromProtoFormat(c.getAppId());
+ LogAggregationReport report =
+ convertFromProtoFormat(c.getLogAggregationReport());
+ this.logAggregationReportsForApps.put(appId, report);
+ }
+ }
+
+ private LogAggregationReport convertFromProtoFormat(
+ LogAggregationReportProto logAggregationReport) {
+ return new LogAggregationReportPBImpl(logAggregationReport);
+ }
+
+ @Override
+ public void setLogAggregationReportsForApps(
+ Map logAggregationStatusForApps) {
+ if (logAggregationStatusForApps == null
+ || logAggregationStatusForApps.isEmpty()) {
+ return;
+ }
+ maybeInitBuilder();
+ this.logAggregationReportsForApps =
+ new HashMap();
+ this.logAggregationReportsForApps.putAll(logAggregationStatusForApps);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java
new file mode 100644
index 00000000000..496767fcab7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+/**
+ *