diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 5bd00122d74..90782d8fe3b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -201,6 +201,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-711. Copied BuilderUtil methods in individual API records as
BuilderUtils is going to be dismantled. (Jian He via vinodkv)
+ YARN-714. Added NMTokens to be sent to AMs as part of heart-beat response.
+ (Omkar Vinit Joshi via vinodkv)
+
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index 8da0d95bb2c..aaf0ecdd979 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
/**
*
The response sent by the ResourceManager
the
@@ -176,5 +177,24 @@ public interface AllocateResponse {
@Private
@Unstable
public void setPreemptionMessage(PreemptionMessage request);
+
+ @Public
+ @Stable
+ public void setNMTokens(List nmTokens);
+
+ /**
+ * Get the list of NMTokens required for communicating with NM. New NMTokens
+ * issued only if
+ * 1) AM is receiving first container on underlying NodeManager.
+ * OR
+ * 2) NMToken master key rolled over in ResourceManager and AM is getting new
+ * container on the same underlying NodeManager.
+ * AM will receive one NMToken per NM irrespective of the number of containers
+ * issued on same NM. AM is expected to store these tokens until issued a
+ * new token for the same NM.
+ */
+ @Public
+ @Stable
+ public List getNMTokens();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index dac8c73580d..a6abe7aeeb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Container;
@@ -30,10 +31,12 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
@@ -52,6 +55,7 @@ public class AllocateResponsePBImpl extends ProtoBase
Resource limit;
private List allocatedContainers = null;
+ private List nmTokens = null;
private List completedContainersStatuses = null;
private List updatedNodes = null;
@@ -81,6 +85,11 @@ public class AllocateResponsePBImpl extends ProtoBase
getProtoIterable(this.allocatedContainers);
builder.addAllAllocatedContainers(iterable);
}
+ if (nmTokens != null) {
+ builder.clearNmTokens();
+ Iterable iterable = getTokenProtoIterable(nmTokens);
+ builder.addAllNmTokens(iterable);
+ }
if (this.completedContainersStatuses != null) {
builder.clearCompletedContainerStatuses();
Iterable iterable =
@@ -210,6 +219,24 @@ public class AllocateResponsePBImpl extends ProtoBase
completedContainersStatuses.addAll(containers);
}
+ @Override
+ public synchronized void setNMTokens(List nmTokens) {
+ if (nmTokens == null || nmTokens.isEmpty()) {
+ this.nmTokens.clear();
+ builder.clearNmTokens();
+ return;
+ }
+ // Implementing it as an append rather than set for consistency
+ initLocalNewNMTokenList();
+ this.nmTokens.addAll(nmTokens);
+ }
+
+ @Override
+ public synchronized List getNMTokens() {
+ initLocalNewNMTokenList();
+ return nmTokens;
+ }
+
@Override
public synchronized int getNumClusterNodes() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
@@ -274,6 +301,18 @@ public class AllocateResponsePBImpl extends ProtoBase
}
}
+ private synchronized void initLocalNewNMTokenList() {
+ if (nmTokens != null) {
+ return;
+ }
+ AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List list = p.getNmTokensList();
+ nmTokens = new ArrayList();
+ for (TokenProto t : list) {
+ nmTokens.add(convertFromProtoFormat(t));
+ }
+ }
+
private synchronized Iterable getProtoIterable(
final List newContainersList) {
maybeInitBuilder();
@@ -305,6 +344,35 @@ public class AllocateResponsePBImpl extends ProtoBase
};
}
+ private synchronized Iterable getTokenProtoIterable(
+ final List nmTokenList) {
+ maybeInitBuilder();
+ return new Iterable() {
+ @Override
+ public synchronized Iterator iterator() {
+ return new Iterator() {
+
+ Iterator iter = nmTokenList.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public TokenProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
private synchronized Iterable
getContainerStatusProtoIterable(
final List newContainersList) {
@@ -427,4 +495,12 @@ public class AllocateResponsePBImpl extends ProtoBase
private synchronized PreemptionMessageProto convertToProtoFormat(PreemptionMessage r) {
return ((PreemptionMessagePBImpl)r).getProto();
}
+
+ private synchronized TokenProto convertToProtoFormat(Token token) {
+ return ((TokenPBImpl)token).getProto();
+ }
+
+ private synchronized Token convertFromProtoFormat(TokenProto proto) {
+ return new TokenPBImpl(proto);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 6ac02741bac..480fe16ce96 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -67,6 +67,7 @@ message AllocateResponseProto {
repeated NodeReportProto updated_nodes = 6;
optional int32 num_cluster_nodes = 7;
optional PreemptionMessageProto preempt = 8;
+ repeated hadoop.common.TokenProto nm_tokens = 9;
}
message PreemptionMessageProto {