From b37da52a1c4fb3da2bd21bfadc5ec61c5f953a59 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Tue, 19 May 2015 16:34:17 -0700 Subject: [PATCH] YARN-3565. NodeHeartbeatRequest/RegisterNodeManagerRequest should use NodeLabel object instead of String. (Naganarasimha G R via wangda) --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../src/main/proto/yarn_protos.proto | 4 --- .../nodelabels/CommonNodeLabelsManager.java | 2 ++ .../yarn/nodelabels/NodeLabelTestBase.java | 12 +++++++ .../protocolrecords/NodeHeartbeatRequest.java | 7 ++-- .../RegisterNodeManagerRequest.java | 7 ++-- .../impl/pb/NodeHeartbeatRequestPBImpl.java | 34 +++++++++++++----- .../pb/RegisterNodeManagerRequestPBImpl.java | 35 ++++++++++++++----- .../yarn_server_common_service_protos.proto | 8 +++-- .../hadoop/yarn/TestYarnServerApiClasses.java | 19 +++++----- .../nodemanager/NodeStatusUpdaterImpl.java | 23 ++++++------ .../nodelabels/NodeLabelsProvider.java | 3 +- .../TestNodeStatusUpdaterForLabels.java | 23 ++++++------ .../ResourceTrackerService.java | 18 ++++++++-- .../TestResourceTrackerService.java | 25 ++++++------- 15 files changed, 149 insertions(+), 74 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5a6fb38c702..ab6f488f3a6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -247,6 +247,9 @@ Release 2.8.0 - UNRELEASED YARN-3362. Add node label usage in RM CapacityScheduler web UI. (Naganarasimha G R via wangda) + YARN-3565. NodeHeartbeatRequest/RegisterNodeManagerRequest should use + NodeLabel object instead of String. (Naganarasimha G R via wangda) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 4095676ba31..3c4aa524c3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -253,10 +253,6 @@ message NodeIdToLabelsProto { repeated string nodeLabels = 2; } -message StringArrayProto { - repeated string elements = 1; -} - message LabelsToNodeIdsProto { optional string nodeLabels = 1; repeated NodeIdProto nodeId = 2; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index bf34837253c..badf4d6decc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -39,6 +39,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.NodeId; @@ -59,6 +60,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; +@Private public class CommonNodeLabelsManager extends AbstractService { protected static final Log LOG = LogFactory.getLog(CommonNodeLabelsManager.class); private static final int MAX_LABEL_LENGTH = 255; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java index 8301d96d179..f834d542ee4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java @@ -112,6 +112,18 @@ public static Set toSet(E... elements) { return set; } + @SuppressWarnings("unchecked") + public static Set toNodeLabelSet(String... nodeLabelsStr) { + if (null == nodeLabelsStr) { + return null; + } + Set labels = new HashSet(); + for (String label : nodeLabelsStr) { + labels.add(NodeLabel.newInstance(label)); + } + return labels; + } + public NodeId toNodeId(String str) { if (str.contains(":")) { int idx = str.indexOf(':'); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index 767e4b0c461..84ca8a497f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Set; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -29,7 +30,7 @@ public abstract class NodeHeartbeatRequest { public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, MasterKey lastKnownContainerTokenMasterKey, - MasterKey lastKnownNMTokenMasterKey, Set nodeLabels) { + MasterKey lastKnownNMTokenMasterKey, Set nodeLabels) { NodeHeartbeatRequest nodeHeartbeatRequest = Records.newRecord(NodeHeartbeatRequest.class); nodeHeartbeatRequest.setNodeStatus(nodeStatus); @@ -50,8 +51,8 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, public abstract MasterKey getLastKnownNMTokenMasterKey(); public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey); - public abstract Set getNodeLabels(); - public abstract void setNodeLabels(Set nodeLabels); + public abstract Set getNodeLabels(); + public abstract void setNodeLabels(Set nodeLabels); public abstract List getLogAggregationReportsForApps(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index bf09b33f8af..7798ba96a56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Records; @@ -39,7 +40,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, List containerStatuses, - List runningApplications, Set nodeLabels) { + List runningApplications, Set nodeLabels) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -57,8 +58,8 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public abstract Resource getResource(); public abstract String getNMVersion(); public abstract List getNMContainerStatuses(); - public abstract Set getNodeLabels(); - public abstract void setNodeLabels(Set nodeLabels); + public abstract Set getNodeLabels(); + public abstract void setNodeLabels(Set nodeLabels); /** * We introduce this here because currently YARN RM doesn't persist nodes info diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 81f173d85d0..0a9895e6c3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -24,12 +24,16 @@ import java.util.List; import java.util.Set; -import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -45,7 +49,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { private NodeStatus nodeStatus = null; private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null; - private Set labels = null; + private Set labels = null; private List logAggregationReportsForApps = null; public NodeHeartbeatRequestPBImpl() { @@ -93,8 +97,11 @@ private void mergeLocalToBuilder() { } if (this.labels != null) { builder.clearNodeLabels(); - builder.setNodeLabels(StringArrayProto.newBuilder() - .addAllElements(this.labels).build()); + Builder newBuilder = NodeLabelsProto.newBuilder(); + for (NodeLabel label : labels) { + newBuilder.addNodeLabels(convertToProtoFormat(label)); + } + builder.setNodeLabels(newBuilder.build()); } if (this.logAggregationReportsForApps != null) { addLogAggregationStatusForAppsToProto(); @@ -238,13 +245,13 @@ private MasterKeyProto convertToProtoFormat(MasterKey t) { } @Override - public Set getNodeLabels() { + public Set getNodeLabels() { initNodeLabels(); return this.labels; } @Override - public void setNodeLabels(Set nodeLabels) { + public void setNodeLabels(Set nodeLabels) { maybeInitBuilder(); builder.clearNodeLabels(); this.labels = nodeLabels; @@ -259,8 +266,19 @@ private void initNodeLabels() { labels = null; return; } - StringArrayProto nodeLabels = p.getNodeLabels(); - labels = new HashSet(nodeLabels.getElementsList()); + NodeLabelsProto nodeLabels = p.getNodeLabels(); + labels = new HashSet(); + for(NodeLabelProto nlp : nodeLabels.getNodeLabelsList()) { + labels.add(convertFromProtoFormat(nlp)); + } + } + + private NodeLabelPBImpl convertFromProtoFormat(NodeLabelProto p) { + return new NodeLabelPBImpl(p); + } + + private NodeLabelProto convertToProtoFormat(NodeLabel t) { + return ((NodeLabelPBImpl)t).getProto(); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 1d2bb829a37..5b0e0a1f87c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -27,16 +27,19 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; -import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -51,7 +54,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest private NodeId nodeId = null; private List containerStatuses = null; private List runningApplications = null; - private Set labels = null; + private Set labels = null; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -84,8 +87,11 @@ private void mergeLocalToBuilder() { } if (this.labels != null) { builder.clearNodeLabels(); - builder.setNodeLabels(StringArrayProto.newBuilder() - .addAllElements(this.labels).build()); + Builder newBuilder = NodeLabelsProto.newBuilder(); + for (NodeLabel label : labels) { + newBuilder.addNodeLabels(convertToProtoFormat(label)); + } + builder.setNodeLabels(newBuilder.build()); } } @@ -293,13 +299,13 @@ public void setNMVersion(String version) { } @Override - public Set getNodeLabels() { + public Set getNodeLabels() { initNodeLabels(); return this.labels; } @Override - public void setNodeLabels(Set nodeLabels) { + public void setNodeLabels(Set nodeLabels) { maybeInitBuilder(); builder.clearNodeLabels(); this.labels = nodeLabels; @@ -314,8 +320,19 @@ private void initNodeLabels() { labels=null; return; } - StringArrayProto nodeLabels = p.getNodeLabels(); - labels = new HashSet(nodeLabels.getElementsList()); + NodeLabelsProto nodeLabels = p.getNodeLabels(); + labels = new HashSet(); + for(NodeLabelProto nlp : nodeLabels.getNodeLabelsList()) { + labels.add(convertFromProtoFormat(nlp)); + } + } + + private NodeLabelPBImpl convertFromProtoFormat(NodeLabelProto p) { + return new NodeLabelPBImpl(p); + } + + private NodeLabelProto convertToProtoFormat(NodeLabel t) { + return ((NodeLabelPBImpl)t).getProto(); } private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index c027ac080e0..f3735a0e69c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -25,6 +25,10 @@ package hadoop.yarn; import "yarn_protos.proto"; import "yarn_server_common_protos.proto"; +message NodeLabelsProto { + repeated NodeLabelProto nodeLabels = 1; +} + message RegisterNodeManagerRequestProto { optional NodeIdProto node_id = 1; optional int32 http_port = 3; @@ -32,7 +36,7 @@ message RegisterNodeManagerRequestProto { optional string nm_version = 5; repeated NMContainerStatusProto container_statuses = 6; repeated ApplicationIdProto runningApplications = 7; - optional StringArrayProto nodeLabels = 8; + optional NodeLabelsProto nodeLabels = 8; } message RegisterNodeManagerResponseProto { @@ -49,7 +53,7 @@ message NodeHeartbeatRequestProto { optional NodeStatusProto node_status = 1; optional MasterKeyProto last_known_container_token_master_key = 2; optional MasterKeyProto last_known_nm_token_master_key = 3; - optional StringArrayProto nodeLabels = 4; + optional NodeLabelsProto nodeLabels = 4; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index d42b2c7d0c3..f882657d193 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; @@ -113,7 +114,7 @@ public void testNodeHeartbeatRequestPBImpl() { Assert.assertTrue(original.getNodeLabels() .containsAll(copy.getNodeLabels())); // check for empty labels - original.setNodeLabels(new HashSet ()); + original.setNodeLabels(new HashSet ()); copy = new NodeHeartbeatRequestPBImpl( original.getProto()); Assert.assertNotNull(copy.getNodeLabels()); @@ -271,7 +272,7 @@ public void testRegisterNodeManagerRequestWithNullLabels() { @Test public void testRegisterNodeManagerRequestWithValidLabels() { - HashSet nodeLabels = getValidNodeLabels(); + HashSet nodeLabels = getValidNodeLabels(); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance( NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0), @@ -286,19 +287,19 @@ public void testRegisterNodeManagerRequestWithValidLabels() { Assert.assertEquals(true, nodeLabels.containsAll(copy.getNodeLabels())); // check for empty labels - request.setNodeLabels(new HashSet ()); + request.setNodeLabels(new HashSet ()); copy = new RegisterNodeManagerRequestPBImpl( ((RegisterNodeManagerRequestPBImpl) request).getProto()); Assert.assertNotNull(copy.getNodeLabels()); Assert.assertEquals(0, copy.getNodeLabels().size()); } - private HashSet getValidNodeLabels() { - HashSet nodeLabels = new HashSet(); - nodeLabels.add("java"); - nodeLabels.add("windows"); - nodeLabels.add("gpu"); - nodeLabels.add("x86"); + private HashSet getValidNodeLabels() { + HashSet nodeLabels = new HashSet(); + nodeLabels.add(NodeLabel.newInstance("java")); + nodeLabels.add(NodeLabel.newInstance("windows")); + nodeLabels.add(NodeLabel.newInstance("gpu")); + nodeLabels.add(NodeLabel.newInstance("x86")); return nodeLabels; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8046228da8f..b635c46021f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -30,9 +30,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.Random; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -279,11 +280,11 @@ protected ResourceTracker getRMClient() throws IOException { protected void registerWithRM() throws YarnException, IOException { List containerReports = getNMContainerStatuses(); - Set nodeLabels = null; + Set nodeLabels = null; if (hasNodeLabelsProvider) { nodeLabels = nodeLabelsProvider.getNodeLabels(); nodeLabels = - (null == nodeLabels) ? CommonNodeLabelsManager.EMPTY_STRING_SET + (null == nodeLabels) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET : nodeLabels; } RegisterNodeManagerRequest request = @@ -628,29 +629,29 @@ protected void startStatusUpdater() { @SuppressWarnings("unchecked") public void run() { int lastHeartbeatID = 0; - Set lastUpdatedNodeLabelsToRM = null; + Set lastUpdatedNodeLabelsToRM = null; if (hasNodeLabelsProvider) { lastUpdatedNodeLabelsToRM = nodeLabelsProvider.getNodeLabels(); lastUpdatedNodeLabelsToRM = - (null == lastUpdatedNodeLabelsToRM) ? CommonNodeLabelsManager.EMPTY_STRING_SET + (null == lastUpdatedNodeLabelsToRM) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET : lastUpdatedNodeLabelsToRM; } while (!isStopped) { // Send heartbeat try { NodeHeartbeatResponse response = null; - Set nodeLabelsForHeartbeat = null; + Set nodeLabelsForHeartbeat = null; NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); if (hasNodeLabelsProvider) { nodeLabelsForHeartbeat = nodeLabelsProvider.getNodeLabels(); - //if the provider returns null then consider empty labels are set + // if the provider returns null then consider empty labels are set nodeLabelsForHeartbeat = - (nodeLabelsForHeartbeat == null) ? CommonNodeLabelsManager.EMPTY_STRING_SET + (nodeLabelsForHeartbeat == null) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET : nodeLabelsForHeartbeat; if (!areNodeLabelsUpdated(nodeLabelsForHeartbeat, lastUpdatedNodeLabelsToRM)) { - //if nodelabels have not changed then no need to send + // if nodelabels have not changed then no need to send nodeLabelsForHeartbeat = null; } } @@ -781,8 +782,8 @@ public void run() { * @param nodeLabelsOld * @return if the New node labels are diff from the older one. */ - private boolean areNodeLabelsUpdated(Set nodeLabelsNew, - Set nodeLabelsOld) { + private boolean areNodeLabelsUpdated(Set nodeLabelsNew, + Set nodeLabelsOld) { if (nodeLabelsNew.size() != nodeLabelsOld.size() || !nodeLabelsOld.containsAll(nodeLabelsNew)) { return true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java index 4b34d763873..dab37091983 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java @@ -21,6 +21,7 @@ import java.util.Set; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.NodeLabel; /** * Interface which will be responsible for fetching the labels @@ -39,5 +40,5 @@ public NodeLabelsProvider(String name) { * * @return Set of node label strings applicable for a node */ - public abstract Set getNodeLabels(); + public abstract Set getNodeLabels(); } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java index 437e4c89d75..a0ed39b6a9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java @@ -23,16 +23,17 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Collections; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -71,7 +72,7 @@ public void tearDown() { private class ResourceTrackerForLabels implements ResourceTracker { int heartbeatID = 0; - Set labels; + Set labels; private boolean receivedNMHeartbeat = false; private boolean receivedNMRegister = false; @@ -185,18 +186,18 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public static class DummyNodeLabelsProvider extends NodeLabelsProvider { @SuppressWarnings("unchecked") - private Set nodeLabels = Collections.EMPTY_SET; + private Set nodeLabels = CommonNodeLabelsManager.EMPTY_NODELABEL_SET; public DummyNodeLabelsProvider() { super(DummyNodeLabelsProvider.class.getName()); } @Override - public synchronized Set getNodeLabels() { + public synchronized Set getNodeLabels() { return nodeLabels; } - synchronized void setNodeLabels(Set nodeLabels) { + synchronized void setNodeLabels(Set nodeLabels) { this.nodeLabels = nodeLabels; } } @@ -245,19 +246,21 @@ protected void stopRMProxy() { resourceTracker.resetNMHeartbeatReceiveFlag(); nm.start(); resourceTracker.waitTillRegister(); - assertCollectionEquals(resourceTracker.labels, - dummyLabelsProviderRef.getNodeLabels()); + assertNLCollectionEquals(resourceTracker.labels, + dummyLabelsProviderRef + .getNodeLabels()); resourceTracker.waitTillHeartbeat();// wait till the first heartbeat resourceTracker.resetNMHeartbeatReceiveFlag(); // heartbeat with updated labels - dummyLabelsProviderRef.setNodeLabels(toSet("P")); + dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P")); nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); resourceTracker.waitTillHeartbeat(); - assertCollectionEquals(resourceTracker.labels, - dummyLabelsProviderRef.getNodeLabels()); + assertNLCollectionEquals(resourceTracker.labels, + dummyLabelsProviderRef + .getNodeLabels()); resourceTracker.resetNMHeartbeatReceiveFlag(); // heartbeat without updating labels diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 16b6a890ac9..4dc5c885a1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -247,6 +249,17 @@ void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) { } } + static Set convertToStringSet(Set nodeLabels) { + if (null == nodeLabels) { + return null; + } + Set labels = new HashSet(); + for (NodeLabel label : nodeLabels) { + labels.add(label.getName()); + } + return labels; + } + @SuppressWarnings("unchecked") @Override public RegisterNodeManagerResponse registerNodeManager( @@ -346,7 +359,7 @@ public RegisterNodeManagerResponse registerNodeManager( } // Update node's labels to RM's NodeLabelManager. - Set nodeLabels = request.getNodeLabels(); + Set nodeLabels = convertToStringSet(request.getNodeLabels()); if (isDistributedNodeLabelsConf && nodeLabels != null) { try { updateNodeLabelsFromNMReport(nodeLabels, nodeId); @@ -467,7 +480,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // 5. Update node's labels to RM's NodeLabelManager. if (isDistributedNodeLabelsConf && request.getNodeLabels() != null) { try { - updateNodeLabelsFromNMReport(request.getNodeLabels(), nodeId); + updateNodeLabelsFromNMReport( + convertToStringSet(request.getNodeLabels()), nodeId); nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true); } catch (IOException ex) { //ensure the error message is captured and sent across in response diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index cc5f46462d2..3474ed65a2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -353,14 +354,14 @@ protected RMNodeLabelsManager createNodeLabelManager() { registerReq.setNodeId(nodeId); registerReq.setHttpPort(1234); registerReq.setNMVersion(YarnVersionInfo.getVersion()); - registerReq.setNodeLabels(toSet("A")); + registerReq.setNodeLabels(toSet(NodeLabel.newInstance("A"))); RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(registerReq); Assert.assertEquals("Action should be normal on valid Node Labels", NodeAction.NORMAL, response.getNodeAction()); assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId), - registerReq.getNodeLabels()); + ResourceTrackerService.convertToStringSet(registerReq.getNodeLabels())); Assert.assertTrue("Valid Node Labels were not accepted by RM", response.getAreNodeLabelsAcceptedByRM()); rm.stop(); @@ -402,7 +403,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { registerReq.setNodeId(nodeId); registerReq.setHttpPort(1234); registerReq.setNMVersion(YarnVersionInfo.getVersion()); - registerReq.setNodeLabels(toSet("A", "B", "C")); + registerReq.setNodeLabels(toNodeLabelSet("A", "B", "C")); RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(registerReq); @@ -455,7 +456,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { req.setNodeId(nodeId); req.setHttpPort(1234); req.setNMVersion(YarnVersionInfo.getVersion()); - req.setNodeLabels(toSet("#Y")); + req.setNodeLabels(toNodeLabelSet("#Y")); RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req); @@ -506,7 +507,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { req.setNodeId(nodeId); req.setHttpPort(1234); req.setNMVersion(YarnVersionInfo.getVersion()); - req.setNodeLabels(toSet("A")); + req.setNodeLabels(toNodeLabelSet("A")); RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req); // registered to RM with central label config @@ -568,14 +569,14 @@ protected RMNodeLabelsManager createNodeLabelManager() { registerReq.setNodeId(nodeId); registerReq.setHttpPort(1234); registerReq.setNMVersion(YarnVersionInfo.getVersion()); - registerReq.setNodeLabels(toSet("A")); // Node register label + registerReq.setNodeLabels(toNodeLabelSet("A")); // Node register label RegisterNodeManagerResponse registerResponse = resourceTrackerService.registerNodeManager(registerReq); // modification of labels during heartbeat NodeHeartbeatRequest heartbeatReq = Records.newRecord(NodeHeartbeatRequest.class); - heartbeatReq.setNodeLabels(toSet("B")); // Node heartbeat label update + heartbeatReq.setNodeLabels(toNodeLabelSet("B")); // Node heartbeat label update NodeStatus nodeStatusObject = getNodeStatusObject(nodeId); heartbeatReq.setNodeStatus(nodeStatusObject); heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse @@ -588,7 +589,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals("InValid Node Labels were not accepted by RM", NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction()); assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId), - heartbeatReq.getNodeLabels()); + ResourceTrackerService.convertToStringSet(heartbeatReq.getNodeLabels())); Assert.assertTrue("Valid Node Labels were not accepted by RM", nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM()); @@ -652,13 +653,13 @@ protected RMNodeLabelsManager createNodeLabelManager() { registerReq.setNodeId(nodeId); registerReq.setHttpPort(1234); registerReq.setNMVersion(YarnVersionInfo.getVersion()); - registerReq.setNodeLabels(toSet("A")); + registerReq.setNodeLabels(toNodeLabelSet("A")); RegisterNodeManagerResponse registerResponse = resourceTrackerService.registerNodeManager(registerReq); NodeHeartbeatRequest heartbeatReq = Records.newRecord(NodeHeartbeatRequest.class); - heartbeatReq.setNodeLabels(toSet("B", "#C")); // Invalid heart beat labels + heartbeatReq.setNodeLabels(toNodeLabelSet("B", "#C")); // Invalid heart beat labels heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId)); heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse .getNMTokenMasterKey()); @@ -705,13 +706,13 @@ protected RMNodeLabelsManager createNodeLabelManager() { req.setNodeId(nodeId); req.setHttpPort(1234); req.setNMVersion(YarnVersionInfo.getVersion()); - req.setNodeLabels(toSet("A", "B", "C")); + req.setNodeLabels(toNodeLabelSet("A", "B", "C")); RegisterNodeManagerResponse registerResponse = resourceTrackerService.registerNodeManager(req); NodeHeartbeatRequest heartbeatReq = Records.newRecord(NodeHeartbeatRequest.class); - heartbeatReq.setNodeLabels(toSet("B")); // Valid heart beat labels + heartbeatReq.setNodeLabels(toNodeLabelSet("B")); // Valid heart beat labels heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId)); heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse .getNMTokenMasterKey());