From f659485ee83f3f34e3717631983adfc8fa1e53dc Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Fri, 21 Dec 2018 10:56:42 +0800 Subject: [PATCH] YARN-8925. Updating distributed node attributes only when necessary. Contributed by Tao Yang. --- .../hadoop/yarn/conf/YarnConfiguration.java | 6 + .../hadoop/yarn/nodelabels/NodeLabelUtil.java | 51 +- .../src/main/resources/yarn-default.xml | 9 + .../yarn/nodelabels/TestNodeLabelUtil.java | 71 +++ .../NodeHeartbeatResponse.java | 5 + .../RegisterNodeManagerRequest.java | 16 + .../RegisterNodeManagerResponse.java | 5 + .../impl/pb/NodeHeartbeatResponsePBImpl.java | 15 + .../pb/RegisterNodeManagerRequestPBImpl.java | 53 ++ .../pb/RegisterNodeManagerResponsePBImpl.java | 15 + .../yarn_server_common_service_protos.proto | 3 + .../nodemanager/NodeStatusUpdaterImpl.java | 331 ++++++++++--- .../TestNodeStatusUpdaterForAttributes.java | 439 +++++++++++++++++ .../ResourceTrackerService.java | 103 +++- .../nodelabels/NodeAttributesManagerImpl.java | 13 +- .../TestResourceTrackerService.java | 454 +++++++++++++++++- 16 files changed, 1477 insertions(+), 112 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForAttributes.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index d84d11acbca..3bc9957020f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3669,6 +3669,12 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL = 2 * 60 * 1000; + 
public static final String NM_NODE_ATTRIBUTES_RESYNC_INTERVAL = + NM_NODE_ATTRIBUTES_PREFIX + "resync-interval-ms"; + + public static final long DEFAULT_NM_NODE_ATTRIBUTES_RESYNC_INTERVAL = + 2 * 60 * 1000; + // If -1 is configured then no timer task should be created public static final String NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS = NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java index 395ff8183ce..c313998598b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeAttributeKey; import java.io.IOException; +import java.util.Objects; import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -40,6 +41,8 @@ public final class NodeLabelUtil { Pattern.compile("^[0-9a-zA-Z][0-9a-zA-Z-_\\.]*"); private static final Pattern ATTRIBUTE_VALUE_PATTERN = Pattern.compile("^[0-9a-zA-Z][0-9a-zA-Z-_.]*"); + private static final Pattern ATTRIBUTE_NAME_PATTERN = + Pattern.compile("^[0-9a-zA-Z][0-9a-zA-Z-_]*"); public static void checkAndThrowLabelName(String label) throws IOException { if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) { @@ -57,6 +60,25 @@ public final class NodeLabelUtil { } } + public static void checkAndThrowAttributeName(String attributeName) + throws IOException { + if (attributeName == null || attributeName.isEmpty() + || attributeName.length() > MAX_LABEL_LENGTH) { + throw new IOException( + "attribute name added is empty or 
exceeds " + MAX_LABEL_LENGTH + + " character(s)"); + } + attributeName = attributeName.trim(); + + boolean match = ATTRIBUTE_NAME_PATTERN.matcher(attributeName).matches(); + + if (!match) { + throw new IOException("attribute name should only contains " + + "{0-9, a-z, A-Z, -, _} and should not started with {-,_}" + + ", now it is= " + attributeName); + } + } + public static void checkAndThrowAttributeValue(String value) throws IOException { if (value == null) { @@ -129,7 +151,9 @@ public final class NodeLabelUtil { // Verify attribute prefix format. checkAndThrowAttributePrefix(prefix); // Verify attribute name format. - checkAndThrowLabelName(attributeKey.getAttributeName()); + checkAndThrowAttributeName(attributeKey.getAttributeName()); + // Verify attribute value format. + checkAndThrowAttributeValue(nodeAttribute.getAttributeValue()); } } } @@ -152,4 +176,29 @@ public final class NodeLabelUtil { .equals(nodeAttribute.getAttributeKey().getAttributePrefix())) .collect(Collectors.toSet()); } + + /** + * Are these two input node attributes the same. 
+ * @return true if they are the same + */ + public static boolean isNodeAttributesEquals( + Set leftNodeAttributes, + Set rightNodeAttributes) { + if (leftNodeAttributes == null && rightNodeAttributes == null) { + return true; + } else if (leftNodeAttributes == null || rightNodeAttributes == null + || leftNodeAttributes.size() != rightNodeAttributes.size()) { + return false; + } + return leftNodeAttributes.stream() + .allMatch(e -> isNodeAttributeIncludes(rightNodeAttributes, e)); + } + + private static boolean isNodeAttributeIncludes( + Set nodeAttributes, NodeAttribute checkNodeAttribute) { + return nodeAttributes.stream().anyMatch( + e -> e.equals(checkNodeAttribute) && Objects + .equals(e.getAttributeValue(), + checkNodeAttribute.getAttributeValue())); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 12c5ff6cc40..ff64c90e8d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3005,6 +3005,15 @@ + + + Interval at which NM syncs its node attributes with RM. NM will send its loaded + attributes every x intervals configured, along with heartbeat to RM. + + yarn.nodemanager.node-attributes.resync-interval-ms + 120000 + + Timeout in seconds for YARN node graceful decommission. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestNodeLabelUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestNodeLabelUtil.java index afdfcbbb5f6..060e38d8884 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestNodeLabelUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestNodeLabelUtil.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.nodelabels; import static org.junit.Assert.fail; + +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; +import org.junit.Assert; import org.junit.Test; /** @@ -48,4 +53,70 @@ public class TestNodeLabelUtil { } } } + + @Test + public void testIsNodeAttributesEquals() { + NodeAttribute nodeAttributeCK1V1 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "K1", + NodeAttributeType.STRING, "V1"); + NodeAttribute nodeAttributeCK1V1Copy = NodeAttribute + .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "K1", + NodeAttributeType.STRING, "V1"); + NodeAttribute nodeAttributeDK1V1 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K1", + NodeAttributeType.STRING, "V1"); + NodeAttribute nodeAttributeDK1V1Copy = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K1", + NodeAttributeType.STRING, "V1"); + NodeAttribute nodeAttributeDK2V1 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K2", + NodeAttributeType.STRING, "V1"); + NodeAttribute nodeAttributeDK2V2 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K2", + NodeAttributeType.STRING, "V2"); + /* + * equals if set size equals and items are all the same + */ + Assert.assertTrue(NodeLabelUtil.isNodeAttributesEquals(null, null)); + 
Assert.assertTrue(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(), ImmutableSet.of())); + Assert.assertTrue(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1), + ImmutableSet.of(nodeAttributeCK1V1Copy))); + Assert.assertTrue(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeDK1V1), + ImmutableSet.of(nodeAttributeDK1V1Copy))); + Assert.assertTrue(NodeLabelUtil.isNodeAttributesEquals( + ImmutableSet.of(nodeAttributeCK1V1, nodeAttributeDK1V1), + ImmutableSet.of(nodeAttributeCK1V1Copy, nodeAttributeDK1V1Copy))); + /* + * not equals if set size not equals or items are different + */ + Assert.assertFalse( + NodeLabelUtil.isNodeAttributesEquals(null, ImmutableSet.of())); + Assert.assertFalse( + NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(), null)); + // different attribute prefix + Assert.assertFalse(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1), + ImmutableSet.of(nodeAttributeDK1V1))); + // different attribute name + Assert.assertFalse(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeDK1V1), + ImmutableSet.of(nodeAttributeDK2V1))); + // different attribute value + Assert.assertFalse(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeDK2V1), + ImmutableSet.of(nodeAttributeDK2V2))); + // different set + Assert.assertFalse(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1), + ImmutableSet.of())); + Assert.assertFalse(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1), + ImmutableSet.of(nodeAttributeCK1V1, nodeAttributeDK1V1))); + Assert.assertFalse(NodeLabelUtil.isNodeAttributesEquals( + ImmutableSet.of(nodeAttributeCK1V1, nodeAttributeDK1V1), + ImmutableSet.of(nodeAttributeDK1V1))); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 05a9c721e15..acb76440b6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -118,4 +118,9 @@ public abstract class NodeHeartbeatResponse { public abstract void addAllContainersToDecrease( Collection containersToDecrease); + + public abstract boolean getAreNodeAttributesAcceptedByRM(); + + public abstract void setAreNodeAttributesAcceptedByRM( + boolean areNodeAttributesAcceptedByRM); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index ff50330df31..acec16fd56b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; @@ -50,6 +51,16 @@ public abstract 
class RegisterNodeManagerRequest { List containerStatuses, List runningApplications, Set nodeLabels, Resource physicalResource) { + return newInstance(nodeId, httpPort, resource, nodeManagerVersionId, + containerStatuses, runningApplications, nodeLabels, physicalResource, + null); + } + + public static RegisterNodeManagerRequest newInstance(NodeId nodeId, + int httpPort, Resource resource, String nodeManagerVersionId, + List containerStatuses, + List runningApplications, Set nodeLabels, + Resource physicalResource, Set nodeAttributes) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -60,6 +71,7 @@ public abstract class RegisterNodeManagerRequest { request.setRunningApplications(runningApplications); request.setNodeLabels(nodeLabels); request.setPhysicalResource(physicalResource); + request.setNodeAttributes(nodeAttributes); return request; } @@ -117,4 +129,8 @@ public abstract class RegisterNodeManagerRequest { public abstract void setLogAggregationReportsForApps( List logAggregationReportsForApps); + + public abstract Set getNodeAttributes(); + + public abstract void setNodeAttributes(Set nodeAttributes); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java index 675b375c1d6..d54a5672675 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java @@ -58,4 +58,9 @@ public abstract 
class RegisterNodeManagerResponse { public abstract void setAreNodeLabelsAcceptedByRM( boolean areNodeLabelsAcceptedByRM); + + public abstract boolean getAreNodeAttributesAcceptedByRM(); + + public abstract void setAreNodeAttributesAcceptedByRM( + boolean areNodeAttributesAcceptedByRM); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 9af5bfc4aec..a53ea4d208b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -787,6 +787,21 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM); } + @Override + public boolean getAreNodeAttributesAcceptedByRM() { + NodeHeartbeatResponseProtoOrBuilder p = + this.viaProto ? 
this.proto : this.builder; + return p.getAreNodeAttributesAcceptedByRM(); + } + + @Override + public void setAreNodeAttributesAcceptedByRM( + boolean areNodeAttributesAcceptedByRM) { + maybeInitBuilder(); + this.builder + .setAreNodeAttributesAcceptedByRM(areNodeAttributesAcceptedByRM); + } + @Override public List getContainersToSignalList() { initContainersToSignal(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 02fd20f88c3..317f8abd6f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -26,22 +26,26 @@ import java.util.List; import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeAttributePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import 
org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeAttributesProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; @@ -58,6 +62,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest private List containerStatuses = null; private List runningApplications = null; private Set labels = null; + private Set attributes = null; private List logAggregationReportsForApps = null; @@ -101,6 +106,15 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest } builder.setNodeLabels(newBuilder.build()); } + if (this.attributes != null) { + builder.clearNodeAttributes(); + NodeAttributesProto.Builder attributesBuilder = + NodeAttributesProto.newBuilder(); + for (NodeAttribute attribute : attributes) { + attributesBuilder.addNodeAttributes(convertToProtoFormat(attribute)); + } + builder.setNodeAttributes(attributesBuilder.build()); + } if (this.physicalResource != null) { builder.setPhysicalResource(convertToProtoFormat(this.physicalResource)); } @@ -404,6 +418,36 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest } } + @Override + public 
synchronized Set getNodeAttributes() { + initNodeAttributes(); + return this.attributes; + } + + @Override + public synchronized void setNodeAttributes( + Set nodeAttributes) { + maybeInitBuilder(); + builder.clearNodeAttributes(); + this.attributes = nodeAttributes; + } + + private synchronized void initNodeAttributes() { + if (this.attributes != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodeAttributes()) { + attributes=null; + return; + } + NodeAttributesProto nodeAttributes = p.getNodeAttributes(); + attributes = new HashSet<>(); + for(NodeAttributeProto nap : nodeAttributes.getNodeAttributesList()) { + attributes.add(convertFromProtoFormat(nap)); + } + } + private static NodeLabelPBImpl convertFromProtoFormat(NodeLabelProto p) { return new NodeLabelPBImpl(p); } @@ -412,6 +456,15 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest return ((NodeLabelPBImpl)t).getProto(); } + private static NodeAttributePBImpl convertFromProtoFormat( + NodeAttributeProto p) { + return new NodeAttributePBImpl(p); + } + + private static NodeAttributeProto convertToProtoFormat(NodeAttribute t) { + return ((NodeAttributePBImpl)t).getProto(); + } + private static ApplicationIdPBImpl convertFromProtoFormat( ApplicationIdProto p) { return new ApplicationIdPBImpl(p); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java index 63213090d45..4e4ca3c3db4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java @@ -269,4 +269,19 @@ public class RegisterNodeManagerResponsePBImpl maybeInitBuilder(); this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM); } + + @Override + public boolean getAreNodeAttributesAcceptedByRM() { + RegisterNodeManagerResponseProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getAreNodeAttributesAcceptedByRM(); + } + + @Override + public void setAreNodeAttributesAcceptedByRM( + boolean areNodeAttributesAcceptedByRM) { + maybeInitBuilder(); + this.builder + .setAreNodeAttributesAcceptedByRM(areNodeAttributesAcceptedByRM); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 0b8c4a384d9..fb74237b5b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -72,6 +72,7 @@ message RegisterNodeManagerRequestProto { optional NodeLabelsProto nodeLabels = 8; optional ResourceProto physicalResource = 9; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10; + optional NodeAttributesProto nodeAttributes = 11; } message RegisterNodeManagerResponseProto { @@ -83,6 +84,7 @@ message RegisterNodeManagerResponseProto { optional string rm_version = 6; optional bool areNodeLabelsAcceptedByRM = 7 [default = false]; optional ResourceProto resource = 8; + optional bool areNodeAttributesAcceptedByRM = 9 [default = false]; } message UnRegisterNodeManagerRequestProto { @@ 
-128,6 +130,7 @@ message NodeHeartbeatResponseProto { repeated AppCollectorDataProto app_collectors = 16; // to be used in place of containers_to_decrease repeated ContainerProto containers_to_update = 17; + optional bool areNodeAttributesAcceptedByRM = 18 [default = false]; } message ContainerQueuingLimitProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3bb9f92c277..5975784de5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -30,6 +30,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -377,6 +378,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements throws YarnException, IOException { RegisterNodeManagerResponse regNMResponse; Set nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration(); + Set nodeAttributes = + nodeAttributesHandler.getNodeAttributesForRegistration(); // Synchronize NM-RM registration with // ContainerManagerImpl#increaseContainersResource and @@ -387,7 +390,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, nodeManagerVersionId, containerReports, getRunningApplications(), - nodeLabels, physicalResource); 
+ nodeLabels, physicalResource, nodeAttributes); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); @@ -473,6 +476,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements successfullRegistrationMsg.append(nodeLabelsHandler .verifyRMRegistrationResponseForNodeLabels(regNMResponse)); + successfullRegistrationMsg.append(nodeAttributesHandler + .verifyRMRegistrationResponseForNodeAttributes(regNMResponse)); LOG.info(successfullRegistrationMsg.toString()); } @@ -875,34 +880,254 @@ public class NodeStatusUpdaterImpl extends AbstractService implements */ private NMNodeAttributesHandler createNMNodeAttributesHandler( NodeAttributesProvider provider) { - return provider == null ? null : - new NMDistributedNodeAttributesHandler(nodeAttributesProvider); + if (provider == null) { + return new NMCentralizedNodeAttributesHandler(); + } else { + return new NMDistributedNodeAttributesHandler(provider, this.getConfig()); + } + } + + private static abstract class CachedNodeDescriptorHandler { + private final long resyncInterval; + private final T defaultValue; + private T previousValue; + private long lastSendMills = 0L; + private boolean isValueSented; + + CachedNodeDescriptorHandler(T defaultValue, + long resyncInterval) { + this.defaultValue = defaultValue; + this.resyncInterval = resyncInterval; + } + + public abstract T getValueFromProvider(); + + public T getValueForRegistration() { + T value = getValueFromProvider(); + if (defaultValue != null) { + value = (null == value) ? defaultValue : value; + } + previousValue = value; + try { + validate(value); + } catch (IOException e) { + value = null; + } + return value; + } + + public T getValueForHeartbeat() { + T value = getValueFromProvider(); + // if the provider returns null then consider default value are set + if (defaultValue != null) { + value = (null == value) ? 
defaultValue : value; + } + // take some action only on modification of value + boolean isValueUpdated = isValueUpdated(value); + + isValueSented = false; + // When value updated or resync time is elapsed will send again in + // heartbeat. + if (isValueUpdated || isResyncIntervalElapsed()) { + previousValue = value; + try { + validate(value); + isValueSented = true; + } catch (IOException e) { + // take previous value to replace invalid value, so that invalid + // value are not verified for every HB, and send empty set + // to RM to have same value which was earlier set. + value = null; + } finally { + // Set last send time in heartbeat + lastSendMills = System.currentTimeMillis(); + } + } else { + // if value have not changed then no need to send + value = null; + } + return value; + } + + /** + * This method checks resync interval is elapsed or not. + */ + public boolean isResyncIntervalElapsed() { + long elapsedTimeSinceLastSync = + System.currentTimeMillis() - lastSendMills; + if (elapsedTimeSinceLastSync > resyncInterval) { + return true; + } + return false; + } + + protected abstract void validate(T value) throws IOException; + + protected abstract boolean isValueUpdated(T value); + + public long getResyncInterval() { + return resyncInterval; + } + + public T getDefaultValue() { + return defaultValue; + } + + public T getPreviousValue() { + return previousValue; + } + + public long getLastSendMills() { + return lastSendMills; + } + + public boolean isValueSented() { + return isValueSented; + } } private interface NMNodeAttributesHandler { + /** + * validates nodeAttributes From Provider and returns it to the caller. Also + * ensures that if provider returns null then empty set is considered + */ + Set getNodeAttributesForRegistration(); + /** * @return the node attributes of this node manager. 
*/ Set getNodeAttributesForHeartbeat(); + + /** + * @return RMRegistration Success message and on failure will log + * independently and returns empty string + */ + String verifyRMRegistrationResponseForNodeAttributes( + RegisterNodeManagerResponse regNMResponse); + + /** + * check whether if updated attributes sent to RM was accepted or not. + * @param response + */ + void verifyRMHeartbeatResponseForNodeAttributes( + NodeHeartbeatResponse response); + } + + + /** + * In centralized configuration, NM need not send Node attributes or process + * the response. + */ + private static class NMCentralizedNodeAttributesHandler + implements NMNodeAttributesHandler { + @Override + public Set getNodeAttributesForHeartbeat() { + return null; + } + + @Override + public Set getNodeAttributesForRegistration() { + return null; + } + + @Override + public void verifyRMHeartbeatResponseForNodeAttributes( + NodeHeartbeatResponse response) { + } + + @Override + public String verifyRMRegistrationResponseForNodeAttributes( + RegisterNodeManagerResponse regNMResponse) { + return ""; + } } private static class NMDistributedNodeAttributesHandler + extends CachedNodeDescriptorHandler> implements NMNodeAttributesHandler { private final NodeAttributesProvider attributesProvider; protected NMDistributedNodeAttributesHandler( - NodeAttributesProvider provider) { + NodeAttributesProvider provider, Configuration conf) { + super(Collections.unmodifiableSet(new HashSet<>(0)), + conf.getLong(YarnConfiguration.NM_NODE_ATTRIBUTES_RESYNC_INTERVAL, + YarnConfiguration.DEFAULT_NM_NODE_ATTRIBUTES_RESYNC_INTERVAL)); this.attributesProvider = provider; } + @Override + public Set getNodeAttributesForRegistration() { + return getValueForRegistration(); + } + @Override public Set getNodeAttributesForHeartbeat() { + return getValueForHeartbeat(); + } + + @Override + public Set getValueFromProvider() { return attributesProvider.getDescriptors(); } - } + @Override + protected void validate(Set nodeAttributes) + 
throws IOException { + try { + NodeLabelUtil.validateNodeAttributes(nodeAttributes); + } catch (IOException e) { + LOG.error( + "Invalid node attribute(s) from Provider : " + e.getMessage()); + throw e; + } + } + + @Override + protected boolean isValueUpdated(Set value) { + return !NodeLabelUtil.isNodeAttributesEquals(getPreviousValue(), value); + } + + @Override + public String verifyRMRegistrationResponseForNodeAttributes( + RegisterNodeManagerResponse regNMResponse) { + StringBuilder successfulNodeAttributesRegistrationMsg = + new StringBuilder(); + if (regNMResponse.getAreNodeAttributesAcceptedByRM()) { + successfulNodeAttributesRegistrationMsg + .append(" and with following Node attribute(s) : {") + .append(getPreviousValue()).append("}"); + } else { + // case where provider is set but RM did not accept the node attributes + String errorMsgFromRM = regNMResponse.getDiagnosticsMessage(); + LOG.error("Node attributes sent from NM while registration were" + + " rejected by RM. " + ((errorMsgFromRM == null) ? + "Seems like RM is configured with Centralized Attributes." 
: + "And with message " + regNMResponse.getDiagnosticsMessage())); + } + return successfulNodeAttributesRegistrationMsg.toString(); + } + + @Override + public void verifyRMHeartbeatResponseForNodeAttributes( + NodeHeartbeatResponse response) { + if (isValueSented()) { + if (response.getAreNodeAttributesAcceptedByRM()) { + if(LOG.isDebugEnabled()){ + LOG.debug("Node attributes {" + getPreviousValue() + + "} were Accepted by RM "); + } + } else { + // case where updated node attributes from NodeAttributesProvider + // is sent to RM and RM rejected the attributes + LOG.error("NM node attributes {" + getPreviousValue() + + "} were not accepted by RM and message from RM : " + response + .getDiagnosticsMessage()); + } + } + } + } private static interface NMNodeLabelsHandler { /** @@ -963,33 +1188,22 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } private static class NMDistributedNodeLabelsHandler + extends CachedNodeDescriptorHandler> implements NMNodeLabelsHandler { + private NMDistributedNodeLabelsHandler( NodeLabelsProvider nodeLabelsProvider, Configuration conf) { - this.nodeLabelsProvider = nodeLabelsProvider; - this.resyncInterval = + super(CommonNodeLabelsManager.EMPTY_NODELABEL_SET, conf.getLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL, - YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL); + YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL)); + this.nodeLabelsProvider = nodeLabelsProvider; } private final NodeLabelsProvider nodeLabelsProvider; - private Set previousNodeLabels; - private boolean areLabelsSentToRM; - private long lastNodeLabelSendMills = 0L; - private final long resyncInterval; @Override public Set getNodeLabelsForRegistration() { - Set nodeLabels = nodeLabelsProvider.getDescriptors(); - nodeLabels = (null == nodeLabels) - ? 
CommonNodeLabelsManager.EMPTY_NODELABEL_SET : nodeLabels; - previousNodeLabels = nodeLabels; - try { - validateNodeLabels(nodeLabels); - } catch (IOException e) { - nodeLabels = null; - } - return nodeLabels; + return getValueForRegistration(); } @Override @@ -999,7 +1213,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements if (regNMResponse.getAreNodeLabelsAcceptedByRM()) { successfulNodeLabelsRegistrationMsg .append(" and with following Node label(s) : {") - .append(StringUtils.join(",", previousNodeLabels)).append("}"); + .append(StringUtils.join(",", getPreviousValue())).append("}"); } else { // case where provider is set but RM did not accept the Node Labels String errorMsgFromRM = regNMResponse.getDiagnosticsMessage(); @@ -1014,50 +1228,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @Override public Set getNodeLabelsForHeartbeat() { - Set nodeLabelsForHeartbeat = - nodeLabelsProvider.getDescriptors(); - // if the provider returns null then consider empty labels are set - nodeLabelsForHeartbeat = (nodeLabelsForHeartbeat == null) - ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET - : nodeLabelsForHeartbeat; - // take some action only on modification of labels - boolean areNodeLabelsUpdated = - nodeLabelsForHeartbeat.size() != previousNodeLabels.size() - || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat); - - areLabelsSentToRM = false; - // When nodelabels elapsed or resync time is elapsed will send again in - // heartbeat. 
- if (areNodeLabelsUpdated || isResyncIntervalElapsed()) { - previousNodeLabels = nodeLabelsForHeartbeat; - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Labels from provider: " - + StringUtils.join(",", previousNodeLabels)); - } - validateNodeLabels(nodeLabelsForHeartbeat); - areLabelsSentToRM = true; - } catch (IOException e) { - // set previous node labels to invalid set, so that invalid - // labels are not verified for every HB, and send empty set - // to RM to have same nodeLabels which was earlier set. - nodeLabelsForHeartbeat = null; - } finally { - // Set last send time in heartbeat - lastNodeLabelSendMills = System.currentTimeMillis(); - } - } else { - // if nodelabels have not changed then no need to send - nodeLabelsForHeartbeat = null; - } - return nodeLabelsForHeartbeat; + return getValueForHeartbeat(); } - private void validateNodeLabels(Set nodeLabelsForHeartbeat) + protected void validate(Set nodeLabels) throws IOException { - Iterator iterator = nodeLabelsForHeartbeat.iterator(); + Iterator iterator = nodeLabels.iterator(); boolean hasInvalidLabel = false; - StringBuilder errorMsg = new StringBuilder(""); + StringBuilder errorMsg = new StringBuilder(); while (iterator.hasNext()) { try { NodeLabelUtil @@ -1074,33 +1252,31 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } - /* - * This method checks resync interval is elapsed or not. 
- */ - public boolean isResyncIntervalElapsed() { - long elapsedTimeSinceLastSync = - System.currentTimeMillis() - lastNodeLabelSendMills; - if (elapsedTimeSinceLastSync > resyncInterval) { - return true; - } - return false; + @Override + public Set getValueFromProvider() { + return this.nodeLabelsProvider.getDescriptors(); + } + + @Override + protected boolean isValueUpdated(Set value) { + return !Objects.equals(value, getPreviousValue()); } @Override public void verifyRMHeartbeatResponseForNodeLabels( NodeHeartbeatResponse response) { - if (areLabelsSentToRM) { + if (isValueSented()) { if (response.getAreNodeLabelsAcceptedByRM()) { if(LOG.isDebugEnabled()){ LOG.debug( - "Node Labels {" + StringUtils.join(",", previousNodeLabels) + "Node Labels {" + StringUtils.join(",", getPreviousValue()) + "} were Accepted by RM "); } } else { // case where updated labels from NodeLabelsProvider is sent to RM and // RM rejected the labels LOG.error( - "NM node labels {" + StringUtils.join(",", previousNodeLabels) + "NM node labels {" + StringUtils.join(",", getPreviousValue()) + "} were not accepted by RM and message from RM : " + response.getDiagnosticsMessage()); } @@ -1120,7 +1296,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements Set nodeLabelsForHeartbeat = nodeLabelsHandler.getNodeLabelsForHeartbeat(); Set nodeAttributesForHeartbeat = - nodeAttributesHandler == null ? null : nodeAttributesHandler.getNodeAttributesForHeartbeat(); NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); NodeHeartbeatRequest request = @@ -1153,6 +1328,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements if (!handleShutdownOrResyncCommand(response)) { nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels( response); + nodeAttributesHandler + .verifyRMHeartbeatResponseForNodeAttributes(response); // Explicitly put this method after checking the resync // response. 
We diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForAttributes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForAttributes.java new file mode 100644 index 00000000000..325d60c59be --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForAttributes.java @@ -0,0 +1,439 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.Thread.State; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.TimerTask; +import java.util.concurrent.TimeoutException; + +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.ServerSocketUtil; +import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil; +import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import 
org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test NodeStatusUpdater for node attributes. + */ +public class TestNodeStatusUpdaterForAttributes extends NodeLabelTestBase { + private static final RecordFactory RECORD_FACTORY = + RecordFactoryProvider.getRecordFactory(null); + + private NodeManager nm; + private DummyNodeAttributesProvider dummyAttributesProviderRef; + + @Before + public void setup() { + dummyAttributesProviderRef = new DummyNodeAttributesProvider(); + } + + @After + public void tearDown() { + if (null != nm) { + ServiceOperations.stop(nm); + } + } + + private class ResourceTrackerForAttributes implements ResourceTracker { + private int heartbeatID = 0; + private Set attributes; + + private boolean receivedNMHeartbeat = false; + private boolean receivedNMRegister = false; + + private MasterKey createMasterKey() { + MasterKey masterKey = new MasterKeyPBImpl(); + masterKey.setKeyId(123); + masterKey.setBytes( + ByteBuffer.wrap(new byte[] {new Integer(123).byteValue() })); + return masterKey; + } + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnException, IOException { + attributes = request.getNodeAttributes(); + RegisterNodeManagerResponse response = + RECORD_FACTORY.newRecordInstance(RegisterNodeManagerResponse.class); + response.setNodeAction(NodeAction.NORMAL); + response.setContainerTokenMasterKey(createMasterKey()); + response.setNMTokenMasterKey(createMasterKey()); + response.setAreNodeAttributesAcceptedByRM(attributes != null); + synchronized (ResourceTrackerForAttributes.class) { + receivedNMRegister = true; + ResourceTrackerForAttributes.class.notifyAll(); + } + return response; + } + + 
public void waitTillHeartbeat() + throws InterruptedException, TimeoutException { + GenericTestUtils.waitFor(() -> receivedNMHeartbeat, 100, 30000); + if (!receivedNMHeartbeat) { + Assert.fail("Heartbeat is not received even after waiting"); + } + } + + public void waitTillRegister() + throws InterruptedException, TimeoutException { + GenericTestUtils.waitFor(() -> receivedNMRegister, 100, 30000); + if (!receivedNMRegister) { + Assert.fail("Registration is not received even after waiting"); + } + } + + /** + * Flag to indicate received any. + */ + public void resetNMHeartbeatReceiveFlag() { + synchronized (ResourceTrackerForAttributes.class) { + receivedNMHeartbeat = false; + } + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat( + NodeHeartbeatRequest request) { + attributes = request.getNodeAttributes(); + NodeStatus nodeStatus = request.getNodeStatus(); + nodeStatus.setResponseId(heartbeatID++); + + NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils + .newNodeHeartbeatResponse(heartbeatID, NodeAction.NORMAL, null, null, + null, null, 1000L); + + // to ensure that heartbeats are sent only when required. + nhResponse.setNextHeartBeatInterval(Long.MAX_VALUE); + nhResponse.setAreNodeAttributesAcceptedByRM(attributes != null); + + synchronized (ResourceTrackerForAttributes.class) { + receivedNMHeartbeat = true; + ResourceTrackerForAttributes.class.notifyAll(); + } + return nhResponse; + } + + @Override + public UnRegisterNodeManagerResponse unRegisterNodeManager( + UnRegisterNodeManagerRequest request) { + return null; + } + } + + /** + * A dummy NodeAttributesProvider class for tests. + */ + public static class DummyNodeAttributesProvider + extends NodeAttributesProvider { + + public DummyNodeAttributesProvider() { + super("DummyNodeAttributesProvider"); + // disable the fetch timer. 
+ setIntervalTime(-1); + } + + @Override + protected void cleanUp() throws Exception { + // fake implementation, nothing to cleanup + } + + @Override + public TimerTask createTimerTask() { + return new TimerTask() { + @Override + public void run() { + setDescriptors(Collections.unmodifiableSet(new HashSet<>(0))); + } + }; + } + } + + private YarnConfiguration createNMConfigForDistributeNodeAttributes() { + YarnConfiguration conf = new YarnConfiguration(); + return conf; + } + + @Test(timeout = 20000) + public void testNodeStatusUpdaterForNodeAttributes() + throws InterruptedException, IOException, TimeoutException { + final ResourceTrackerForAttributes resourceTracker = + new ResourceTrackerForAttributes(); + nm = new NodeManager() { + @Override + protected NodeAttributesProvider createNodeAttributesProvider( + Configuration conf) throws IOException { + return dummyAttributesProviderRef; + } + + @Override + protected NodeStatusUpdater createNodeStatusUpdater( + Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker) { + + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics) { + @Override + protected ResourceTracker getRMClient() { + return resourceTracker; + } + + @Override + protected void stopRMProxy() { + return; + } + }; + } + }; + + YarnConfiguration conf = createNMConfigForDistributeNodeAttributes(); + conf.setLong(YarnConfiguration.NM_NODE_ATTRIBUTES_RESYNC_INTERVAL, 2000); + conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, + "0.0.0.0:" + ServerSocketUtil.getPort(8040, 10)); + + nm.init(conf); + resourceTracker.resetNMHeartbeatReceiveFlag(); + nm.start(); + resourceTracker.waitTillRegister(); + assertTrue(NodeLabelUtil + .isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(), + resourceTracker.attributes)); + + resourceTracker.waitTillHeartbeat(); // wait till the first heartbeat + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // heartbeat with updated attributes + NodeAttribute 
attribute1 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1", + NodeAttributeType.STRING, "V1"); + dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1)); + + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertTrue(NodeLabelUtil + .isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(), + resourceTracker.attributes)); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // heartbeat without updating attributes + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + resourceTracker.resetNMHeartbeatReceiveFlag(); + assertNull("If no change in attributes" + + " then null should be sent as part of request", + resourceTracker.attributes); + + // provider return with null attributes + dummyAttributesProviderRef.setDescriptors(null); + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertNotNull("If provider sends null" + + " then empty label set should be sent and not null", + resourceTracker.attributes); + assertTrue("If provider sends null then empty attributes should be sent", + resourceTracker.attributes.isEmpty()); + resourceTracker.resetNMHeartbeatReceiveFlag(); + // Since the resync interval is set to 2 sec in every alternate heartbeat + // the attributes will be send along with heartbeat. + // In loop we sleep for 1 sec + // so that every sec 1 heartbeat is send. 
+ int nullAttributes = 0; + int nonNullAttributes = 0; + dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1)); + for (int i = 0; i < 5; i++) { + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + if (null == resourceTracker.attributes) { + nullAttributes++; + } else { + Assert.assertTrue("In heartbeat PI attributes should be send", + NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute1), + resourceTracker.attributes)); + nonNullAttributes++; + } + resourceTracker.resetNMHeartbeatReceiveFlag(); + Thread.sleep(1000); + } + Assert.assertTrue("More than one heartbeat with empty attributes expected", + nullAttributes > 1); + Assert.assertTrue("More than one heartbeat with attributes expected", + nonNullAttributes > 1); + nm.stop(); + } + + @Test(timeout = 20000) + public void testInvalidNodeAttributesFromProvider() + throws InterruptedException, IOException, TimeoutException { + final ResourceTrackerForAttributes resourceTracker = + new ResourceTrackerForAttributes(); + nm = new NodeManager() { + @Override protected NodeAttributesProvider createNodeAttributesProvider( + Configuration conf) throws IOException { + return dummyAttributesProviderRef; + } + + @Override protected NodeStatusUpdater createNodeStatusUpdater( + Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker) { + + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics) { + @Override protected ResourceTracker getRMClient() { + return resourceTracker; + } + + @Override protected void stopRMProxy() { + return; + } + }; + } + }; + + YarnConfiguration conf = createNMConfigForDistributeNodeAttributes(); + conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, + "0.0.0.0:" + ServerSocketUtil.getPort(8040, 10)); + nm.init(conf); + resourceTracker.resetNMHeartbeatReceiveFlag(); + nm.start(); + resourceTracker.waitTillRegister(); + assertTrue(NodeLabelUtil + .isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(), + 
resourceTracker.attributes)); + + resourceTracker.waitTillHeartbeat(); // wait till the first heartbeat + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // update attribute1 + NodeAttribute attribute1 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1", + NodeAttributeType.STRING, "V1"); + dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1)); + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertTrue(NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute1), + resourceTracker.attributes)); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // update attribute2 + NodeAttribute attribute2 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2", + NodeAttributeType.STRING, "V2"); + dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute2)); + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertTrue(NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute2), + resourceTracker.attributes)); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // update attribute2 & attribute2 + dummyAttributesProviderRef + .setDescriptors(ImmutableSet.of(attribute1, attribute2)); + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertTrue(NodeLabelUtil + .isNodeAttributesEquals(ImmutableSet.of(attribute1, attribute2), + resourceTracker.attributes)); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // heartbeat with invalid attributes + NodeAttribute invalidAttribute = NodeAttribute + .newInstance("_.P", "Attr1", NodeAttributeType.STRING, "V1"); + dummyAttributesProviderRef + .setDescriptors(ImmutableSet.of(invalidAttribute)); + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertNull("On Invalid Attributes we need to retain earlier attributes, HB" + + " needs to send null", resourceTracker.attributes); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // on next heartbeat same invalid attributes will be given by 
the provider, + // but again validation check and reset RM with invalid attributes set + // should not happen + sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertNull("NodeStatusUpdater need not send repeatedly empty attributes on" + + " invalid attributes from provider ", resourceTracker.attributes); + resourceTracker.resetNMHeartbeatReceiveFlag(); + } + + /** + * This is to avoid race condition in the test case. NodeStatusUpdater + * heartbeat thread after sending the heartbeat needs some time to process the + * response and then go wait state. But in the test case once the main test + * thread returns back after resourceTracker.waitTillHeartbeat() we proceed + * with next sendOutofBandHeartBeat before heartbeat thread is blocked on + * wait. + * @throws InterruptedException + * @throws IOException + */ + private void sendOutofBandHeartBeat() + throws InterruptedException, IOException { + int i = 0; + do { + State statusUpdaterThreadState = + ((NodeStatusUpdaterImpl) nm.getNodeStatusUpdater()) + .getStatusUpdaterThreadState(); + if (statusUpdaterThreadState.equals(Thread.State.TIMED_WAITING) + || statusUpdaterThreadState.equals(Thread.State.WAITING)) { + nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + break; + } + if (++i <= 10) { + Thread.sleep(50); + } else { + throw new IOException("Waited for 500 ms" + + " but NodeStatusUpdaterThread not in waiting state"); + } + } while (true); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 3d6eda2cf5c..6f669c83eb5 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -506,6 +507,22 @@ public class ResourceTrackerService extends AbstractService implements this.rmContext.getRMDelegatedNodeLabelsUpdater().updateNodeLabels(nodeId); } + // Update node's attributes to RM's NodeAttributesManager. + if (request.getNodeAttributes() != null) { + try { + // update node attributes if necessary then update heartbeat response + updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes()); + response.setAreNodeAttributesAcceptedByRM(true); + } catch (IOException ex) { + //ensure the error message is captured and sent across in response + String errorMsg = response.getDiagnosticsMessage() == null ? 
+ ex.getMessage() : + response.getDiagnosticsMessage() + "\n" + ex.getMessage(); + response.setDiagnosticsMessage(errorMsg); + response.setAreNodeAttributesAcceptedByRM(false); + } + } + StringBuilder message = new StringBuilder(); message.append("NodeManager from node ").append(host).append("(cmPort: ") .append(cmPort).append(" httpPort: "); @@ -516,6 +533,10 @@ public class ResourceTrackerService extends AbstractService implements message.append(", node labels { ").append( StringUtils.join(",", nodeLabels) + " } "); } + if (response.getAreNodeAttributesAcceptedByRM()) { + message.append(", node attributes { ") + .append(request.getNodeAttributes() + " } "); + } LOG.info(message.toString()); response.setNodeAction(NodeAction.NORMAL); @@ -673,34 +694,72 @@ public class ResourceTrackerService extends AbstractService implements // 8. Get node's attributes and update node-to-attributes mapping // in RMNodeAttributeManager. - Set nodeAttributes = request.getNodeAttributes(); - if (nodeAttributes != null && !nodeAttributes.isEmpty()) { - nodeAttributes.forEach(nodeAttribute -> - LOG.debug(nodeId.toString() + " ATTRIBUTE : " - + nodeAttribute.toString())); - - // Validate attributes - if (!nodeAttributes.stream().allMatch( - nodeAttribute -> NodeAttribute.PREFIX_DISTRIBUTED - .equals(nodeAttribute.getAttributeKey().getAttributePrefix()))) { - // All attributes must be in same prefix: nm.yarn.io. - // Since we have the checks in NM to make sure attributes reported - // in HB are with correct prefix, so it should not reach here. - LOG.warn("Reject invalid node attributes from host: " - + nodeId.toString() + ", attributes in HB must have prefix " - + NodeAttribute.PREFIX_DISTRIBUTED); - } else { - // Replace all distributed node attributes associated with this host - // with the new reported attributes in node attribute manager. 
- this.rmContext.getNodeAttributesManager() - .replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED, - ImmutableMap.of(nodeId.getHost(), nodeAttributes)); + if (request.getNodeAttributes() != null) { + try { + // update node attributes if necessary then update heartbeat response + updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes()); + nodeHeartBeatResponse.setAreNodeAttributesAcceptedByRM(true); + } catch (IOException ex) { + //ensure the error message is captured and sent across in response + String errorMsg = + nodeHeartBeatResponse.getDiagnosticsMessage() == null ? + ex.getMessage() : + nodeHeartBeatResponse.getDiagnosticsMessage() + "\n" + ex + .getMessage(); + nodeHeartBeatResponse.setDiagnosticsMessage(errorMsg); + nodeHeartBeatResponse.setAreNodeAttributesAcceptedByRM(false); } } return nodeHeartBeatResponse; } + /** + * Update node attributes if necessary. + * @param nodeId - node id + * @param nodeAttributes - node attributes + * @return true if updated + * @throws IOException if prefix type is not distributed + */ + private void updateNodeAttributesIfNecessary(NodeId nodeId, + Set nodeAttributes) throws IOException { + if (LOG.isDebugEnabled()) { + nodeAttributes.forEach(nodeAttribute -> LOG.debug( + nodeId.toString() + " ATTRIBUTE : " + nodeAttribute.toString())); + } + + // Validate attributes + if (!nodeAttributes.stream().allMatch( + nodeAttribute -> NodeAttribute.PREFIX_DISTRIBUTED + .equals(nodeAttribute.getAttributeKey().getAttributePrefix()))) { + // All attributes must be in same prefix: nm.yarn.io. + // Since we have the checks in NM to make sure attributes reported + // in HB are with correct prefix, so it should not reach here. 
+ throw new IOException("Reject invalid node attributes from host: " + + nodeId.toString() + ", attributes in HB must have prefix " + + NodeAttribute.PREFIX_DISTRIBUTED); + } + // Replace all distributed node attributes associated with this host + // with the new reported attributes in node attribute manager. + Set currentNodeAttributes = + this.rmContext.getNodeAttributesManager() + .getAttributesForNode(nodeId.getHost()).keySet(); + if (!currentNodeAttributes.isEmpty()) { + currentNodeAttributes = NodeLabelUtil + .filterAttributesByPrefix(currentNodeAttributes, + NodeAttribute.PREFIX_DISTRIBUTED); + } + if (!NodeLabelUtil + .isNodeAttributesEquals(nodeAttributes, currentNodeAttributes)) { + this.rmContext.getNodeAttributesManager() + .replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED, + ImmutableMap.of(nodeId.getHost(), nodeAttributes)); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Skip updating node attributes since there is no change for " + + nodeId + " : " + nodeAttributes); + } + } + private int getNextResponseId(int responseId) { // Loop between 0 and Integer.MAX_VALUE return (responseId + 1) & Integer.MAX_VALUE; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java index 83c59832eb9..90cf110d41c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java @@ -221,10 +221,8 @@ public class 
NodeAttributesManagerImpl extends NodeAttributesManager { // Notify RM if (rmContext != null && rmContext.getDispatcher() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Updated NodeAttribute event to RM:" - + newNodeToAttributesMap.values()); - } + LOG.info("Updated NodeAttribute event to RM:" + + newNodeToAttributesMap); rmContext.getDispatcher().getEventHandler().handle( new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap)); } @@ -306,9 +304,11 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { for (NodeAttribute attribute : nodeToAttrMappingEntry.getValue()) { NodeAttributeKey attributeKey = attribute.getAttributeKey(); String attributeName = attributeKey.getAttributeName().trim(); - NodeLabelUtil.checkAndThrowLabelName(attributeName); + NodeLabelUtil.checkAndThrowAttributeName(attributeName); NodeLabelUtil .checkAndThrowAttributePrefix(attributeKey.getAttributePrefix()); + NodeLabelUtil + .checkAndThrowAttributeValue(attribute.getAttributeValue()); // ensure trimmed values are set back attributeKey.setAttributeName(attributeName); @@ -747,8 +747,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { // Notify RM if (rmContext != null && rmContext.getDispatcher() != null) { - LOG.info("Updated NodeAttribute event to RM:" + newNodeToAttributesMap - .values()); + LOG.info("Updated NodeAttribute event to RM:" + newNodeToAttributesMap); rmContext.getDispatcher().getEventHandler().handle( new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index b451db1a7d5..d3db0d366d3 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -18,10 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore; +import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ServerRMProxy; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -45,6 +49,7 @@ import java.util.Set; import java.util.HashSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.transform.Transformer; import javax.xml.transform.TransformerFactory; @@ -116,6 +121,9 @@ import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.junit.After; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.w3c.dom.Document; import org.w3c.dom.Element; @@ -733,6 +741,137 @@ public class TestResourceTrackerService extends NodeLabelTestBase { } } + @Test + public void testNodeRegistrationWithAttributes() throws Exception { + writeToHostsFile("host2"); + Configuration conf = new Configuration(); + 
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, + FileSystemNodeAttributeStore.class, NodeAttributeStore.class); + File tempDir = File.createTempFile("nattr", ".tmp"); + tempDir.delete(); + tempDir.mkdirs(); + tempDir.deleteOnExit(); + conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, + tempDir.getAbsolutePath()); + rm = new MockRM(conf); + rm.start(); + + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest registerReq = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + NodeAttribute nodeAttribute1 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1", + NodeAttributeType.STRING, "V1"); + NodeAttribute nodeAttribute2 = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2", + NodeAttributeType.STRING, "V2"); + registerReq.setResource(capability); + registerReq.setNodeId(nodeId); + registerReq.setHttpPort(1234); + registerReq.setNMVersion(YarnVersionInfo.getVersion()); + registerReq.setNodeAttributes(toSet(nodeAttribute1, nodeAttribute2)); + RegisterNodeManagerResponse response = + resourceTrackerService.registerNodeManager(registerReq); + + Assert.assertEquals("Action should be normal on valid Node Attributes", + NodeAction.NORMAL, response.getNodeAction()); + Assert.assertTrue(NodeLabelUtil.isNodeAttributesEquals( + rm.getRMContext().getNodeAttributesManager() + .getAttributesForNode(nodeId.getHost()).keySet(), + registerReq.getNodeAttributes())); + Assert.assertTrue("Valid Node Attributes were not accepted by RM", + response.getAreNodeAttributesAcceptedByRM()); + + if (rm != null) { + rm.stop(); + } + } + + @Test + public void testNodeRegistrationWithInvalidAttributes() throws Exception { + writeToHostsFile("host2"); + Configuration conf 
= new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, + FileSystemNodeAttributeStore.class, NodeAttributeStore.class); + conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, + TEMP_DIR.getAbsolutePath()); + rm = new MockRM(conf); + rm.start(); + + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest req = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + NodeAttribute validNodeAttribute = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1", + NodeAttributeType.STRING, "V1"); + NodeAttribute invalidPrefixNodeAttribute = NodeAttribute + .newInstance("_P", "Attr1", + NodeAttributeType.STRING, "V2"); + NodeAttribute invalidNameNodeAttribute = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "_N", + NodeAttributeType.STRING, "V2"); + NodeAttribute invalidValueNodeAttribute = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2", + NodeAttributeType.STRING, "..."); + req.setResource(capability); + req.setNodeId(nodeId); + req.setHttpPort(1234); + req.setNMVersion(YarnVersionInfo.getVersion()); + + // check invalid prefix + req.setNodeAttributes( + toSet(validNodeAttribute, invalidPrefixNodeAttribute)); + RegisterNodeManagerResponse response = + resourceTrackerService.registerNodeManager(req); + Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager() + .getAttributesForNode(nodeId.getHost()).size()); + assertRegisterResponseForInvalidAttributes(response); + Assert.assertTrue(response.getDiagnosticsMessage() + .endsWith("attributes in HB must have prefix nm.yarn.io")); + + // check invalid name + req.setNodeAttributes(toSet(validNodeAttribute, invalidNameNodeAttribute)); + response = 
resourceTrackerService.registerNodeManager(req); + Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager() + .getAttributesForNode(nodeId.getHost()).size()); + assertRegisterResponseForInvalidAttributes(response); + Assert.assertTrue(response.getDiagnosticsMessage() + .startsWith("attribute name should only contains")); + + // check invalid value + req.setNodeAttributes(toSet(validNodeAttribute, invalidValueNodeAttribute)); + response = resourceTrackerService.registerNodeManager(req); + Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager() + .getAttributesForNode(nodeId.getHost()).size()); + assertRegisterResponseForInvalidAttributes(response); + Assert.assertTrue(response.getDiagnosticsMessage() + .startsWith("attribute value should only contains")); + + if (rm != null) { + rm.stop(); + } + } + + private void assertRegisterResponseForInvalidAttributes( + RegisterNodeManagerResponse response) { + Assert.assertEquals( + "On Invalid Node Attributes action is expected to be normal", + NodeAction.NORMAL, response.getNodeAction()); + Assert.assertNotNull(response.getDiagnosticsMessage()); + Assert.assertFalse("Node Attributes should not accepted by RM If Invalid", + response.getAreNodeAttributesAcceptedByRM()); + } + private NodeStatus getNodeStatusObject(NodeId nodeId) { NodeStatus status = Records.newRecord(NodeStatus.class); status.setNodeId(nodeId); @@ -835,12 +974,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase { hostFile.getAbsolutePath()); conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, FileSystemNodeAttributeStore.class, NodeAttributeStore.class); - File tempDir = File.createTempFile("nattr", ".tmp"); - tempDir.delete(); - tempDir.mkdirs(); - tempDir.deleteOnExit(); conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, - tempDir.getAbsolutePath()); + TEMP_DIR.getAbsolutePath()); rm = new MockRM(conf); rm.start(); @@ -908,6 +1043,285 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase { Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); } + @Test + public void testNodeHeartbeatWithInvalidNodeAttributes() throws Exception { + writeToHostsFile("host2"); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, + FileSystemNodeAttributeStore.class, NodeAttributeStore.class); + conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, + TEMP_DIR.getAbsolutePath()); + rm = new MockRM(conf); + rm.start(); + + // Register to RM + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest registerReq = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + registerReq.setResource(capability); + registerReq.setNodeId(nodeId); + registerReq.setHttpPort(1234); + registerReq.setNMVersion(YarnVersionInfo.getVersion()); + RegisterNodeManagerResponse registerResponse = + resourceTrackerService.registerNodeManager(registerReq); + + NodeAttribute validNodeAttribute = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "host", + NodeAttributeType.STRING, "host2"); + NodeAttribute invalidPrefixNodeAttribute = NodeAttribute + .newInstance("_P", "Attr1", + NodeAttributeType.STRING, "V2"); + NodeAttribute invalidNameNodeAttribute = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "_N", + NodeAttributeType.STRING, "V2"); + NodeAttribute invalidValueNodeAttribute = NodeAttribute + .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2", + NodeAttributeType.STRING, "..."); + + // Set node attributes in HB. 
+ NodeHeartbeatRequest heartbeatReq = + Records.newRecord(NodeHeartbeatRequest.class); + NodeStatus nodeStatusObject = getNodeStatusObject(nodeId); + int responseId = nodeStatusObject.getResponseId(); + heartbeatReq.setNodeStatus(nodeStatusObject); + heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse + .getNMTokenMasterKey()); + heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse + .getContainerTokenMasterKey()); + heartbeatReq.setNodeAttributes(toSet(validNodeAttribute)); + + // Send first HB to RM with invalid prefix node attributes + heartbeatReq.setNodeAttributes( + toSet(validNodeAttribute, invalidPrefixNodeAttribute)); + NodeHeartbeatResponse response = + resourceTrackerService.nodeHeartbeat(heartbeatReq); + Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager() + .getAttributesForNode(nodeId.getHost()).size()); + assertNodeHeartbeatResponseForInvalidAttributes(response); + Assert.assertTrue(response.getDiagnosticsMessage() + .endsWith("attributes in HB must have prefix nm.yarn.io")); + + // Send another HB to RM with invalid name node attributes + nodeStatusObject.setResponseId(++responseId); + heartbeatReq + .setNodeAttributes(toSet(validNodeAttribute, invalidNameNodeAttribute)); + response = resourceTrackerService.nodeHeartbeat(heartbeatReq); + Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager() + .getAttributesForNode(nodeId.getHost()).size()); + assertNodeHeartbeatResponseForInvalidAttributes(response); + Assert.assertTrue(response.getDiagnosticsMessage() + .startsWith("attribute name should only contains")); + + // Send another HB to RM with invalid value node attributes + nodeStatusObject.setResponseId(++responseId); + heartbeatReq.setNodeAttributes( + toSet(validNodeAttribute, invalidValueNodeAttribute)); + response = resourceTrackerService.nodeHeartbeat(heartbeatReq); + Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager() + .getAttributesForNode(nodeId.getHost()).size()); + 
assertNodeHeartbeatResponseForInvalidAttributes(response); + Assert.assertTrue(response.getDiagnosticsMessage() + .startsWith("attribute value should only contains")); + + // Send another HB to RM with updated node attribute + NodeAttribute updatedNodeAttribute = NodeAttribute.newInstance( + NodeAttribute.PREFIX_DISTRIBUTED, "host", + NodeAttributeType.STRING, "host3"); + nodeStatusObject.setResponseId(++responseId); + heartbeatReq.setNodeAttributes(toSet(updatedNodeAttribute)); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + // Make sure RM gets the updated attribute + NodeAttributesManager attributeManager = + rm.getRMContext().getNodeAttributesManager(); + Map attrs = + attributeManager.getAttributesForNode(nodeId.getHost()); + Assert.assertEquals(1, attrs.size()); + NodeAttribute na = attrs.keySet().iterator().next(); + Assert.assertEquals("host", na.getAttributeKey().getAttributeName()); + Assert.assertEquals("host3", na.getAttributeValue()); + Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); + } + + private void assertNodeHeartbeatResponseForInvalidAttributes( + NodeHeartbeatResponse response) { + Assert.assertEquals( + "On Invalid Node Attributes action is expected to be normal", + NodeAction.NORMAL, response.getNodeAction()); + Assert.assertNotNull(response.getDiagnosticsMessage()); + Assert.assertFalse("Node Attributes should not accepted by RM If Invalid", + response.getAreNodeAttributesAcceptedByRM()); + } + + @Test + public void testNodeHeartbeatOnlyUpdateNodeAttributesIfNeeded() + throws Exception { + writeToHostsFile("host2"); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, + NullNodeAttributeStore.class, NodeAttributeStore.class); + conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, + TEMP_DIR.getAbsolutePath()); + rm = new MockRM(conf); + rm.start(); + + // spy node 
attributes manager + NodeAttributesManager tmpAttributeManager = + rm.getRMContext().getNodeAttributesManager(); + NodeAttributesManager spyAttributeManager = spy(tmpAttributeManager); + rm.getRMContext().setNodeAttributesManager(spyAttributeManager); + AtomicInteger count = new AtomicInteger(0); + Mockito.doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) throws Exception { + count.incrementAndGet(); + tmpAttributeManager + .replaceNodeAttributes((String) invocation.getArguments()[0], + (Map>) invocation.getArguments()[1]); + return null; + } + }).when(spyAttributeManager) + .replaceNodeAttributes(Mockito.any(String.class), + Mockito.any(Map.class)); + + // Register to RM + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest registerReq = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + registerReq.setResource(capability); + registerReq.setNodeId(nodeId); + registerReq.setHttpPort(1234); + registerReq.setNMVersion(YarnVersionInfo.getVersion()); + RegisterNodeManagerResponse registerResponse = + resourceTrackerService.registerNodeManager(registerReq); + + Set nodeAttributes = new HashSet<>(); + nodeAttributes.add(NodeAttribute.newInstance( + NodeAttribute.PREFIX_DISTRIBUTED, "host", + NodeAttributeType.STRING, "host2")); + + // Set node attributes in HB. 
+ NodeHeartbeatRequest heartbeatReq = + Records.newRecord(NodeHeartbeatRequest.class); + NodeStatus nodeStatusObject = getNodeStatusObject(nodeId); + int responseId = nodeStatusObject.getResponseId(); + heartbeatReq.setNodeStatus(nodeStatusObject); + heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse + .getNMTokenMasterKey()); + heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse + .getContainerTokenMasterKey()); + heartbeatReq.setNodeAttributes(nodeAttributes); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + // Ensure RM gets correct node attributes update. + Map attrs = spyAttributeManager + .getAttributesForNode(nodeId.getHost()); + spyAttributeManager.getNodesToAttributes(ImmutableSet.of(nodeId.getHost())); + Assert.assertEquals(1, attrs.size()); + NodeAttribute na = attrs.keySet().iterator().next(); + Assert.assertEquals("host", na.getAttributeKey().getAttributeName()); + Assert.assertEquals("host2", na.getAttributeValue()); + Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); + Assert.assertEquals(1, count.get()); + + // Send HBs to RM with the same node attributes + nodeStatusObject.setResponseId(++responseId); + heartbeatReq.setNodeStatus(nodeStatusObject); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + nodeStatusObject.setResponseId(++responseId); + heartbeatReq.setNodeStatus(nodeStatusObject); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + // Make sure RM updated node attributes once + Assert.assertEquals(1, count.get()); + + // Send another HB to RM with updated node attributes + nodeAttributes.clear(); + nodeAttributes.add(NodeAttribute.newInstance( + NodeAttribute.PREFIX_DISTRIBUTED, "host", + NodeAttributeType.STRING, "host3")); + nodeStatusObject.setResponseId(++responseId); + heartbeatReq.setNodeStatus(nodeStatusObject); + heartbeatReq.setNodeAttributes(nodeAttributes); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + // Make sure RM gets the updated attribute + attrs 
= spyAttributeManager.getAttributesForNode(nodeId.getHost()); + Assert.assertEquals(1, attrs.size()); + na = attrs.keySet().iterator().next(); + Assert.assertEquals("host", na.getAttributeKey().getAttributeName()); + Assert.assertEquals("host3", na.getAttributeValue()); + Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); + + // Make sure RM updated node attributes twice + Assert.assertEquals(2, count.get()); + + // Add centralized attributes + Map> nodeAttributeMapping = ImmutableMap + .of(nodeId.getHost(), ImmutableSet.of(NodeAttribute.newInstance( + NodeAttribute.PREFIX_CENTRALIZED, "centAttr", + NodeAttributeType.STRING, "x"))); + spyAttributeManager.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED, + nodeAttributeMapping); + + // Make sure RM updated node attributes three times + Assert.assertEquals(3, count.get()); + + // Send another HB to RM with non-updated node attributes + nodeAttributes.clear(); + nodeAttributes.add(NodeAttribute.newInstance( + NodeAttribute.PREFIX_DISTRIBUTED, "host", + NodeAttributeType.STRING, "host3")); + nodeStatusObject.setResponseId(++responseId); + heartbeatReq.setNodeStatus(nodeStatusObject); + heartbeatReq.setNodeAttributes(nodeAttributes); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + // Make sure RM still updated node attributes three times + Assert.assertEquals(3, count.get()); + + // Send another HB to RM with updated node attributes + nodeAttributes.clear(); + nodeAttributes.add(NodeAttribute.newInstance( + NodeAttribute.PREFIX_DISTRIBUTED, "host", + NodeAttributeType.STRING, "host4")); + nodeStatusObject.setResponseId(++responseId); + heartbeatReq.setNodeStatus(nodeStatusObject); + heartbeatReq.setNodeAttributes(nodeAttributes); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + // Make sure RM gets the updated attribute + attrs = spyAttributeManager.getAttributesForNode(nodeId.getHost()); + Assert.assertEquals(2, attrs.size()); + attrs.keySet().stream().forEach(e -> { + 
Assert.assertEquals(NodeAttributeType.STRING, e.getAttributeType()); + if (NodeAttribute.PREFIX_DISTRIBUTED.equals(e.getAttributeKey().getAttributePrefix())) { + Assert.assertEquals("host", e.getAttributeKey().getAttributeName()); + Assert.assertEquals("host4", e.getAttributeValue()); + } else if (NodeAttribute.PREFIX_CENTRALIZED.equals(e.getAttributeKey().getAttributePrefix())) { + Assert.assertEquals("centAttr", e.getAttributeKey().getAttributeName()); + Assert.assertEquals("x", e.getAttributeValue()); + } + }); + + // Make sure RM updated node attributes four times + Assert.assertEquals(4, count.get()); + + if (rm != null) { + rm.stop(); + } + } + @Test public void testNodeHeartBeatWithInvalidLabels() throws Exception { writeToHostsFile("host2"); @@ -2447,4 +2861,34 @@ public class TestResourceTrackerService extends NodeLabelTestBase { response.getNodeAction()); mockRM.stop(); } + + /** + * A no-op implementation of NodeAttributeStore for testing + */ + public static class NullNodeAttributeStore implements NodeAttributeStore { + + @Override + public void replaceNodeAttributes(List nodeToAttribute) { + } + + @Override + public void addNodeAttributes(List nodeToAttribute) { + } + + @Override + public void removeNodeAttributes(List nodeToAttribute) { + } + + @Override + public void init(Configuration configuration, NodeAttributesManager mgr) { + } + + @Override + public void recover() { + } + + @Override + public void close() { + } + } }