YARN-8925. Updating distributed node attributes only when necessary. Contributed by Tao Yang.

This commit is contained in:
Weiwei Yang 2018-12-21 10:56:42 +08:00
parent a80d321074
commit f659485ee8
16 changed files with 1477 additions and 112 deletions

View File

@ -3669,6 +3669,12 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL =
2 * 60 * 1000;
public static final String NM_NODE_ATTRIBUTES_RESYNC_INTERVAL =
NM_NODE_ATTRIBUTES_PREFIX + "resync-interval-ms";
public static final long DEFAULT_NM_NODE_ATTRIBUTES_RESYNC_INTERVAL =
2 * 60 * 1000;
// If -1 is configured then no timer task should be created
public static final String NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms";

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
import java.io.IOException;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -40,6 +41,8 @@ public final class NodeLabelUtil {
Pattern.compile("^[0-9a-zA-Z][0-9a-zA-Z-_\\.]*");
private static final Pattern ATTRIBUTE_VALUE_PATTERN =
Pattern.compile("^[0-9a-zA-Z][0-9a-zA-Z-_.]*");
private static final Pattern ATTRIBUTE_NAME_PATTERN =
Pattern.compile("^[0-9a-zA-Z][0-9a-zA-Z-_]*");
public static void checkAndThrowLabelName(String label) throws IOException {
if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) {
@ -57,6 +60,25 @@ public final class NodeLabelUtil {
}
}
public static void checkAndThrowAttributeName(String attributeName)
throws IOException {
if (attributeName == null || attributeName.isEmpty()
|| attributeName.length() > MAX_LABEL_LENGTH) {
throw new IOException(
"attribute name added is empty or exceeds " + MAX_LABEL_LENGTH
+ " character(s)");
}
attributeName = attributeName.trim();
boolean match = ATTRIBUTE_NAME_PATTERN.matcher(attributeName).matches();
if (!match) {
throw new IOException("attribute name should only contains "
+ "{0-9, a-z, A-Z, -, _} and should not started with {-,_}"
+ ", now it is= " + attributeName);
}
}
public static void checkAndThrowAttributeValue(String value)
throws IOException {
if (value == null) {
@ -129,7 +151,9 @@ public final class NodeLabelUtil {
// Verify attribute prefix format.
checkAndThrowAttributePrefix(prefix);
// Verify attribute name format.
checkAndThrowLabelName(attributeKey.getAttributeName());
checkAndThrowAttributeName(attributeKey.getAttributeName());
// Verify attribute value format.
checkAndThrowAttributeValue(nodeAttribute.getAttributeValue());
}
}
}
@ -152,4 +176,29 @@ public final class NodeLabelUtil {
.equals(nodeAttribute.getAttributeKey().getAttributePrefix()))
.collect(Collectors.toSet());
}
/**
* Are these two input node attributes the same.
* @return true if they are the same
*/
public static boolean isNodeAttributesEquals(
Set<NodeAttribute> leftNodeAttributes,
Set<NodeAttribute> rightNodeAttributes) {
if (leftNodeAttributes == null && rightNodeAttributes == null) {
return true;
} else if (leftNodeAttributes == null || rightNodeAttributes == null
|| leftNodeAttributes.size() != rightNodeAttributes.size()) {
return false;
}
return leftNodeAttributes.stream()
.allMatch(e -> isNodeAttributeIncludes(rightNodeAttributes, e));
}
private static boolean isNodeAttributeIncludes(
Set<NodeAttribute> nodeAttributes, NodeAttribute checkNodeAttribute) {
return nodeAttributes.stream().anyMatch(
e -> e.equals(checkNodeAttribute) && Objects
.equals(e.getAttributeValue(),
checkNodeAttribute.getAttributeValue()));
}
}

View File

@ -3005,6 +3005,15 @@
<value></value>
</property>
<property>
<description>
Interval at which NM syncs its node attributes with RM. NM will send its loaded
attributes every x intervals configured, along with heartbeat to RM.
</description>
<name>yarn.nodemanager.node-attributes.resync-interval-ms</name>
<value>120000</value>
</property>
<property>
<description>
Timeout in seconds for YARN node graceful decommission.

View File

@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.nodelabels;
import static org.junit.Assert.fail;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.junit.Assert;
import org.junit.Test;
/**
@ -48,4 +53,70 @@ public class TestNodeLabelUtil {
}
}
}
@Test
public void testIsNodeAttributesEquals() {
NodeAttribute nodeAttributeCK1V1 = NodeAttribute
.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "K1",
NodeAttributeType.STRING, "V1");
NodeAttribute nodeAttributeCK1V1Copy = NodeAttribute
.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "K1",
NodeAttributeType.STRING, "V1");
NodeAttribute nodeAttributeDK1V1 = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K1",
NodeAttributeType.STRING, "V1");
NodeAttribute nodeAttributeDK1V1Copy = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K1",
NodeAttributeType.STRING, "V1");
NodeAttribute nodeAttributeDK2V1 = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K2",
NodeAttributeType.STRING, "V1");
NodeAttribute nodeAttributeDK2V2 = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K2",
NodeAttributeType.STRING, "V2");
/*
* equals if set size equals and items are all the same
*/
Assert.assertTrue(NodeLabelUtil.isNodeAttributesEquals(null, null));
Assert.assertTrue(NodeLabelUtil
.isNodeAttributesEquals(ImmutableSet.of(), ImmutableSet.of()));
Assert.assertTrue(NodeLabelUtil
.isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1),
ImmutableSet.of(nodeAttributeCK1V1Copy)));
Assert.assertTrue(NodeLabelUtil
.isNodeAttributesEquals(ImmutableSet.of(nodeAttributeDK1V1),
ImmutableSet.of(nodeAttributeDK1V1Copy)));
Assert.assertTrue(NodeLabelUtil.isNodeAttributesEquals(
ImmutableSet.of(nodeAttributeCK1V1, nodeAttributeDK1V1),
ImmutableSet.of(nodeAttributeCK1V1Copy, nodeAttributeDK1V1Copy)));
/*
* not equals if set size not equals or items are different
*/
Assert.assertFalse(
NodeLabelUtil.isNodeAttributesEquals(null, ImmutableSet.of()));
Assert.assertFalse(
NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(), null));
// different attribute prefix
Assert.assertFalse(NodeLabelUtil
.isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1),
ImmutableSet.of(nodeAttributeDK1V1)));
// different attribute name
Assert.assertFalse(NodeLabelUtil
.isNodeAttributesEquals(ImmutableSet.of(nodeAttributeDK1V1),
ImmutableSet.of(nodeAttributeDK2V1)));
// different attribute value
Assert.assertFalse(NodeLabelUtil
.isNodeAttributesEquals(ImmutableSet.of(nodeAttributeDK2V1),
ImmutableSet.of(nodeAttributeDK2V2)));
// different set
Assert.assertFalse(NodeLabelUtil
.isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1),
ImmutableSet.of()));
Assert.assertFalse(NodeLabelUtil
.isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1),
ImmutableSet.of(nodeAttributeCK1V1, nodeAttributeDK1V1)));
Assert.assertFalse(NodeLabelUtil.isNodeAttributesEquals(
ImmutableSet.of(nodeAttributeCK1V1, nodeAttributeDK1V1),
ImmutableSet.of(nodeAttributeDK1V1)));
}
}

View File

