YARN-4176. Resync NM nodelabels with RM periodically for distributed nodelabels. (Bibin A Chundatt via wangda)

This commit is contained in:
Wangda Tan 2015-10-05 15:46:35 -07:00
parent b59e434116
commit 30ac69c6bd
5 changed files with 77 additions and 30 deletions

View File

@ -486,6 +486,9 @@ Release 2.8.0 - UNRELEASED
YARN-4095. Avoid sharing AllocatorPerContext object in LocalDirAllocator
between ShuffleHandler and LocalDirsHandlerService. (Zhihai Xu via jlowe)
YARN-4176. Resync NM nodelabels with RM periodically for distributed nodelabels.
(Bibin A Chundatt via wangda)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -2019,6 +2019,12 @@ public class YarnConfiguration extends Configuration {
private static final String NM_NODE_LABELS_PROVIDER_PREFIX =
NM_NODE_LABELS_PREFIX + "provider.";
public static final String NM_NODE_LABELS_RESYNC_INTERVAL =
NM_NODE_LABELS_PREFIX + "resync-interval-ms";
public static final long DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL =
2 * 60 * 1000;
// If -1 is configured then no timer task should be created
public static final String NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms";

View File

@ -2137,6 +2137,15 @@
<value>600000</value>
</property>
<property>
<description>
Interval at which node labels syncs with RM from NM.Will send loaded labels
every x intervals configured along with heartbeat from NM to RM.
</description>
<name>yarn.nodemanager.node-labels.resync-interval-ms</name>
<value>120000</value>
</property>
<property>
<description>
When node labels "yarn.nodemanager.node-labels.provider"

View File

@ -139,6 +139,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
private NMNodeLabelsHandler nodeLabelsHandler;
private final NodeLabelsProvider nodeLabelsProvider;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@ -150,9 +151,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
NodeLabelsProvider nodeLabelsProvider) {
super(NodeStatusUpdaterImpl.class.getName());
this.healthChecker = healthChecker;
nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
this.context = context;
this.dispatcher = dispatcher;
this.nodeLabelsProvider = nodeLabelsProvider;
this.metrics = metrics;
this.recentlyStoppedContainers = new LinkedHashMap<ContainerId, Long>();
this.pendingCompletedContainers =
@ -184,7 +185,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
this.minimumResourceManagerVersion = conf.get(
YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
// Default duration to track stopped containers on nodemanager is 10Min.
// This should not be assigned very large value as it will remember all the
// containers stopped during that time.
@ -871,7 +873,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
if (nodeLabelsProvider == null) {
return new NMCentralizedNodeLabelsHandler();
} else {
return new NMDistributedNodeLabelsHandler(nodeLabelsProvider);
return new NMDistributedNodeLabelsHandler(nodeLabelsProvider,
this.getConfig());
}
}
@ -936,16 +939,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private static class NMDistributedNodeLabelsHandler
implements NMNodeLabelsHandler {
private NMDistributedNodeLabelsHandler(
NodeLabelsProvider nodeLabelsProvider) {
NodeLabelsProvider nodeLabelsProvider, Configuration conf) {
this.nodeLabelsProvider = nodeLabelsProvider;
this.resyncInterval =
conf.getLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL,
YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL);
}
private final NodeLabelsProvider nodeLabelsProvider;
private Set<NodeLabel> previousNodeLabels;
private boolean updatedLabelsSentToRM;
private long lastNodeLabelSendFailMills = 0L;
// TODO : Need to check which conf to use.Currently setting as 1 min
private static final long FAILEDLABELRESENDINTERVAL = 60000;
private boolean areLabelsSentToRM;
private long lastNodeLabelSendMills = 0L;
private final long resyncInterval;
@Override
public Set<NodeLabel> getNodeLabelsForRegistration() {
@ -987,22 +992,28 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// take some action only on modification of labels
boolean areNodeLabelsUpdated =
nodeLabelsForHeartbeat.size() != previousNodeLabels.size()
|| !previousNodeLabels.containsAll(nodeLabelsForHeartbeat)
|| checkResendLabelOnFailure();
|| !previousNodeLabels.containsAll(nodeLabelsForHeartbeat);
updatedLabelsSentToRM = false;
if (areNodeLabelsUpdated) {
areLabelsSentToRM = false;
// When nodelabels elapsed or resync time is elapsed will send again in
// heartbeat.
if (areNodeLabelsUpdated || isResyncIntervalElapsed()) {
previousNodeLabels = nodeLabelsForHeartbeat;
try {
LOG.info("Modified labels from provider: "
+ StringUtils.join(",", previousNodeLabels));
if (LOG.isDebugEnabled()) {
LOG.debug("Labels from provider: "
+ StringUtils.join(",", previousNodeLabels));
}
validateNodeLabels(nodeLabelsForHeartbeat);
updatedLabelsSentToRM = true;
areLabelsSentToRM = true;
} catch (IOException e) {
// set previous node labels to invalid set, so that invalid
// labels are not verified for every HB, and send empty set
// to RM to have same nodeLabels which was earlier set.
nodeLabelsForHeartbeat = null;
} finally {
// Set last send time in heartbeat
lastNodeLabelSendMills = System.currentTimeMillis();
}
} else {
// if nodelabels have not changed then no need to send
@ -1033,16 +1044,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
/*
* In case of failure when RM doesnt accept labels need to resend Labels to
* RM. This method checks whether we need to resend
* This method checks resync interval is elapsed or not.
*/
public boolean checkResendLabelOnFailure() {
if (lastNodeLabelSendFailMills > 0L) {
long lastFailTimePassed =
System.currentTimeMillis() - lastNodeLabelSendFailMills;
if (lastFailTimePassed > FAILEDLABELRESENDINTERVAL) {
return true;
}
public boolean isResyncIntervalElapsed() {
long elapsedTimeSinceLastSync =
System.currentTimeMillis() - lastNodeLabelSendMills;
if (elapsedTimeSinceLastSync > resyncInterval) {
return true;
}
return false;
}
@ -1050,15 +1058,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@Override
public void verifyRMHeartbeatResponseForNodeLabels(
NodeHeartbeatResponse response) {
if (updatedLabelsSentToRM) {
if (response.getAreNodeLabelsAcceptedByRM()) {
lastNodeLabelSendFailMills = 0L;
LOG.info("Node Labels {" + StringUtils.join(",", previousNodeLabels)
if (areLabelsSentToRM) {
if (response.getAreNodeLabelsAcceptedByRM() && LOG.isDebugEnabled()) {
LOG.debug("Node Labels {" + StringUtils.join(",", previousNodeLabels)
+ "} were Accepted by RM ");
} else {
// case where updated labels from NodeLabelsProvider is sent to RM and
// RM rejected the labels
lastNodeLabelSendFailMills = System.currentTimeMillis();
LOG.error(
"NM node labels {" + StringUtils.join(",", previousNodeLabels)
+ "} were not accepted by RM and message from RM : "

View File

@ -250,6 +250,7 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
};
YarnConfiguration conf = createNMConfigForDistributeNodeLabels();
conf.setLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL, 2000);
nm.init(conf);
resourceTracker.resetNMHeartbeatReceiveFlag();
nm.start();
@ -288,7 +289,29 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
assertTrue("If provider sends null then empty labels should be sent",
resourceTracker.labels.isEmpty());
resourceTracker.resetNMHeartbeatReceiveFlag();
// Since the resync interval is set to 2 sec in every alternate heartbeat
// the labels will be send along with heartbeat.In loop we sleep for 1 sec
// so that every sec 1 heartbeat is send.
int nullLabels = 0;
int nonNullLabels = 0;
dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P1"));
for (int i = 0; i < 5; i++) {
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
if (null == resourceTracker.labels) {
nullLabels++;
} else {
Assert.assertEquals("In heartbeat PI labels should be send",
toNodeLabelSet("P1"), resourceTracker.labels);
nonNullLabels++;
}
resourceTracker.resetNMHeartbeatReceiveFlag();
Thread.sleep(1000);
}
Assert.assertTrue("More than one heartbeat with empty labels expected",
nullLabels > 1);
Assert.assertTrue("More than one heartbeat with labels expected",
nonNullLabels > 1);
nm.stop();
}