YARN-7842. PB changes to carry node-attributes in NM heartbeat. Contributed by Weiwei Yang.

This commit is contained in:
Weiwei Yang 2018-01-31 20:28:41 +08:00 committed by Sunil G
parent 2475fb0a1e
commit d9d93e3925
4 changed files with 86 additions and 0 deletions

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
public abstract class NodeHeartbeatRequest {
@ -61,6 +62,18 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
return nodeHeartbeatRequest;
}
public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
MasterKey lastKnownContainerTokenMasterKey,
MasterKey lastKnownNMTokenMasterKey, Set<NodeLabel> nodeLabels,
Set<NodeAttribute> nodeAttributes,
Map<ApplicationId, AppCollectorData> registeringCollectors) {
NodeHeartbeatRequest request = NodeHeartbeatRequest
.newInstance(nodeStatus, lastKnownContainerTokenMasterKey,
lastKnownNMTokenMasterKey, nodeLabels, registeringCollectors);
request.setNodeAttributes(nodeAttributes);
return request;
}
public abstract NodeStatus getNodeStatus();
public abstract void setNodeStatus(NodeStatus status);
@ -85,4 +98,8 @@ public abstract void setLogAggregationReportsForApps(
public abstract void setRegisteringCollectors(Map<ApplicationId,
AppCollectorData> appCollectorsMap);
public abstract Set<NodeAttribute> getNodeAttributes();
public abstract void setNodeAttributes(Set<NodeAttribute> nodeAttributes);
}

View File

@ -27,6 +27,9 @@
import java.util.Set;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeAttributePBImpl;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
@ -36,6 +39,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
@ -60,6 +64,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private MasterKey lastKnownContainerTokenMasterKey = null;
private MasterKey lastKnownNMTokenMasterKey = null;
private Set<NodeLabel> labels = null;
private Set<NodeAttribute> attributes = null;
private List<LogAggregationReport> logAggregationReportsForApps = null;
private Map<ApplicationId, AppCollectorData> registeringCollectors = null;
@ -115,6 +120,15 @@ private void mergeLocalToBuilder() {
}
builder.setNodeLabels(newBuilder.build());
}
if (this.attributes != null) {
builder.clearNodeAttributes();
YarnServerCommonServiceProtos.NodeAttributesProto.Builder attBuilder =
YarnServerCommonServiceProtos.NodeAttributesProto.newBuilder();
for (NodeAttribute attribute : attributes) {
attBuilder.addNodeAttributes(convertToProtoFormat(attribute));
}
builder.setNodeAttributes(attBuilder.build());
}
if (this.logAggregationReportsForApps != null) {
addLogAggregationStatusForAppsToProto();
}
@ -371,6 +385,44 @@ private NodeLabelProto convertToProtoFormat(NodeLabel t) {
return ((NodeLabelPBImpl)t).getProto();
}
@Override
public Set<NodeAttribute> getNodeAttributes() {
initNodeAttributes();
return this.attributes;
}
private void initNodeAttributes() {
if (this.attributes != null) {
return;
}
NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasNodeAttributes()) {
return;
}
YarnServerCommonServiceProtos.NodeAttributesProto nodeAttributes =
p.getNodeAttributes();
attributes = new HashSet<>();
for (NodeAttributeProto attributeProto :
nodeAttributes.getNodeAttributesList()) {
attributes.add(convertFromProtoFormat(attributeProto));
}
}
@Override
public void setNodeAttributes(Set<NodeAttribute> nodeAttributes) {
maybeInitBuilder();
builder.clearNodeAttributes();
this.attributes = nodeAttributes;
}
private NodeAttributePBImpl convertFromProtoFormat(NodeAttributeProto p) {
return new NodeAttributePBImpl(p);
}
private NodeAttributeProto convertToProtoFormat(NodeAttribute attribute) {
return ((NodeAttributePBImpl) attribute).getProto();
}
@Override
public List<LogAggregationReport> getLogAggregationReportsForApps() {
if (this.logAggregationReportsForApps != null) {

View File

@ -58,6 +58,10 @@ message NodeLabelsProto {
repeated NodeLabelProto nodeLabels = 1;
}
message NodeAttributesProto {
repeated NodeAttributeProto nodeAttributes = 1;
}
message RegisterNodeManagerRequestProto {
optional NodeIdProto node_id = 1;
optional int32 http_port = 3;
@ -95,6 +99,7 @@ message NodeHeartbeatRequestProto {
optional NodeLabelsProto nodeLabels = 4;
repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
repeated AppCollectorDataProto registering_collectors = 6;
optional NodeAttributesProto nodeAttributes = 7;
}
message LogAggregationReportProto {

View File

@ -24,7 +24,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.Sets;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
@ -39,6 +41,8 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
@ -173,6 +177,13 @@ public void testNodeHeartBeatRequest() throws IOException {
nodeStatus.setOpportunisticContainersStatus(opportunisticContainersStatus);
record.setNodeStatus(nodeStatus);
Set<NodeAttribute> attributeSet =
Sets.newHashSet(NodeAttribute.newInstance("attributeA",
NodeAttributeType.STRING, "valueA"),
NodeAttribute.newInstance("attributeB",
NodeAttributeType.STRING, "valueB"));
record.setNodeAttributes(attributeSet);
NodeHeartbeatRequestPBImpl pb = new
NodeHeartbeatRequestPBImpl(
((NodeHeartbeatRequestPBImpl) record).getProto());
@ -183,6 +194,7 @@ public void testNodeHeartBeatRequest() throws IOException {
Assert.assertEquals(321,
pb.getNodeStatus().getOpportunisticContainersStatus()
.getWaitQueueLength());
Assert.assertEquals(2, pb.getNodeAttributes().size());
}
@Test