diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index bd38c2d2d56..2ff2f5c77e0 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -486,6 +486,9 @@ Release 2.8.0 - UNRELEASED
YARN-4095. Avoid sharing AllocatorPerContext object in LocalDirAllocator
between ShuffleHandler and LocalDirsHandlerService. (Zhihai Xu via jlowe)
+ YARN-4176. Resync NM nodelabels with RM periodically for distributed nodelabels.
+ (Bibin A Chundatt via wangda)
+
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d7bd6785512..93b81a6d103 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2019,6 +2019,12 @@ public class YarnConfiguration extends Configuration {
private static final String NM_NODE_LABELS_PROVIDER_PREFIX =
NM_NODE_LABELS_PREFIX + "provider.";
+ public static final String NM_NODE_LABELS_RESYNC_INTERVAL =
+ NM_NODE_LABELS_PREFIX + "resync-interval-ms";
+
+ public static final long DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL =
+ 2 * 60 * 1000;
+
// If -1 is configured then no timer task should be created
public static final String NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f2e2f019190..29d26d11433 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2137,6 +2137,15 @@
600000
+
+
+ Interval at which node labels syncs with RM from NM.Will send loaded labels
+ every x intervals configured along with heartbeat from NM to RM.
+
+ yarn.nodemanager.node-labels.resync-interval-ms
+ 120000
+
+
When node labels "yarn.nodemanager.node-labels.provider"
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 1b186c81b31..9c720042e95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -139,6 +139,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
Set pendingContainersToRemove = new HashSet();
private NMNodeLabelsHandler nodeLabelsHandler;
+ private final NodeLabelsProvider nodeLabelsProvider;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -150,9 +151,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
NodeLabelsProvider nodeLabelsProvider) {
super(NodeStatusUpdaterImpl.class.getName());
this.healthChecker = healthChecker;
- nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
this.context = context;
this.dispatcher = dispatcher;
+ this.nodeLabelsProvider = nodeLabelsProvider;
this.metrics = metrics;
this.recentlyStoppedContainers = new LinkedHashMap();
this.pendingCompletedContainers =
@@ -184,7 +185,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
this.minimumResourceManagerVersion = conf.get(
YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
-
+
+ nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
// Default duration to track stopped containers on nodemanager is 10Min.
// This should not be assigned very large value as it will remember all the
// containers stopped during that time.
@@ -871,7 +873,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
if (nodeLabelsProvider == null) {
return new NMCentralizedNodeLabelsHandler();
} else {
- return new NMDistributedNodeLabelsHandler(nodeLabelsProvider);
+ return new NMDistributedNodeLabelsHandler(nodeLabelsProvider,
+ this.getConfig());
}
}
@@ -936,16 +939,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private static class NMDistributedNodeLabelsHandler
implements NMNodeLabelsHandler {
private NMDistributedNodeLabelsHandler(
- NodeLabelsProvider nodeLabelsProvider) {
+ NodeLabelsProvider nodeLabelsProvider, Configuration conf) {
this.nodeLabelsProvider = nodeLabelsProvider;
+ this.resyncInterval =
+ conf.getLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL,
+ YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL);
}
private final NodeLabelsProvider nodeLabelsProvider;
private Set previousNodeLabels;
- private boolean updatedLabelsSentToRM;
- private long lastNodeLabelSendFailMills = 0L;
- // TODO : Need to check which conf to use.Currently setting as 1 min
- private static final long FAILEDLABELRESENDINTERVAL = 60000;
+ private boolean areLabelsSentToRM;
+ private long lastNodeLabelSendMills = 0L;
+ private final long resyncInterval;
@Override
public Set getNodeLabelsForRegistration() {
@@ -987,22 +992,28 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// take some action only on modification of labels
boolean areNodeLabelsUpdated =
nodeLabelsForHeartbeat.size() != previousNodeLabels.size()
- || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat)
- || checkResendLabelOnFailure();
+ || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat);
- updatedLabelsSentToRM = false;
- if (areNodeLabelsUpdated) {
+ areLabelsSentToRM = false;
+ // When nodelabels elapsed or resync time is elapsed will send again in
+ // heartbeat.
+ if (areNodeLabelsUpdated || isResyncIntervalElapsed()) {
previousNodeLabels = nodeLabelsForHeartbeat;
try {
- LOG.info("Modified labels from provider: "
- + StringUtils.join(",", previousNodeLabels));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Labels from provider: "
+ + StringUtils.join(",", previousNodeLabels));
+ }
validateNodeLabels(nodeLabelsForHeartbeat);
- updatedLabelsSentToRM = true;
+ areLabelsSentToRM = true;
} catch (IOException e) {
// set previous node labels to invalid set, so that invalid
// labels are not verified for every HB, and send empty set
// to RM to have same nodeLabels which was earlier set.
nodeLabelsForHeartbeat = null;
+ } finally {
+ // Set last send time in heartbeat
+ lastNodeLabelSendMills = System.currentTimeMillis();
}
} else {
// if nodelabels have not changed then no need to send
@@ -1033,16 +1044,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
/*
- * In case of failure when RM doesnt accept labels need to resend Labels to
- * RM. This method checks whether we need to resend
+ * This method checks resync interval is elapsed or not.
*/
- public boolean checkResendLabelOnFailure() {
- if (lastNodeLabelSendFailMills > 0L) {
- long lastFailTimePassed =
- System.currentTimeMillis() - lastNodeLabelSendFailMills;
- if (lastFailTimePassed > FAILEDLABELRESENDINTERVAL) {
- return true;
- }
+ public boolean isResyncIntervalElapsed() {
+ long elapsedTimeSinceLastSync =
+ System.currentTimeMillis() - lastNodeLabelSendMills;
+ if (elapsedTimeSinceLastSync > resyncInterval) {
+ return true;
}
return false;
}
@@ -1050,15 +1058,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@Override
public void verifyRMHeartbeatResponseForNodeLabels(
NodeHeartbeatResponse response) {
- if (updatedLabelsSentToRM) {
- if (response.getAreNodeLabelsAcceptedByRM()) {
- lastNodeLabelSendFailMills = 0L;
- LOG.info("Node Labels {" + StringUtils.join(",", previousNodeLabels)
+ if (areLabelsSentToRM) {
+ if (response.getAreNodeLabelsAcceptedByRM() && LOG.isDebugEnabled()) {
+ LOG.debug("Node Labels {" + StringUtils.join(",", previousNodeLabels)
+ "} were Accepted by RM ");
} else {
// case where updated labels from NodeLabelsProvider is sent to RM and
// RM rejected the labels
- lastNodeLabelSendFailMills = System.currentTimeMillis();
LOG.error(
"NM node labels {" + StringUtils.join(",", previousNodeLabels)
+ "} were not accepted by RM and message from RM : "
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
index 099e4b42d82..b27d15b84d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
@@ -250,6 +250,7 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
};
YarnConfiguration conf = createNMConfigForDistributeNodeLabels();
+ conf.setLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL, 2000);
nm.init(conf);
resourceTracker.resetNMHeartbeatReceiveFlag();
nm.start();
@@ -288,7 +289,29 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
assertTrue("If provider sends null then empty labels should be sent",
resourceTracker.labels.isEmpty());
resourceTracker.resetNMHeartbeatReceiveFlag();
-
+ // Since the resync interval is set to 2 sec in every alternate heartbeat
+ // the labels will be send along with heartbeat.In loop we sleep for 1 sec
+ // so that every sec 1 heartbeat is send.
+ int nullLabels = 0;
+ int nonNullLabels = 0;
+ dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P1"));
+ for (int i = 0; i < 5; i++) {
+ nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
+ resourceTracker.waitTillHeartbeat();
+ if (null == resourceTracker.labels) {
+ nullLabels++;
+ } else {
+ Assert.assertEquals("In heartbeat PI labels should be send",
+ toNodeLabelSet("P1"), resourceTracker.labels);
+ nonNullLabels++;
+ }
+ resourceTracker.resetNMHeartbeatReceiveFlag();
+ Thread.sleep(1000);
+ }
+ Assert.assertTrue("More than one heartbeat with empty labels expected",
+ nullLabels > 1);
+ Assert.assertTrue("More than one heartbeat with labels expected",
+ nonNullLabels > 1);
nm.stop();
}