YARN-3565. NodeHeartbeatRequest/RegisterNodeManagerRequest should use NodeLabel object instead of String. (Naganarasimha G R via wangda)

This commit is contained in:
Wangda Tan 2015-05-19 16:34:17 -07:00
parent 12d6c5ce4f
commit b37da52a1c
15 changed files with 149 additions and 74 deletions

View File

@ -247,6 +247,9 @@ Release 2.8.0 - UNRELEASED
YARN-3362. Add node label usage in RM CapacityScheduler web UI.
(Naganarasimha G R via wangda)
YARN-3565. NodeHeartbeatRequest/RegisterNodeManagerRequest should use
NodeLabel object instead of String. (Naganarasimha G R via wangda)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -253,10 +253,6 @@ message NodeIdToLabelsProto {
repeated string nodeLabels = 2;
}
message StringArrayProto {
repeated string elements = 1;
}
message LabelsToNodeIdsProto {
optional string nodeLabels = 1;
repeated NodeIdProto nodeId = 2;

View File

@ -39,6 +39,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -59,6 +60,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
@Private
public class CommonNodeLabelsManager extends AbstractService {
protected static final Log LOG = LogFactory.getLog(CommonNodeLabelsManager.class);
private static final int MAX_LABEL_LENGTH = 255;

View File

@ -112,6 +112,18 @@ public static <E> Set<E> toSet(E... elements) {
return set;
}
@SuppressWarnings("unchecked")
public static Set<NodeLabel> toNodeLabelSet(String... nodeLabelsStr) {
if (null == nodeLabelsStr) {
return null;
}
Set<NodeLabel> labels = new HashSet<NodeLabel>();
for (String label : nodeLabelsStr) {
labels.add(NodeLabel.newInstance(label));
}
return labels;
}
public NodeId toNodeId(String str) {
if (str.contains(":")) {
int idx = str.indexOf(':');

View File

@ -21,6 +21,7 @@
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records;
@ -29,7 +30,7 @@ public abstract class NodeHeartbeatRequest {
public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
MasterKey lastKnownContainerTokenMasterKey,
MasterKey lastKnownNMTokenMasterKey, Set<String> nodeLabels) {
MasterKey lastKnownNMTokenMasterKey, Set<NodeLabel> nodeLabels) {
NodeHeartbeatRequest nodeHeartbeatRequest =
Records.newRecord(NodeHeartbeatRequest.class);
nodeHeartbeatRequest.setNodeStatus(nodeStatus);
@ -50,8 +51,8 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
public abstract MasterKey getLastKnownNMTokenMasterKey();
public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);
public abstract Set<String> getNodeLabels();
public abstract void setNodeLabels(Set<String> nodeLabels);
public abstract Set<NodeLabel> getNodeLabels();
public abstract void setNodeLabels(Set<NodeLabel> nodeLabels);
public abstract List<LogAggregationReport>
getLogAggregationReportsForApps();

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
@ -39,7 +40,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications, Set<String> nodeLabels) {
List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels) {
RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort);
@ -57,8 +58,8 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
public abstract Resource getResource();
public abstract String getNMVersion();
public abstract List<NMContainerStatus> getNMContainerStatuses();
public abstract Set<String> getNodeLabels();
public abstract void setNodeLabels(Set<String> nodeLabels);
public abstract Set<NodeLabel> getNodeLabels();
public abstract void setNodeLabels(Set<NodeLabel> nodeLabels);
/**
* We introduce this here because currently YARN RM doesn't persist nodes info

View File

@ -24,12 +24,16 @@
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
@ -45,7 +49,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private NodeStatus nodeStatus = null;
private MasterKey lastKnownContainerTokenMasterKey = null;
private MasterKey lastKnownNMTokenMasterKey = null;
private Set<String> labels = null;
private Set<NodeLabel> labels = null;
private List<LogAggregationReport> logAggregationReportsForApps = null;
public NodeHeartbeatRequestPBImpl() {
@ -93,8 +97,11 @@ private void mergeLocalToBuilder() {
}
if (this.labels != null) {
builder.clearNodeLabels();
builder.setNodeLabels(StringArrayProto.newBuilder()
.addAllElements(this.labels).build());
Builder newBuilder = NodeLabelsProto.newBuilder();
for (NodeLabel label : labels) {
newBuilder.addNodeLabels(convertToProtoFormat(label));
}
builder.setNodeLabels(newBuilder.build());
}
if (this.logAggregationReportsForApps != null) {
addLogAggregationStatusForAppsToProto();
@ -238,13 +245,13 @@ private MasterKeyProto convertToProtoFormat(MasterKey t) {
}
@Override
public Set<String> getNodeLabels() {
public Set<NodeLabel> getNodeLabels() {
initNodeLabels();
return this.labels;
}
@Override
public void setNodeLabels(Set<String> nodeLabels) {
public void setNodeLabels(Set<NodeLabel> nodeLabels) {
maybeInitBuilder();
builder.clearNodeLabels();
this.labels = nodeLabels;
@ -259,8 +266,19 @@ private void initNodeLabels() {
labels = null;
return;
}
StringArrayProto nodeLabels = p.getNodeLabels();
labels = new HashSet<String>(nodeLabels.getElementsList());
NodeLabelsProto nodeLabels = p.getNodeLabels();
labels = new HashSet<NodeLabel>();
for(NodeLabelProto nlp : nodeLabels.getNodeLabelsList()) {
labels.add(convertFromProtoFormat(nlp));
}
}
private NodeLabelPBImpl convertFromProtoFormat(NodeLabelProto p) {
return new NodeLabelPBImpl(p);
}
private NodeLabelProto convertToProtoFormat(NodeLabel t) {
return ((NodeLabelPBImpl)t).getProto();
}
@Override

View File

@ -27,16 +27,19 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@ -51,7 +54,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
private NodeId nodeId = null;
private List<NMContainerStatus> containerStatuses = null;
private List<ApplicationId> runningApplications = null;
private Set<String> labels = null;
private Set<NodeLabel> labels = null;
public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder();
@ -84,8 +87,11 @@ private void mergeLocalToBuilder() {
}
if (this.labels != null) {
builder.clearNodeLabels();
builder.setNodeLabels(StringArrayProto.newBuilder()
.addAllElements(this.labels).build());
Builder newBuilder = NodeLabelsProto.newBuilder();
for (NodeLabel label : labels) {
newBuilder.addNodeLabels(convertToProtoFormat(label));
}
builder.setNodeLabels(newBuilder.build());
}
}
@ -293,13 +299,13 @@ public void setNMVersion(String version) {
}
@Override
public Set<String> getNodeLabels() {
public Set<NodeLabel> getNodeLabels() {
initNodeLabels();
return this.labels;
}
@Override
public void setNodeLabels(Set<String> nodeLabels) {
public void setNodeLabels(Set<NodeLabel> nodeLabels) {
maybeInitBuilder();
builder.clearNodeLabels();
this.labels = nodeLabels;
@ -314,8 +320,19 @@ private void initNodeLabels() {
labels=null;
return;
}
StringArrayProto nodeLabels = p.getNodeLabels();
labels = new HashSet<String>(nodeLabels.getElementsList());
NodeLabelsProto nodeLabels = p.getNodeLabels();
labels = new HashSet<NodeLabel>();
for(NodeLabelProto nlp : nodeLabels.getNodeLabelsList()) {
labels.add(convertFromProtoFormat(nlp));
}
}
private NodeLabelPBImpl convertFromProtoFormat(NodeLabelProto p) {
return new NodeLabelPBImpl(p);
}
private NodeLabelProto convertToProtoFormat(NodeLabel t) {
return ((NodeLabelPBImpl)t).getProto();
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {

View File

@ -25,6 +25,10 @@ package hadoop.yarn;
import "yarn_protos.proto";
import "yarn_server_common_protos.proto";
message NodeLabelsProto {
repeated NodeLabelProto nodeLabels = 1;
}
message RegisterNodeManagerRequestProto {
optional NodeIdProto node_id = 1;
optional int32 http_port = 3;
@ -32,7 +36,7 @@ message RegisterNodeManagerRequestProto {
optional string nm_version = 5;
repeated NMContainerStatusProto container_statuses = 6;
repeated ApplicationIdProto runningApplications = 7;
optional StringArrayProto nodeLabels = 8;
optional NodeLabelsProto nodeLabels = 8;
}
message RegisterNodeManagerResponseProto {
@ -49,7 +53,7 @@ message NodeHeartbeatRequestProto {
optional NodeStatusProto node_status = 1;
optional MasterKeyProto last_known_container_token_master_key = 2;
optional MasterKeyProto last_known_nm_token_master_key = 3;
optional StringArrayProto nodeLabels = 4;
optional NodeLabelsProto nodeLabels = 4;
repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
}

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
@ -113,7 +114,7 @@ public void testNodeHeartbeatRequestPBImpl() {
Assert.assertTrue(original.getNodeLabels()
.containsAll(copy.getNodeLabels()));
// check for empty labels
original.setNodeLabels(new HashSet<String> ());
original.setNodeLabels(new HashSet<NodeLabel> ());
copy = new NodeHeartbeatRequestPBImpl(
original.getProto());
Assert.assertNotNull(copy.getNodeLabels());
@ -271,7 +272,7 @@ public void testRegisterNodeManagerRequestWithNullLabels() {
@Test
public void testRegisterNodeManagerRequestWithValidLabels() {
HashSet<String> nodeLabels = getValidNodeLabels();
HashSet<NodeLabel> nodeLabels = getValidNodeLabels();
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(
NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0),
@ -286,19 +287,19 @@ public void testRegisterNodeManagerRequestWithValidLabels() {
Assert.assertEquals(true, nodeLabels.containsAll(copy.getNodeLabels()));
// check for empty labels
request.setNodeLabels(new HashSet<String> ());
request.setNodeLabels(new HashSet<NodeLabel> ());
copy = new RegisterNodeManagerRequestPBImpl(
((RegisterNodeManagerRequestPBImpl) request).getProto());
Assert.assertNotNull(copy.getNodeLabels());
Assert.assertEquals(0, copy.getNodeLabels().size());
}
private HashSet<String> getValidNodeLabels() {
HashSet<String> nodeLabels = new HashSet<String>();
nodeLabels.add("java");
nodeLabels.add("windows");
nodeLabels.add("gpu");
nodeLabels.add("x86");
private HashSet<NodeLabel> getValidNodeLabels() {
HashSet<NodeLabel> nodeLabels = new HashSet<NodeLabel>();
nodeLabels.add(NodeLabel.newInstance("java"));
nodeLabels.add(NodeLabel.newInstance("windows"));
nodeLabels.add(NodeLabel.newInstance("gpu"));
nodeLabels.add(NodeLabel.newInstance("x86"));
return nodeLabels;
}

View File

@ -30,9 +30,9 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -50,6 +50,7 @@
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -279,11 +280,11 @@ protected ResourceTracker getRMClient() throws IOException {
protected void registerWithRM()
throws YarnException, IOException {
List<NMContainerStatus> containerReports = getNMContainerStatuses();
Set<String> nodeLabels = null;
Set<NodeLabel> nodeLabels = null;
if (hasNodeLabelsProvider) {
nodeLabels = nodeLabelsProvider.getNodeLabels();
nodeLabels =
(null == nodeLabels) ? CommonNodeLabelsManager.EMPTY_STRING_SET
(null == nodeLabels) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
: nodeLabels;
}
RegisterNodeManagerRequest request =
@ -628,29 +629,29 @@ protected void startStatusUpdater() {
@SuppressWarnings("unchecked")
public void run() {
int lastHeartbeatID = 0;
Set<String> lastUpdatedNodeLabelsToRM = null;
Set<NodeLabel> lastUpdatedNodeLabelsToRM = null;
if (hasNodeLabelsProvider) {
lastUpdatedNodeLabelsToRM = nodeLabelsProvider.getNodeLabels();
lastUpdatedNodeLabelsToRM =
(null == lastUpdatedNodeLabelsToRM) ? CommonNodeLabelsManager.EMPTY_STRING_SET
(null == lastUpdatedNodeLabelsToRM) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
: lastUpdatedNodeLabelsToRM;
}
while (!isStopped) {
// Send heartbeat
try {
NodeHeartbeatResponse response = null;
Set<String> nodeLabelsForHeartbeat = null;
Set<NodeLabel> nodeLabelsForHeartbeat = null;
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
if (hasNodeLabelsProvider) {
nodeLabelsForHeartbeat = nodeLabelsProvider.getNodeLabels();
//if the provider returns null then consider empty labels are set
// if the provider returns null then consider empty labels are set
nodeLabelsForHeartbeat =
(nodeLabelsForHeartbeat == null) ? CommonNodeLabelsManager.EMPTY_STRING_SET
(nodeLabelsForHeartbeat == null) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
: nodeLabelsForHeartbeat;
if (!areNodeLabelsUpdated(nodeLabelsForHeartbeat,
lastUpdatedNodeLabelsToRM)) {
//if nodelabels have not changed then no need to send
// if nodelabels have not changed then no need to send
nodeLabelsForHeartbeat = null;
}
}
@ -781,8 +782,8 @@ public void run() {
* @param nodeLabelsOld
* @return if the New node labels are diff from the older one.
*/
private boolean areNodeLabelsUpdated(Set<String> nodeLabelsNew,
Set<String> nodeLabelsOld) {
private boolean areNodeLabelsUpdated(Set<NodeLabel> nodeLabelsNew,
Set<NodeLabel> nodeLabelsOld) {
if (nodeLabelsNew.size() != nodeLabelsOld.size()
|| !nodeLabelsOld.containsAll(nodeLabelsNew)) {
return true;

View File

@ -21,6 +21,7 @@
import java.util.Set;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.NodeLabel;
/**
* Interface which will be responsible for fetching the labels
@ -39,5 +40,5 @@ public NodeLabelsProvider(String name) {
*
* @return Set of node label strings applicable for a node
*/
public abstract Set<String> getNodeLabels();
public abstract Set<NodeLabel> getNodeLabels();
}

View File

@ -23,16 +23,17 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@ -71,7 +72,7 @@ public void tearDown() {
private class ResourceTrackerForLabels implements ResourceTracker {
int heartbeatID = 0;
Set<String> labels;
Set<NodeLabel> labels;
private boolean receivedNMHeartbeat = false;
private boolean receivedNMRegister = false;
@ -185,18 +186,18 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
public static class DummyNodeLabelsProvider extends NodeLabelsProvider {
@SuppressWarnings("unchecked")
private Set<String> nodeLabels = Collections.EMPTY_SET;
private Set<NodeLabel> nodeLabels = CommonNodeLabelsManager.EMPTY_NODELABEL_SET;
public DummyNodeLabelsProvider() {
super(DummyNodeLabelsProvider.class.getName());
}
@Override
public synchronized Set<String> getNodeLabels() {
public synchronized Set<NodeLabel> getNodeLabels() {
return nodeLabels;
}
synchronized void setNodeLabels(Set<String> nodeLabels) {
synchronized void setNodeLabels(Set<NodeLabel> nodeLabels) {
this.nodeLabels = nodeLabels;
}
}
@ -245,19 +246,21 @@ protected void stopRMProxy() {
resourceTracker.resetNMHeartbeatReceiveFlag();
nm.start();
resourceTracker.waitTillRegister();
assertCollectionEquals(resourceTracker.labels,
dummyLabelsProviderRef.getNodeLabels());
assertNLCollectionEquals(resourceTracker.labels,
dummyLabelsProviderRef
.getNodeLabels());
resourceTracker.waitTillHeartbeat();// wait till the first heartbeat
resourceTracker.resetNMHeartbeatReceiveFlag();
// heartbeat with updated labels
dummyLabelsProviderRef.setNodeLabels(toSet("P"));
dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P"));
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertCollectionEquals(resourceTracker.labels,
dummyLabelsProviderRef.getNodeLabels());
assertNLCollectionEquals(resourceTracker.labels,
dummyLabelsProviderRef
.getNodeLabels());
resourceTracker.resetNMHeartbeatReceiveFlag();
// heartbeat without updating labels

View File

@ -22,6 +22,7 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@ -42,6 +43,7 @@
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -247,6 +249,17 @@ void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) {
}
}
static Set<String> convertToStringSet(Set<NodeLabel> nodeLabels) {
if (null == nodeLabels) {
return null;
}
Set<String> labels = new HashSet<String>();
for (NodeLabel label : nodeLabels) {
labels.add(label.getName());
}
return labels;
}
@SuppressWarnings("unchecked")
@Override
public RegisterNodeManagerResponse registerNodeManager(
@ -346,7 +359,7 @@ public RegisterNodeManagerResponse registerNodeManager(
}
// Update node's labels to RM's NodeLabelManager.
Set<String> nodeLabels = request.getNodeLabels();
Set<String> nodeLabels = convertToStringSet(request.getNodeLabels());
if (isDistributedNodeLabelsConf && nodeLabels != null) {
try {
updateNodeLabelsFromNMReport(nodeLabels, nodeId);
@ -467,7 +480,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// 5. Update node's labels to RM's NodeLabelManager.
if (isDistributedNodeLabelsConf && request.getNodeLabels() != null) {
try {
updateNodeLabelsFromNMReport(request.getNodeLabels(), nodeId);
updateNodeLabelsFromNMReport(
convertToStringSet(request.getNodeLabels()), nodeId);
nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true);
} catch (IOException ex) {
//ensure the error message is captured and sent across in response

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -353,14 +354,14 @@ protected RMNodeLabelsManager createNodeLabelManager() {
registerReq.setNodeId(nodeId);
registerReq.setHttpPort(1234);
registerReq.setNMVersion(YarnVersionInfo.getVersion());
registerReq.setNodeLabels(toSet("A"));
registerReq.setNodeLabels(toSet(NodeLabel.newInstance("A")));
RegisterNodeManagerResponse response =
resourceTrackerService.registerNodeManager(registerReq);
Assert.assertEquals("Action should be normal on valid Node Labels",
NodeAction.NORMAL, response.getNodeAction());
assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
registerReq.getNodeLabels());
ResourceTrackerService.convertToStringSet(registerReq.getNodeLabels()));
Assert.assertTrue("Valid Node Labels were not accepted by RM",
response.getAreNodeLabelsAcceptedByRM());
rm.stop();
@ -402,7 +403,7 @@ protected RMNodeLabelsManager createNodeLabelManager() {
registerReq.setNodeId(nodeId);
registerReq.setHttpPort(1234);
registerReq.setNMVersion(YarnVersionInfo.getVersion());
registerReq.setNodeLabels(toSet("A", "B", "C"));
registerReq.setNodeLabels(toNodeLabelSet("A", "B", "C"));
RegisterNodeManagerResponse response =
resourceTrackerService.registerNodeManager(registerReq);
@ -455,7 +456,7 @@ protected RMNodeLabelsManager createNodeLabelManager() {
req.setNodeId(nodeId);
req.setHttpPort(1234);
req.setNMVersion(YarnVersionInfo.getVersion());
req.setNodeLabels(toSet("#Y"));
req.setNodeLabels(toNodeLabelSet("#Y"));
RegisterNodeManagerResponse response =
resourceTrackerService.registerNodeManager(req);
@ -506,7 +507,7 @@ protected RMNodeLabelsManager createNodeLabelManager() {
req.setNodeId(nodeId);
req.setHttpPort(1234);
req.setNMVersion(YarnVersionInfo.getVersion());
req.setNodeLabels(toSet("A"));
req.setNodeLabels(toNodeLabelSet("A"));
RegisterNodeManagerResponse response =
resourceTrackerService.registerNodeManager(req);
// registered to RM with central label config
@ -568,14 +569,14 @@ protected RMNodeLabelsManager createNodeLabelManager() {
registerReq.setNodeId(nodeId);
registerReq.setHttpPort(1234);
registerReq.setNMVersion(YarnVersionInfo.getVersion());
registerReq.setNodeLabels(toSet("A")); // Node register label
registerReq.setNodeLabels(toNodeLabelSet("A")); // Node register label
RegisterNodeManagerResponse registerResponse =
resourceTrackerService.registerNodeManager(registerReq);
// modification of labels during heartbeat
NodeHeartbeatRequest heartbeatReq =
Records.newRecord(NodeHeartbeatRequest.class);
heartbeatReq.setNodeLabels(toSet("B")); // Node heartbeat label update
heartbeatReq.setNodeLabels(toNodeLabelSet("B")); // Node heartbeat label update
NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
heartbeatReq.setNodeStatus(nodeStatusObject);
heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
@ -588,7 +589,7 @@ protected RMNodeLabelsManager createNodeLabelManager() {
Assert.assertEquals("InValid Node Labels were not accepted by RM",
NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction());
assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
heartbeatReq.getNodeLabels());
ResourceTrackerService.convertToStringSet(heartbeatReq.getNodeLabels()));
Assert.assertTrue("Valid Node Labels were not accepted by RM",
nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM());
@ -652,13 +653,13 @@ protected RMNodeLabelsManager createNodeLabelManager() {
registerReq.setNodeId(nodeId);
registerReq.setHttpPort(1234);
registerReq.setNMVersion(YarnVersionInfo.getVersion());
registerReq.setNodeLabels(toSet("A"));
registerReq.setNodeLabels(toNodeLabelSet("A"));
RegisterNodeManagerResponse registerResponse =
resourceTrackerService.registerNodeManager(registerReq);
NodeHeartbeatRequest heartbeatReq =
Records.newRecord(NodeHeartbeatRequest.class);
heartbeatReq.setNodeLabels(toSet("B", "#C")); // Invalid heart beat labels
heartbeatReq.setNodeLabels(toNodeLabelSet("B", "#C")); // Invalid heart beat labels
heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId));
heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
.getNMTokenMasterKey());
@ -705,13 +706,13 @@ protected RMNodeLabelsManager createNodeLabelManager() {
req.setNodeId(nodeId);
req.setHttpPort(1234);
req.setNMVersion(YarnVersionInfo.getVersion());
req.setNodeLabels(toSet("A", "B", "C"));
req.setNodeLabels(toNodeLabelSet("A", "B", "C"));
RegisterNodeManagerResponse registerResponse =
resourceTrackerService.registerNodeManager(req);
NodeHeartbeatRequest heartbeatReq =
Records.newRecord(NodeHeartbeatRequest.class);
heartbeatReq.setNodeLabels(toSet("B")); // Valid heart beat labels
heartbeatReq.setNodeLabels(toNodeLabelSet("B")); // Valid heart beat labels
heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId));
heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
.getNMTokenMasterKey());