@ -118,4 +118,9 @@ public abstract class NodeHeartbeatResponse {
public abstract void addAllContainersToDecrease(
Collection<Container> containersToDecrease);
public abstract boolean getAreNodeAttributesAcceptedByRM();
public abstract void setAreNodeAttributesAcceptedByRM(
boolean areNodeAttributesAcceptedByRM);
}

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
@ -50,6 +51,16 @@ public abstract class RegisterNodeManagerRequest {
List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels,
Resource physicalResource) {
return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
containerStatuses, runningApplications, nodeLabels, physicalResource,
null);
}
public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels,
Resource physicalResource, Set<NodeAttribute> nodeAttributes) {
RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort);
@ -60,6 +71,7 @@ public abstract class RegisterNodeManagerRequest {
request.setRunningApplications(runningApplications);
request.setNodeLabels(nodeLabels);
request.setPhysicalResource(physicalResource);
request.setNodeAttributes(nodeAttributes);
return request;
}
@ -117,4 +129,8 @@ public abstract class RegisterNodeManagerRequest {
public abstract void setLogAggregationReportsForApps(
List<LogAggregationReport> logAggregationReportsForApps);
public abstract Set<NodeAttribute> getNodeAttributes();
public abstract void setNodeAttributes(Set<NodeAttribute> nodeAttributes);
}

View File

@ -58,4 +58,9 @@ public abstract class RegisterNodeManagerResponse {
public abstract void setAreNodeLabelsAcceptedByRM(
boolean areNodeLabelsAcceptedByRM);
public abstract boolean getAreNodeAttributesAcceptedByRM();
public abstract void setAreNodeAttributesAcceptedByRM(
boolean areNodeAttributesAcceptedByRM);
}

View File

@ -787,6 +787,21 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
}
@Override
public boolean getAreNodeAttributesAcceptedByRM() {
NodeHeartbeatResponseProtoOrBuilder p =
this.viaProto ? this.proto : this.builder;
return p.getAreNodeAttributesAcceptedByRM();
}
@Override
public void setAreNodeAttributesAcceptedByRM(
boolean areNodeAttributesAcceptedByRM) {
maybeInitBuilder();
this.builder
.setAreNodeAttributesAcceptedByRM(areNodeAttributesAcceptedByRM);
}
@Override
public List<SignalContainerRequest> getContainersToSignalList() {
initContainersToSignal();

View File

@ -26,22 +26,26 @@ import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeAttributePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeAttributesProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
@ -58,6 +62,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
private List<NMContainerStatus> containerStatuses = null;
private List<ApplicationId> runningApplications = null;
private Set<NodeLabel> labels = null;
private Set<NodeAttribute> attributes = null;
private List<LogAggregationReport> logAggregationReportsForApps = null;
@ -101,6 +106,15 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
}
builder.setNodeLabels(newBuilder.build());
}
if (this.attributes != null) {
builder.clearNodeAttributes();
NodeAttributesProto.Builder attributesBuilder =
NodeAttributesProto.newBuilder();
for (NodeAttribute attribute : attributes) {
attributesBuilder.addNodeAttributes(convertToProtoFormat(attribute));
}
builder.setNodeAttributes(attributesBuilder.build());
}
if (this.physicalResource != null) {
builder.setPhysicalResource(convertToProtoFormat(this.physicalResource));
}
@ -404,6 +418,36 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
}
}
@Override
public synchronized Set<NodeAttribute> getNodeAttributes() {
initNodeAttributes();
return this.attributes;
}
@Override
public synchronized void setNodeAttributes(
Set<NodeAttribute> nodeAttributes) {
maybeInitBuilder();
builder.clearNodeAttributes();
this.attributes = nodeAttributes;
}
private synchronized void initNodeAttributes() {
if (this.attributes != null) {
return;
}
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasNodeAttributes()) {
attributes=null;
return;
}
NodeAttributesProto nodeAttributes = p.getNodeAttributes();
attributes = new HashSet<>();
for(NodeAttributeProto nap : nodeAttributes.getNodeAttributesList()) {
attributes.add(convertFromProtoFormat(nap));
}
}
private static NodeLabelPBImpl convertFromProtoFormat(NodeLabelProto p) {
return new NodeLabelPBImpl(p);
}
@ -412,6 +456,15 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
return ((NodeLabelPBImpl)t).getProto();
}
private static NodeAttributePBImpl convertFromProtoFormat(
NodeAttributeProto p) {
return new NodeAttributePBImpl(p);
}
private static NodeAttributeProto convertToProtoFormat(NodeAttribute t) {
return ((NodeAttributePBImpl)t).getProto();
}
private static ApplicationIdPBImpl convertFromProtoFormat(
ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);

View File

@ -269,4 +269,19 @@ public class RegisterNodeManagerResponsePBImpl
maybeInitBuilder();
this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
}
@Override
public boolean getAreNodeAttributesAcceptedByRM() {
RegisterNodeManagerResponseProtoOrBuilder p =
this.viaProto ? this.proto : this.builder;
return p.getAreNodeAttributesAcceptedByRM();
}
@Override
public void setAreNodeAttributesAcceptedByRM(
boolean areNodeAttributesAcceptedByRM) {
maybeInitBuilder();
this.builder
.setAreNodeAttributesAcceptedByRM(areNodeAttributesAcceptedByRM);
}
}

View File

@ -72,6 +72,7 @@ message RegisterNodeManagerRequestProto {
optional NodeLabelsProto nodeLabels = 8;
optional ResourceProto physicalResource = 9;
repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10;
optional NodeAttributesProto nodeAttributes = 11;
}
message RegisterNodeManagerResponseProto {
@ -83,6 +84,7 @@ message RegisterNodeManagerResponseProto {
optional string rm_version = 6;
optional bool areNodeLabelsAcceptedByRM = 7 [default = false];
optional ResourceProto resource = 8;
optional bool areNodeAttributesAcceptedByRM = 9 [default = false];
}
message UnRegisterNodeManagerRequestProto {
@ -128,6 +130,7 @@ message NodeHeartbeatResponseProto {
repeated AppCollectorDataProto app_collectors = 16;
// to be used in place of containers_to_decrease
repeated ContainerProto containers_to_update = 17;
optional bool areNodeAttributesAcceptedByRM = 18 [default = false];
}
message ContainerQueuingLimitProto {

View File

@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -377,6 +378,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
throws YarnException, IOException {
RegisterNodeManagerResponse regNMResponse;
Set<NodeLabel> nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration();
Set<NodeAttribute> nodeAttributes =
nodeAttributesHandler.getNodeAttributesForRegistration();
// Synchronize NM-RM registration with
// ContainerManagerImpl#increaseContainersResource and
@ -387,7 +390,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerReports, getRunningApplications(),
nodeLabels, physicalResource);
nodeLabels, physicalResource, nodeAttributes);
if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports);
@ -473,6 +476,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
successfullRegistrationMsg.append(nodeLabelsHandler
.verifyRMRegistrationResponseForNodeLabels(regNMResponse));
successfullRegistrationMsg.append(nodeAttributesHandler
.verifyRMRegistrationResponseForNodeAttributes(regNMResponse));
LOG.info(successfullRegistrationMsg.toString());
}
@ -875,34 +880,254 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
*/
private NMNodeAttributesHandler createNMNodeAttributesHandler(
NodeAttributesProvider provider) {
return provider == null ? null :
new NMDistributedNodeAttributesHandler(nodeAttributesProvider);
if (provider == null) {
return new NMCentralizedNodeAttributesHandler();
} else {
return new NMDistributedNodeAttributesHandler(provider, this.getConfig());
}
}
private static abstract class CachedNodeDescriptorHandler<T> {
private final long resyncInterval;
private final T defaultValue;
private T previousValue;
private long lastSendMills = 0L;
private boolean isValueSented;
CachedNodeDescriptorHandler(T defaultValue,
long resyncInterval) {
this.defaultValue = defaultValue;
this.resyncInterval = resyncInterval;
}
public abstract T getValueFromProvider();
public T getValueForRegistration() {
T value = getValueFromProvider();
if (defaultValue != null) {
value = (null == value) ? defaultValue : value;
}
previousValue = value;
try {
validate(value);
} catch (IOException e) {
value = null;
}
return value;
}
public T getValueForHeartbeat() {
T value = getValueFromProvider();
// if the provider returns null then consider default value are set
if (defaultValue != null) {
value = (null == value) ? defaultValue : value;
}
// take some action only on modification of value
boolean isValueUpdated = isValueUpdated(value);
isValueSented = false;
// When value updated or resync time is elapsed will send again in
// heartbeat.
if (isValueUpdated || isResyncIntervalElapsed()) {
previousValue = value;
try {
validate(value);
isValueSented = true;
} catch (IOException e) {
// take previous value to replace invalid value, so that invalid
// value are not verified for every HB, and send empty set
// to RM to have same value which was earlier set.
value = null;
} finally {
// Set last send time in heartbeat
lastSendMills = System.currentTimeMillis();
}
} else {
// if value have not changed then no need to send
value = null;
}
return value;
}
/**
* This method checks resync interval is elapsed or not.
*/
public boolean isResyncIntervalElapsed() {
long elapsedTimeSinceLastSync =
System.currentTimeMillis() - lastSendMills;
if (elapsedTimeSinceLastSync > resyncInterval) {
return true;
}
return false;
}
protected abstract void validate(T value) throws IOException;
protected abstract boolean isValueUpdated(T value);
public long getResyncInterval() {
return resyncInterval;
}
public T getDefaultValue() {
return defaultValue;
}
public T getPreviousValue() {
return previousValue;
}
public long getLastSendMills() {
return lastSendMills;
}
public boolean isValueSented() {
return isValueSented;
}
}
private interface NMNodeAttributesHandler {
/**
* validates nodeAttributes From Provider and returns it to the caller. Also
* ensures that if provider returns null then empty set is considered
*/
Set<NodeAttribute> getNodeAttributesForRegistration();
/**
* @return the node attributes of this node manager.
*/
Set<NodeAttribute> getNodeAttributesForHeartbeat();
/**
* @return RMRegistration Success message and on failure will log
* independently and returns empty string
*/
String verifyRMRegistrationResponseForNodeAttributes(
RegisterNodeManagerResponse regNMResponse);
/**
* check whether if updated attributes sent to RM was accepted or not.
* @param response
*/
void verifyRMHeartbeatResponseForNodeAttributes(
NodeHeartbeatResponse response);
}
/**
* In centralized configuration, NM need not send Node attributes or process
* the response.
*/
private static class NMCentralizedNodeAttributesHandler
implements NMNodeAttributesHandler {
@Override
public Set<NodeAttribute> getNodeAttributesForHeartbeat() {
return null;
}
@Override
public Set<NodeAttribute> getNodeAttributesForRegistration() {
return null;
}
@Override
public void verifyRMHeartbeatResponseForNodeAttributes(
NodeHeartbeatResponse response) {
}
@Override
public String verifyRMRegistrationResponseForNodeAttributes(
RegisterNodeManagerResponse regNMResponse) {
return "";
}
}
private static class NMDistributedNodeAttributesHandler
extends CachedNodeDescriptorHandler<Set<NodeAttribute>>
implements NMNodeAttributesHandler {
private final NodeAttributesProvider attributesProvider;
protected NMDistributedNodeAttributesHandler(
NodeAttributesProvider provider) {
NodeAttributesProvider provider, Configuration conf) {
super(Collections.unmodifiableSet(new HashSet<>(0)),
conf.getLong(YarnConfiguration.NM_NODE_ATTRIBUTES_RESYNC_INTERVAL,
YarnConfiguration.DEFAULT_NM_NODE_ATTRIBUTES_RESYNC_INTERVAL));
this.attributesProvider = provider;
}
@Override
public Set<NodeAttribute> getNodeAttributesForRegistration() {
return getValueForRegistration();
}
@Override
public Set<NodeAttribute> getNodeAttributesForHeartbeat() {
return getValueForHeartbeat();
}
@Override
public Set<NodeAttribute> getValueFromProvider() {
return attributesProvider.getDescriptors();
}
}
@Override
protected void validate(Set<NodeAttribute> nodeAttributes)
throws IOException {
try {
NodeLabelUtil.validateNodeAttributes(nodeAttributes);
} catch (IOException e) {
LOG.error(
"Invalid node attribute(s) from Provider : " + e.getMessage());
throw e;
}
}
@Override
protected boolean isValueUpdated(Set<NodeAttribute> value) {
return !NodeLabelUtil.isNodeAttributesEquals(getPreviousValue(), value);
}
@Override
public String verifyRMRegistrationResponseForNodeAttributes(
RegisterNodeManagerResponse regNMResponse) {
StringBuilder successfulNodeAttributesRegistrationMsg =
new StringBuilder();
if (regNMResponse.getAreNodeAttributesAcceptedByRM()) {
successfulNodeAttributesRegistrationMsg
.append(" and with following Node attribute(s) : {")
.append(getPreviousValue()).append("}");
} else {
// case where provider is set but RM did not accept the node attributes
String errorMsgFromRM = regNMResponse.getDiagnosticsMessage();
LOG.error("Node attributes sent from NM while registration were"
+ " rejected by RM. " + ((errorMsgFromRM == null) ?
"Seems like RM is configured with Centralized Attributes." :
"And with message " + regNMResponse.getDiagnosticsMessage()));
}
return successfulNodeAttributesRegistrationMsg.toString();
}
@Override
public void verifyRMHeartbeatResponseForNodeAttributes(
NodeHeartbeatResponse response) {
if (isValueSented()) {
if (response.getAreNodeAttributesAcceptedByRM()) {
if(LOG.isDebugEnabled()){
LOG.debug("Node attributes {" + getPreviousValue()
+ "} were Accepted by RM ");
}
} else {
// case where updated node attributes from NodeAttributesProvider
// is sent to RM and RM rejected the attributes
LOG.error("NM node attributes {" + getPreviousValue()
+ "} were not accepted by RM and message from RM : " + response
.getDiagnosticsMessage());
}
}
}
}
private static interface NMNodeLabelsHandler {
/**
@ -963,33 +1188,22 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
private static class NMDistributedNodeLabelsHandler
extends CachedNodeDescriptorHandler<Set<NodeLabel>>
implements NMNodeLabelsHandler {
private NMDistributedNodeLabelsHandler(
NodeLabelsProvider nodeLabelsProvider, Configuration conf) {
this.nodeLabelsProvider = nodeLabelsProvider;
this.resyncInterval =
super(CommonNodeLabelsManager.EMPTY_NODELABEL_SET,
conf.getLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL,
YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL);
YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL));
this.nodeLabelsProvider = nodeLabelsProvider;
}
private final NodeLabelsProvider nodeLabelsProvider;
private Set<NodeLabel> previousNodeLabels;
private boolean areLabelsSentToRM;
private long lastNodeLabelSendMills = 0L;
private final long resyncInterval;
@Override
public Set<NodeLabel> getNodeLabelsForRegistration() {
Set<NodeLabel> nodeLabels = nodeLabelsProvider.getDescriptors();
nodeLabels = (null == nodeLabels)
? CommonNodeLabelsManager.EMPTY_NODELABEL_SET : nodeLabels;
previousNodeLabels = nodeLabels;
try {
validateNodeLabels(nodeLabels);
} catch (IOException e) {
nodeLabels = null;
}
return nodeLabels;
return getValueForRegistration();
}
@Override
@ -999,7 +1213,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
if (regNMResponse.getAreNodeLabelsAcceptedByRM()) {
successfulNodeLabelsRegistrationMsg
.append(" and with following Node label(s) : {")
.append(StringUtils.join(",", previousNodeLabels)).append("}");
.append(StringUtils.join(",", getPreviousValue())).append("}");
} else {
// case where provider is set but RM did not accept the Node Labels
String errorMsgFromRM = regNMResponse.getDiagnosticsMessage();
@ -1014,50 +1228,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@Override
public Set<NodeLabel> getNodeLabelsForHeartbeat() {
Set<NodeLabel> nodeLabelsForHeartbeat =
nodeLabelsProvider.getDescriptors();
// if the provider returns null then consider empty labels are set
nodeLabelsForHeartbeat = (nodeLabelsForHeartbeat == null)
? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
: nodeLabelsForHeartbeat;
// take some action only on modification of labels
boolean areNodeLabelsUpdated =
nodeLabelsForHeartbeat.size() != previousNodeLabels.size()
|| !previousNodeLabels.containsAll(nodeLabelsForHeartbeat);
areLabelsSentToRM = false;
// When nodelabels elapsed or resync time is elapsed will send again in
// heartbeat.
if (areNodeLabelsUpdated || isResyncIntervalElapsed()) {
previousNodeLabels = nodeLabelsForHeartbeat;
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Labels from provider: "
+ StringUtils.join(",", previousNodeLabels));
}
validateNodeLabels(nodeLabelsForHeartbeat);
areLabelsSentToRM = true;
} catch (IOException e) {
// set previous node labels to invalid set, so that invalid
// labels are not verified for every HB, and send empty set
// to RM to have same nodeLabels which was earlier set.
nodeLabelsForHeartbeat = null;
} finally {
// Set last send time in heartbeat
lastNodeLabelSendMills = System.currentTimeMillis();
}
} else {
// if nodelabels have not changed then no need to send
nodeLabelsForHeartbeat = null;
}
return nodeLabelsForHeartbeat;
return getValueForHeartbeat();
}
private void validateNodeLabels(Set<NodeLabel> nodeLabelsForHeartbeat)
protected void validate(Set<NodeLabel> nodeLabels)
throws IOException {
Iterator<NodeLabel> iterator = nodeLabelsForHeartbeat.iterator();
Iterator<NodeLabel> iterator = nodeLabels.iterator();
boolean hasInvalidLabel = false;
StringBuilder errorMsg = new StringBuilder("");
StringBuilder errorMsg = new StringBuilder();
while (iterator.hasNext()) {
try {
NodeLabelUtil
@ -1074,33 +1252,31 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
}
/*
* This method checks resync interval is elapsed or not.
*/
public boolean isResyncIntervalElapsed() {
long elapsedTimeSinceLastSync =
System.currentTimeMillis() - lastNodeLabelSendMills;
if (elapsedTimeSinceLastSync > resyncInterval) {
return true;
}
return false;
@Override
public Set<NodeLabel> getValueFromProvider() {
return this.nodeLabelsProvider.getDescriptors();
}
@Override
protected boolean isValueUpdated(Set<NodeLabel> value) {
return !Objects.equals(value, getPreviousValue());
}
@Override
public void verifyRMHeartbeatResponseForNodeLabels(
NodeHeartbeatResponse response) {
if (areLabelsSentToRM) {
if (isValueSented()) {
if (response.getAreNodeLabelsAcceptedByRM()) {
if(LOG.isDebugEnabled()){
LOG.debug(
"Node Labels {" + StringUtils.join(",", previousNodeLabels)
"Node Labels {" + StringUtils.join(",", getPreviousValue())
+ "} were Accepted by RM ");
}
} else {
// case where updated labels from NodeLabelsProvider is sent to RM and
// RM rejected the labels
LOG.error(
"NM node labels {" + StringUtils.join(",", previousNodeLabels)
"NM node labels {" + StringUtils.join(",", getPreviousValue())
+ "} were not accepted by RM and message from RM : "
+ response.getDiagnosticsMessage());
}
@ -1120,7 +1296,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
Set<NodeLabel> nodeLabelsForHeartbeat =
nodeLabelsHandler.getNodeLabelsForHeartbeat();
Set<NodeAttribute> nodeAttributesForHeartbeat =
nodeAttributesHandler == null ? null :
nodeAttributesHandler.getNodeAttributesForHeartbeat();
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
NodeHeartbeatRequest request =
@ -1153,6 +1328,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
if (!handleShutdownOrResyncCommand(response)) {
nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(
response);
nodeAttributesHandler
.verifyRMHeartbeatResponseForNodeAttributes(response);
// Explicitly put this method after checking the resync
// response. We

View File

@ -0,0 +1,439 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.Thread.State;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.TimeoutException;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Test NodeStatusUpdater for node attributes.
*/
public class TestNodeStatusUpdaterForAttributes extends NodeLabelTestBase {
private static final RecordFactory RECORD_FACTORY =
RecordFactoryProvider.getRecordFactory(null);
private NodeManager nm;
private DummyNodeAttributesProvider dummyAttributesProviderRef;
@Before
public void setup() {
dummyAttributesProviderRef = new DummyNodeAttributesProvider();
}
@After
public void tearDown() {
if (null != nm) {
ServiceOperations.stop(nm);
}
}
private class ResourceTrackerForAttributes implements ResourceTracker {
private int heartbeatID = 0;
private Set<NodeAttribute> attributes;
private boolean receivedNMHeartbeat = false;
private boolean receivedNMRegister = false;
private MasterKey createMasterKey() {
MasterKey masterKey = new MasterKeyPBImpl();
masterKey.setKeyId(123);
masterKey.setBytes(
ByteBuffer.wrap(new byte[] {new Integer(123).byteValue() }));
return masterKey;
}
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException, IOException {
attributes = request.getNodeAttributes();
RegisterNodeManagerResponse response =
RECORD_FACTORY.newRecordInstance(RegisterNodeManagerResponse.class);
response.setNodeAction(NodeAction.NORMAL);
response.setContainerTokenMasterKey(createMasterKey());
response.setNMTokenMasterKey(createMasterKey());
response.setAreNodeAttributesAcceptedByRM(attributes != null);
synchronized (ResourceTrackerForAttributes.class) {
receivedNMRegister = true;
ResourceTrackerForAttributes.class.notifyAll();
}
return response;
}
public void waitTillHeartbeat()
throws InterruptedException, TimeoutException {
GenericTestUtils.waitFor(() -> receivedNMHeartbeat, 100, 30000);
if (!receivedNMHeartbeat) {
Assert.fail("Heartbeat is not received even after waiting");
}
}
public void waitTillRegister()
throws InterruptedException, TimeoutException {
GenericTestUtils.waitFor(() -> receivedNMRegister, 100, 30000);
if (!receivedNMRegister) {
Assert.fail("Registration is not received even after waiting");
}
}
/**
* Flag to indicate received any.
*/
public void resetNMHeartbeatReceiveFlag() {
synchronized (ResourceTrackerForAttributes.class) {
receivedNMHeartbeat = false;
}
}
@Override
public NodeHeartbeatResponse nodeHeartbeat(
NodeHeartbeatRequest request) {
attributes = request.getNodeAttributes();
NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartbeatID++);
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils
.newNodeHeartbeatResponse(heartbeatID, NodeAction.NORMAL, null, null,
null, null, 1000L);
// to ensure that heartbeats are sent only when required.
nhResponse.setNextHeartBeatInterval(Long.MAX_VALUE);
nhResponse.setAreNodeAttributesAcceptedByRM(attributes != null);
synchronized (ResourceTrackerForAttributes.class) {
receivedNMHeartbeat = true;
ResourceTrackerForAttributes.class.notifyAll();
}
return nhResponse;
}
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) {
return null;
}
}
/**
* A dummy NodeAttributesProvider class for tests.
*/
public static class DummyNodeAttributesProvider
extends NodeAttributesProvider {
public DummyNodeAttributesProvider() {
super("DummyNodeAttributesProvider");
// disable the fetch timer.
setIntervalTime(-1);
}
@Override
protected void cleanUp() throws Exception {
// fake implementation, nothing to cleanup
}
@Override
public TimerTask createTimerTask() {
return new TimerTask() {
@Override
public void run() {
setDescriptors(Collections.unmodifiableSet(new HashSet<>(0)));
}
};
}
}
private YarnConfiguration createNMConfigForDistributeNodeAttributes() {
YarnConfiguration conf = new YarnConfiguration();
return conf;
}
@Test(timeout = 20000)
public void testNodeStatusUpdaterForNodeAttributes()
throws InterruptedException, IOException, TimeoutException {
final ResourceTrackerForAttributes resourceTracker =
new ResourceTrackerForAttributes();
nm = new NodeManager() {
@Override
protected NodeAttributesProvider createNodeAttributesProvider(
Configuration conf) throws IOException {
return dummyAttributesProviderRef;
}
@Override
protected NodeStatusUpdater createNodeStatusUpdater(
Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
metrics) {
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;
}
@Override
protected void stopRMProxy() {
return;
}
};
}
};
YarnConfiguration conf = createNMConfigForDistributeNodeAttributes();
conf.setLong(YarnConfiguration.NM_NODE_ATTRIBUTES_RESYNC_INTERVAL, 2000);
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS,
"0.0.0.0:" + ServerSocketUtil.getPort(8040, 10));
nm.init(conf);
resourceTracker.resetNMHeartbeatReceiveFlag();
nm.start();
resourceTracker.waitTillRegister();
assertTrue(NodeLabelUtil
.isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(),
resourceTracker.attributes));
resourceTracker.waitTillHeartbeat(); // wait till the first heartbeat
resourceTracker.resetNMHeartbeatReceiveFlag();
// heartbeat with updated attributes
NodeAttribute attribute1 = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1",
NodeAttributeType.STRING, "V1");
dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1));
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertTrue(NodeLabelUtil
.isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(),
resourceTracker.attributes));
resourceTracker.resetNMHeartbeatReceiveFlag();
// heartbeat without updating attributes
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
resourceTracker.resetNMHeartbeatReceiveFlag();
assertNull("If no change in attributes"
+ " then null should be sent as part of request",
resourceTracker.attributes);
// provider return with null attributes
dummyAttributesProviderRef.setDescriptors(null);
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertNotNull("If provider sends null"
+ " then empty label set should be sent and not null",
resourceTracker.attributes);
assertTrue("If provider sends null then empty attributes should be sent",
resourceTracker.attributes.isEmpty());
resourceTracker.resetNMHeartbeatReceiveFlag();
// Since the resync interval is set to 2 sec in every alternate heartbeat
// the attributes will be send along with heartbeat.
// In loop we sleep for 1 sec
// so that every sec 1 heartbeat is send.
int nullAttributes = 0;
int nonNullAttributes = 0;
dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1));
for (int i = 0; i < 5; i++) {
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
if (null == resourceTracker.attributes) {
nullAttributes++;
} else {
Assert.assertTrue("In heartbeat PI attributes should be send",
NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute1),
resourceTracker.attributes));
nonNullAttributes++;
}
resourceTracker.resetNMHeartbeatReceiveFlag();
Thread.sleep(1000);
}
Assert.assertTrue("More than one heartbeat with empty attributes expected",
nullAttributes > 1);
Assert.assertTrue("More than one heartbeat with attributes expected",
nonNullAttributes > 1);
nm.stop();
}
@Test(timeout = 20000)
public void testInvalidNodeAttributesFromProvider()
throws InterruptedException, IOException, TimeoutException {
final ResourceTrackerForAttributes resourceTracker =
new ResourceTrackerForAttributes();
nm = new NodeManager() {
@Override protected NodeAttributesProvider createNodeAttributesProvider(
Configuration conf) throws IOException {
return dummyAttributesProviderRef;
}
@Override protected NodeStatusUpdater createNodeStatusUpdater(
Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
metrics) {
@Override protected ResourceTracker getRMClient() {
return resourceTracker;
}
@Override protected void stopRMProxy() {
return;
}
};
}
};
YarnConfiguration conf = createNMConfigForDistributeNodeAttributes();
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS,
"0.0.0.0:" + ServerSocketUtil.getPort(8040, 10));
nm.init(conf);
resourceTracker.resetNMHeartbeatReceiveFlag();
nm.start();
resourceTracker.waitTillRegister();
assertTrue(NodeLabelUtil
.isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(),
resourceTracker.attributes));
resourceTracker.waitTillHeartbeat(); // wait till the first heartbeat
resourceTracker.resetNMHeartbeatReceiveFlag();
// update attribute1
NodeAttribute attribute1 = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1",
NodeAttributeType.STRING, "V1");
dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1));
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertTrue(NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute1),
resourceTracker.attributes));
resourceTracker.resetNMHeartbeatReceiveFlag();
// update attribute2
NodeAttribute attribute2 = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2",
NodeAttributeType.STRING, "V2");
dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute2));
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertTrue(NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute2),
resourceTracker.attributes));
resourceTracker.resetNMHeartbeatReceiveFlag();
// update attribute2 & attribute2
dummyAttributesProviderRef
.setDescriptors(ImmutableSet.of(attribute1, attribute2));
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertTrue(NodeLabelUtil
.isNodeAttributesEquals(ImmutableSet.of(attribute1, attribute2),
resourceTracker.attributes));
resourceTracker.resetNMHeartbeatReceiveFlag();
// heartbeat with invalid attributes
NodeAttribute invalidAttribute = NodeAttribute
.newInstance("_.P", "Attr1", NodeAttributeType.STRING, "V1");
dummyAttributesProviderRef
.setDescriptors(ImmutableSet.of(invalidAttribute));
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertNull("On Invalid Attributes we need to retain earlier attributes, HB"
+ " needs to send null", resourceTracker.attributes);
resourceTracker.resetNMHeartbeatReceiveFlag();
// on next heartbeat same invalid attributes will be given by the provider,
// but again validation check and reset RM with invalid attributes set
// should not happen
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertNull("NodeStatusUpdater need not send repeatedly empty attributes on"
+ " invalid attributes from provider ", resourceTracker.attributes);
resourceTracker.resetNMHeartbeatReceiveFlag();
}
/**
* This is to avoid race condition in the test case. NodeStatusUpdater
* heartbeat thread after sending the heartbeat needs some time to process the
* response and then go wait state. But in the test case once the main test
* thread returns back after resourceTracker.waitTillHeartbeat() we proceed
* with next sendOutofBandHeartBeat before heartbeat thread is blocked on
* wait.
* @throws InterruptedException
* @throws IOException
*/
private void sendOutofBandHeartBeat()
throws InterruptedException, IOException {
int i = 0;
do {
State statusUpdaterThreadState =
((NodeStatusUpdaterImpl) nm.getNodeStatusUpdater())
.getStatusUpdaterThreadState();
if (statusUpdaterThreadState.equals(Thread.State.TIMED_WAITING)
|| statusUpdaterThreadState.equals(Thread.State.WAITING)) {
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
break;
}
if (++i <= 10) {
Thread.sleep(50);
} else {
throw new IOException("Waited for 500 ms"
+ " but NodeStatusUpdaterThread not in waiting state");
}
} while (true);
}
}

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@ -506,6 +507,22 @@ public class ResourceTrackerService extends AbstractService implements
this.rmContext.getRMDelegatedNodeLabelsUpdater().updateNodeLabels(nodeId);
}
// Update node's attributes to RM's NodeAttributesManager.
if (request.getNodeAttributes() != null) {
try {
// update node attributes if necessary then update heartbeat response
updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes());
response.setAreNodeAttributesAcceptedByRM(true);
} catch (IOException ex) {
//ensure the error message is captured and sent across in response
String errorMsg = response.getDiagnosticsMessage() == null ?
ex.getMessage() :
response.getDiagnosticsMessage() + "\n" + ex.getMessage();
response.setDiagnosticsMessage(errorMsg);
response.setAreNodeAttributesAcceptedByRM(false);
}
}
StringBuilder message = new StringBuilder();
message.append("NodeManager from node ").append(host).append("(cmPort: ")
.append(cmPort).append(" httpPort: ");
@ -516,6 +533,10 @@ public class ResourceTrackerService extends AbstractService implements
message.append(", node labels { ").append(
StringUtils.join(",", nodeLabels) + " } ");
}
if (response.getAreNodeAttributesAcceptedByRM()) {
message.append(", node attributes { ")
.append(request.getNodeAttributes() + " } ");
}
LOG.info(message.toString());
response.setNodeAction(NodeAction.NORMAL);
@ -673,34 +694,72 @@ public class ResourceTrackerService extends AbstractService implements
// 8. Get node's attributes and update node-to-attributes mapping
// in RMNodeAttributeManager.
Set<NodeAttribute> nodeAttributes = request.getNodeAttributes();
if (nodeAttributes != null && !nodeAttributes.isEmpty()) {
nodeAttributes.forEach(nodeAttribute ->
LOG.debug(nodeId.toString() + " ATTRIBUTE : "
+ nodeAttribute.toString()));
// Validate attributes
if (!nodeAttributes.stream().allMatch(
nodeAttribute -> NodeAttribute.PREFIX_DISTRIBUTED
.equals(nodeAttribute.getAttributeKey().getAttributePrefix()))) {
// All attributes must be in same prefix: nm.yarn.io.
// Since we have the checks in NM to make sure attributes reported
// in HB are with correct prefix, so it should not reach here.
LOG.warn("Reject invalid node attributes from host: "
+ nodeId.toString() + ", attributes in HB must have prefix "
+ NodeAttribute.PREFIX_DISTRIBUTED);
} else {
// Replace all distributed node attributes associated with this host
// with the new reported attributes in node attribute manager.
this.rmContext.getNodeAttributesManager()
.replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
ImmutableMap.of(nodeId.getHost(), nodeAttributes));
if (request.getNodeAttributes() != null) {
try {
// update node attributes if necessary then update heartbeat response
updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes());
nodeHeartBeatResponse.setAreNodeAttributesAcceptedByRM(true);
} catch (IOException ex) {
//ensure the error message is captured and sent across in response
String errorMsg =
nodeHeartBeatResponse.getDiagnosticsMessage() == null ?
ex.getMessage() :
nodeHeartBeatResponse.getDiagnosticsMessage() + "\n" + ex
.getMessage();
nodeHeartBeatResponse.setDiagnosticsMessage(errorMsg);
nodeHeartBeatResponse.setAreNodeAttributesAcceptedByRM(false);
}
}
return nodeHeartBeatResponse;
}
/**
* Update node attributes if necessary.
* @param nodeId - node id
* @param nodeAttributes - node attributes
* @return true if updated
* @throws IOException if prefix type is not distributed
*/
private void updateNodeAttributesIfNecessary(NodeId nodeId,
Set<NodeAttribute> nodeAttributes) throws IOException {
if (LOG.isDebugEnabled()) {
nodeAttributes.forEach(nodeAttribute -> LOG.debug(
nodeId.toString() + " ATTRIBUTE : " + nodeAttribute.toString()));
}
// Validate attributes
if (!nodeAttributes.stream().allMatch(
nodeAttribute -> NodeAttribute.PREFIX_DISTRIBUTED
.equals(nodeAttribute.getAttributeKey().getAttributePrefix()))) {
// All attributes must be in same prefix: nm.yarn.io.
// Since we have the checks in NM to make sure attributes reported
// in HB are with correct prefix, so it should not reach here.
throw new IOException("Reject invalid node attributes from host: "
+ nodeId.toString() + ", attributes in HB must have prefix "
+ NodeAttribute.PREFIX_DISTRIBUTED);
}
// Replace all distributed node attributes associated with this host
// with the new reported attributes in node attribute manager.
Set<NodeAttribute> currentNodeAttributes =
this.rmContext.getNodeAttributesManager()
.getAttributesForNode(nodeId.getHost()).keySet();
if (!currentNodeAttributes.isEmpty()) {
currentNodeAttributes = NodeLabelUtil
.filterAttributesByPrefix(currentNodeAttributes,
NodeAttribute.PREFIX_DISTRIBUTED);
}
if (!NodeLabelUtil
.isNodeAttributesEquals(nodeAttributes, currentNodeAttributes)) {
this.rmContext.getNodeAttributesManager()
.replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
ImmutableMap.of(nodeId.getHost(), nodeAttributes));
} else if (LOG.isDebugEnabled()) {
LOG.debug("Skip updating node attributes since there is no change for "
+ nodeId + " : " + nodeAttributes);
}
}
private int getNextResponseId(int responseId) {
// Loop between 0 and Integer.MAX_VALUE
return (responseId + 1) & Integer.MAX_VALUE;

View File

@ -221,10 +221,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
// Notify RM
if (rmContext != null && rmContext.getDispatcher() != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Updated NodeAttribute event to RM:"
+ newNodeToAttributesMap.values());
}
LOG.info("Updated NodeAttribute event to RM:"
+ newNodeToAttributesMap);
rmContext.getDispatcher().getEventHandler().handle(
new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap));
}
@ -306,9 +304,11 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
for (NodeAttribute attribute : nodeToAttrMappingEntry.getValue()) {
NodeAttributeKey attributeKey = attribute.getAttributeKey();
String attributeName = attributeKey.getAttributeName().trim();
NodeLabelUtil.checkAndThrowLabelName(attributeName);
NodeLabelUtil.checkAndThrowAttributeName(attributeName);
NodeLabelUtil
.checkAndThrowAttributePrefix(attributeKey.getAttributePrefix());
NodeLabelUtil
.checkAndThrowAttributeValue(attribute.getAttributeValue());
// ensure trimmed values are set back
attributeKey.setAttributeName(attributeName);
@ -747,8 +747,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
// Notify RM
if (rmContext != null && rmContext.getDispatcher() != null) {
LOG.info("Updated NodeAttribute event to RM:" + newNodeToAttributesMap
.values());
LOG.info("Updated NodeAttribute event to RM:" + newNodeToAttributesMap);
rmContext.getDispatcher().getEventHandler().handle(
new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap));
}

View File

@ -18,10 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
@ -45,6 +49,7 @@ import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
@ -116,6 +121,9 @@ import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
@ -733,6 +741,137 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
}
}
@Test
public void testNodeRegistrationWithAttributes() throws Exception {
writeToHostsFile("host2");
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
File tempDir = File.createTempFile("nattr", ".tmp");
tempDir.delete();
tempDir.mkdirs();
tempDir.deleteOnExit();
conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
tempDir.getAbsolutePath());
rm = new MockRM(conf);
rm.start();
ResourceTrackerService resourceTrackerService =
rm.getResourceTrackerService();
RegisterNodeManagerRequest registerReq =
Records.newRecord(RegisterNodeManagerRequest.class);
NodeId nodeId = NodeId.newInstance("host2", 1234);
Resource capability = BuilderUtils.newResource(1024, 1);
NodeAttribute nodeAttribute1 = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1",
NodeAttributeType.STRING, "V1");
NodeAttribute nodeAttribute2 = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2",
NodeAttributeType.STRING, "V2");
registerReq.setResource(capability);
registerReq.setNodeId(nodeId);
registerReq.setHttpPort(1234);
registerReq.setNMVersion(YarnVersionInfo.getVersion());
registerReq.setNodeAttributes(toSet(nodeAttribute1, nodeAttribute2));
RegisterNodeManagerResponse response =
resourceTrackerService.registerNodeManager(registerReq);
Assert.assertEquals("Action should be normal on valid Node Attributes",
NodeAction.NORMAL, response.getNodeAction());
Assert.assertTrue(NodeLabelUtil.isNodeAttributesEquals(
rm.getRMContext().getNodeAttributesManager()
.getAttributesForNode(nodeId.getHost()).keySet(),
registerReq.getNodeAttributes()));
Assert.assertTrue("Valid Node Attributes were not accepted by RM",
response.getAreNodeAttributesAcceptedByRM());
if (rm != null) {
rm.stop();
}
}
@Test
public void testNodeRegistrationWithInvalidAttributes() throws Exception {
writeToHostsFile("host2");
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
TEMP_DIR.getAbsolutePath());
rm = new MockRM(conf);
rm.start();
ResourceTrackerService resourceTrackerService =
rm.getResourceTrackerService();
RegisterNodeManagerRequest req =
Records.newRecord(RegisterNodeManagerRequest.class);
NodeId nodeId = NodeId.newInstance("host2", 1234);
Resource capability = BuilderUtils.newResource(1024, 1);
NodeAttribute validNodeAttribute = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1",
NodeAttributeType.STRING, "V1");
NodeAttribute invalidPrefixNodeAttribute = NodeAttribute
.newInstance("_P", "Attr1",
NodeAttributeType.STRING, "V2");
NodeAttribute invalidNameNodeAttribute = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "_N",
NodeAttributeType.STRING, "V2");
NodeAttribute invalidValueNodeAttribute = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2",
NodeAttributeType.STRING, "...");
req.setResource(capability);
req.setNodeId(nodeId);
req.setHttpPort(1234);
req.setNMVersion(YarnVersionInfo.getVersion());
// check invalid prefix
req.setNodeAttributes(
toSet(validNodeAttribute, invalidPrefixNodeAttribute));
RegisterNodeManagerResponse response =
resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
.getAttributesForNode(nodeId.getHost()).size());
assertRegisterResponseForInvalidAttributes(response);
Assert.assertTrue(response.getDiagnosticsMessage()
.endsWith("attributes in HB must have prefix nm.yarn.io"));
// check invalid name
req.setNodeAttributes(toSet(validNodeAttribute, invalidNameNodeAttribute));
response = resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
.getAttributesForNode(nodeId.getHost()).size());
assertRegisterResponseForInvalidAttributes(response);
Assert.assertTrue(response.getDiagnosticsMessage()
.startsWith("attribute name should only contains"));
// check invalid value
req.setNodeAttributes(toSet(validNodeAttribute, invalidValueNodeAttribute));
response = resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
.getAttributesForNode(nodeId.getHost()).size());
assertRegisterResponseForInvalidAttributes(response);
Assert.assertTrue(response.getDiagnosticsMessage()
.startsWith("attribute value should only contains"));
if (rm != null) {
rm.stop();
}
}
private void assertRegisterResponseForInvalidAttributes(
RegisterNodeManagerResponse response) {
Assert.assertEquals(
"On Invalid Node Labels action is expected to be normal",
NodeAction.NORMAL, response.getNodeAction());
Assert.assertNotNull(response.getDiagnosticsMessage());
Assert.assertFalse("Node Labels should not accepted by RM If Invalid",
response.getAreNodeLabelsAcceptedByRM());
}
private NodeStatus getNodeStatusObject(NodeId nodeId) {
NodeStatus status = Records.newRecord(NodeStatus.class);
status.setNodeId(nodeId);
@ -835,12 +974,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
hostFile.getAbsolutePath());
conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
File tempDir = File.createTempFile("nattr", ".tmp");
tempDir.delete();
tempDir.mkdirs();
tempDir.deleteOnExit();
conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
tempDir.getAbsolutePath());
TEMP_DIR.getAbsolutePath());
rm = new MockRM(conf);
rm.start();
@ -908,6 +1043,285 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
}
@Test
public void testNodeHeartbeatWithInvalidNodeAttributes() throws Exception {
writeToHostsFile("host2");
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
TEMP_DIR.getAbsolutePath());
rm = new MockRM(conf);
rm.start();
// Register to RM
ResourceTrackerService resourceTrackerService =
rm.getResourceTrackerService();
RegisterNodeManagerRequest registerReq =
Records.newRecord(RegisterNodeManagerRequest.class);
NodeId nodeId = NodeId.newInstance("host2", 1234);
Resource capability = BuilderUtils.newResource(1024, 1);
registerReq.setResource(capability);
registerReq.setNodeId(nodeId);
registerReq.setHttpPort(1234);
registerReq.setNMVersion(YarnVersionInfo.getVersion());
RegisterNodeManagerResponse registerResponse =
resourceTrackerService.registerNodeManager(registerReq);
NodeAttribute validNodeAttribute = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "host",
NodeAttributeType.STRING, "host2");
NodeAttribute invalidPrefixNodeAttribute = NodeAttribute
.newInstance("_P", "Attr1",
NodeAttributeType.STRING, "V2");
NodeAttribute invalidNameNodeAttribute = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "_N",
NodeAttributeType.STRING, "V2");
NodeAttribute invalidValueNodeAttribute = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2",
NodeAttributeType.STRING, "...");
// Set node attributes in HB.
NodeHeartbeatRequest heartbeatReq =
Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
int responseId = nodeStatusObject.getResponseId();
heartbeatReq.setNodeStatus(nodeStatusObject);
heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
.getNMTokenMasterKey());
heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
.getContainerTokenMasterKey());
heartbeatReq.setNodeAttributes(toSet(validNodeAttribute));
// Send first HB to RM with invalid prefix node attributes
heartbeatReq.setNodeAttributes(
toSet(validNodeAttribute, invalidPrefixNodeAttribute));
NodeHeartbeatResponse response =
resourceTrackerService.nodeHeartbeat(heartbeatReq);
Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
.getAttributesForNode(nodeId.getHost()).size());
assertNodeHeartbeatResponseForInvalidAttributes(response);
Assert.assertTrue(response.getDiagnosticsMessage()
.endsWith("attributes in HB must have prefix nm.yarn.io"));
// Send another HB to RM with invalid name node attributes
nodeStatusObject.setResponseId(++responseId);
heartbeatReq
.setNodeAttributes(toSet(validNodeAttribute, invalidNameNodeAttribute));
response = resourceTrackerService.nodeHeartbeat(heartbeatReq);
Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
.getAttributesForNode(nodeId.getHost()).size());
assertNodeHeartbeatResponseForInvalidAttributes(response);
Assert.assertTrue(response.getDiagnosticsMessage()
.startsWith("attribute name should only contains"));
// Send another HB to RM with invalid value node attributes
nodeStatusObject.setResponseId(++responseId);
heartbeatReq.setNodeAttributes(
toSet(validNodeAttribute, invalidValueNodeAttribute));
response = resourceTrackerService.nodeHeartbeat(heartbeatReq);
Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
.getAttributesForNode(nodeId.getHost()).size());
assertNodeHeartbeatResponseForInvalidAttributes(response);
Assert.assertTrue(response.getDiagnosticsMessage()
.startsWith("attribute value should only contains"));
// Send another HB to RM with updated node attribute
NodeAttribute updatedNodeAttribute = NodeAttribute.newInstance(
NodeAttribute.PREFIX_DISTRIBUTED, "host",
NodeAttributeType.STRING, "host3");
nodeStatusObject.setResponseId(++responseId);
heartbeatReq.setNodeAttributes(toSet(updatedNodeAttribute));
resourceTrackerService.nodeHeartbeat(heartbeatReq);
// Make sure RM gets the updated attribute
NodeAttributesManager attributeManager =
rm.getRMContext().getNodeAttributesManager();
Map<NodeAttribute, AttributeValue> attrs =
attributeManager.getAttributesForNode(nodeId.getHost());
Assert.assertEquals(1, attrs.size());
NodeAttribute na = attrs.keySet().iterator().next();
Assert.assertEquals("host", na.getAttributeKey().getAttributeName());
Assert.assertEquals("host3", na.getAttributeValue());
Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
}
private void assertNodeHeartbeatResponseForInvalidAttributes(
NodeHeartbeatResponse response) {
Assert.assertEquals(
"On Invalid Node Labels action is expected to be normal",
NodeAction.NORMAL, response.getNodeAction());
Assert.assertNotNull(response.getDiagnosticsMessage());
Assert.assertFalse("Node Labels should not accepted by RM If Invalid",
response.getAreNodeLabelsAcceptedByRM());
}
@Test
public void testNodeHeartbeatOnlyUpdateNodeAttributesIfNeeded()
throws Exception {
writeToHostsFile("host2");
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
NullNodeAttributeStore.class, NodeAttributeStore.class);
conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
TEMP_DIR.getAbsolutePath());
rm = new MockRM(conf);
rm.start();
// spy node attributes manager
NodeAttributesManager tmpAttributeManager =
rm.getRMContext().getNodeAttributesManager();
NodeAttributesManager spyAttributeManager = spy(tmpAttributeManager);
rm.getRMContext().setNodeAttributesManager(spyAttributeManager);
AtomicInteger count = new AtomicInteger(0);
Mockito.doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) throws Exception {
count.incrementAndGet();
tmpAttributeManager
.replaceNodeAttributes((String) invocation.getArguments()[0],
(Map<String, Set<NodeAttribute>>) invocation.getArguments()[1]);
return null;
}
}).when(spyAttributeManager)
.replaceNodeAttributes(Mockito.any(String.class),
Mockito.any(Map.class));
// Register to RM
ResourceTrackerService resourceTrackerService =
rm.getResourceTrackerService();
RegisterNodeManagerRequest registerReq =
Records.newRecord(RegisterNodeManagerRequest.class);
NodeId nodeId = NodeId.newInstance("host2", 1234);
Resource capability = BuilderUtils.newResource(1024, 1);
registerReq.setResource(capability);
registerReq.setNodeId(nodeId);
registerReq.setHttpPort(1234);
registerReq.setNMVersion(YarnVersionInfo.getVersion());
RegisterNodeManagerResponse registerResponse =
resourceTrackerService.registerNodeManager(registerReq);
Set<NodeAttribute> nodeAttributes = new HashSet<>();
nodeAttributes.add(NodeAttribute.newInstance(
NodeAttribute.PREFIX_DISTRIBUTED, "host",
NodeAttributeType.STRING, "host2"));
// Set node attributes in HB.
NodeHeartbeatRequest heartbeatReq =
Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
int responseId = nodeStatusObject.getResponseId();
heartbeatReq.setNodeStatus(nodeStatusObject);
heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
.getNMTokenMasterKey());
heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
.getContainerTokenMasterKey());
heartbeatReq.setNodeAttributes(nodeAttributes);
resourceTrackerService.nodeHeartbeat(heartbeatReq);
// Ensure RM gets correct node attributes update.
Map<NodeAttribute, AttributeValue> attrs = spyAttributeManager
.getAttributesForNode(nodeId.getHost());
spyAttributeManager.getNodesToAttributes(ImmutableSet.of(nodeId.getHost()));
Assert.assertEquals(1, attrs.size());
NodeAttribute na = attrs.keySet().iterator().next();
Assert.assertEquals("host", na.getAttributeKey().getAttributeName());
Assert.assertEquals("host2", na.getAttributeValue());
Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
Assert.assertEquals(1, count.get());
// Send HBs to RM with the same node attributes
nodeStatusObject.setResponseId(++responseId);
heartbeatReq.setNodeStatus(nodeStatusObject);
resourceTrackerService.nodeHeartbeat(heartbeatReq);
nodeStatusObject.setResponseId(++responseId);
heartbeatReq.setNodeStatus(nodeStatusObject);
resourceTrackerService.nodeHeartbeat(heartbeatReq);
// Make sure RM updated node attributes once
Assert.assertEquals(1, count.get());
// Send another HB to RM with updated node attributes
nodeAttributes.clear();
nodeAttributes.add(NodeAttribute.newInstance(
NodeAttribute.PREFIX_DISTRIBUTED, "host",
NodeAttributeType.STRING, "host3"));
nodeStatusObject.setResponseId(++responseId);
heartbeatReq.setNodeStatus(nodeStatusObject);
heartbeatReq.setNodeAttributes(nodeAttributes);
resourceTrackerService.nodeHeartbeat(heartbeatReq);
// Make sure RM gets the updated attribute
attrs = spyAttributeManager.getAttributesForNode(nodeId.getHost());
Assert.assertEquals(1, attrs.size());
na = attrs.keySet().iterator().next();
Assert.assertEquals("host", na.getAttributeKey().getAttributeName());
Assert.assertEquals("host3", na.getAttributeValue());
Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
// Make sure RM updated node attributes twice
Assert.assertEquals(2, count.get());
// Add centralized attributes
Map<String, Set<NodeAttribute>> nodeAttributeMapping = ImmutableMap
.of(nodeId.getHost(), ImmutableSet.of(NodeAttribute.newInstance(
NodeAttribute.PREFIX_CENTRALIZED, "centAttr",
NodeAttributeType.STRING, "x")));
spyAttributeManager.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
nodeAttributeMapping);
// Make sure RM updated node attributes three times
Assert.assertEquals(3, count.get());
// Send another HB to RM with non-updated node attributes
nodeAttributes.clear();
nodeAttributes.add(NodeAttribute.newInstance(
NodeAttribute.PREFIX_DISTRIBUTED, "host",
NodeAttributeType.STRING, "host3"));
nodeStatusObject.setResponseId(++responseId);
heartbeatReq.setNodeStatus(nodeStatusObject);
heartbeatReq.setNodeAttributes(nodeAttributes);
resourceTrackerService.nodeHeartbeat(heartbeatReq);
// Make sure RM still updated node attributes three times
Assert.assertEquals(3, count.get());
// Send another HB to RM with updated node attributes
nodeAttributes.clear();
nodeAttributes.add(NodeAttribute.newInstance(
NodeAttribute.PREFIX_DISTRIBUTED, "host",
NodeAttributeType.STRING, "host4"));
nodeStatusObject.setResponseId(++responseId);
heartbeatReq.setNodeStatus(nodeStatusObject);
heartbeatReq.setNodeAttributes(nodeAttributes);
resourceTrackerService.nodeHeartbeat(heartbeatReq);
// Make sure RM gets the updated attribute
attrs = spyAttributeManager.getAttributesForNode(nodeId.getHost());
Assert.assertEquals(2, attrs.size());
attrs.keySet().stream().forEach(e -> {
Assert.assertEquals(NodeAttributeType.STRING, e.getAttributeType());
if (e.getAttributeKey().getAttributePrefix() == NodeAttribute.PREFIX_DISTRIBUTED) {
Assert.assertEquals("host", e.getAttributeKey().getAttributeName());
Assert.assertEquals("host4", e.getAttributeValue());
} else if (e.getAttributeKey().getAttributePrefix() == NodeAttribute.PREFIX_CENTRALIZED) {
Assert.assertEquals("centAttr", e.getAttributeKey().getAttributeName());
Assert.assertEquals("x", e.getAttributeValue());
}
});
// Make sure RM updated node attributes four times
Assert.assertEquals(4, count.get());
if (rm != null) {
rm.stop();
}
}
@Test
public void testNodeHeartBeatWithInvalidLabels() throws Exception {
writeToHostsFile("host2");
@ -2447,4 +2861,34 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
response.getNodeAction());
mockRM.stop();
}
/**
* A no-op implementation of NodeAttributeStore for testing
*/
public static class NullNodeAttributeStore implements NodeAttributeStore {
@Override
public void replaceNodeAttributes(List<NodeToAttributes> nodeToAttribute) {
}
@Override
public void addNodeAttributes(List<NodeToAttributes> nodeToAttribute) {
}
@Override
public void removeNodeAttributes(List<NodeToAttributes> nodeToAttribute) {
}
@Override
public void init(Configuration configuration, NodeAttributesManager mgr) {
}
@Override
public void recover() {
}
@Override
public void close() {
}
}
}