YARN-9011. Race condition during decommissioning. Contributed by Peter Bacsko

This commit is contained in:
Szilard Nemeth 2019-11-26 14:27:55 +01:00
parent fb2df5234d
commit 80a97e6ac2
4 changed files with 140 additions and 12 deletions

View File

@ -51,6 +51,8 @@ public class HostsFileReader {
.class);
private final AtomicReference<HostDetails> current;
private final AtomicReference<HostDetails> lazyLoaded =
new AtomicReference<>();
public HostsFileReader(String inFile,
String exFile) throws IOException {
@ -186,7 +188,18 @@ public class HostsFileReader {
public void refresh(String includesFile, String excludesFile)
throws IOException {
LOG.info("Refreshing hosts (include/exclude) list");
refreshInternal(includesFile, excludesFile, false);
}
public void lazyRefresh(String includesFile, String excludesFile)
throws IOException {
refreshInternal(includesFile, excludesFile, true);
}
private void refreshInternal(String includesFile, String excludesFile,
boolean lazy) throws IOException {
LOG.info("Refreshing hosts (include/exclude) list (lazy refresh = {})",
lazy);
HostDetails oldDetails = current.get();
Set<String> newIncludes = oldDetails.includes;
Map<String, Integer> newExcludes = oldDetails.excludes;
@ -202,8 +215,22 @@ public class HostsFileReader {
}
HostDetails newDetails = new HostDetails(includesFile, newIncludes,
excludesFile, newExcludes);
if (lazy) {
lazyLoaded.set(newDetails);
} else {
current.set(newDetails);
}
}
public void finishRefresh() {
if (lazyLoaded.get() == null) {
throw new IllegalStateException(
"Cannot finish refresh - call lazyRefresh() first");
}
current.set(lazyLoaded.get());
lazyLoaded.set(null);
}
@Private
public void refresh(InputStream inFileInputStream,
@ -278,6 +305,10 @@ public class HostsFileReader {
return current.get();
}
public HostDetails getLazyLoadedHostDetails() {
return lazyLoaded.get();
}
public void setIncludesFile(String includesFile) {
LOG.info("Setting the includes file to " + includesFile);
HostDetails oldDetails = current.get();

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.util;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.test.GenericTestUtils;
@ -347,4 +348,62 @@ public class TestHostsFileReader {
assertTrue(excludes.get("host5") == 1800);
assertTrue(excludes.get("host6") == 1800);
}
@Test
public void testLazyRefresh() throws IOException {
FileWriter efw = new FileWriter(excludesFile);
FileWriter ifw = new FileWriter(includesFile);
efw.write("host1\n");
efw.write("host2\n");
efw.close();
ifw.write("host3\n");
ifw.write("host4\n");
ifw.close();
HostsFileReader hfp = new HostsFileReader(includesFile, excludesFile);
ifw = new FileWriter(includesFile);
ifw.close();
efw = new FileWriter(excludesFile, true);
efw.write("host3\n");
efw.write("host4\n");
efw.close();
hfp.lazyRefresh(includesFile, excludesFile);
HostDetails details = hfp.getHostDetails();
HostDetails lazyDetails = hfp.getLazyLoadedHostDetails();
assertEquals("Details: no. of excluded hosts", 2,
details.getExcludedHosts().size());
assertEquals("Details: no. of included hosts", 2,
details.getIncludedHosts().size());
assertEquals("LazyDetails: no. of excluded hosts", 4,
lazyDetails.getExcludedHosts().size());
assertEquals("LayDetails: no. of included hosts", 0,
lazyDetails.getIncludedHosts().size());
hfp.finishRefresh();
details = hfp.getHostDetails();
assertEquals("Details: no. of excluded hosts", 4,
details.getExcludedHosts().size());
assertEquals("Details: no. of included hosts", 0,
details.getIncludedHosts().size());
assertNull("Lazy host details should be null",
hfp.getLazyLoadedHostDetails());
}
@Test(expected = IllegalStateException.class)
public void testFinishRefreshWithoutLazyRefresh() throws IOException {
FileWriter efw = new FileWriter(excludesFile);
FileWriter ifw = new FileWriter(includesFile);
efw.close();
ifw.close();
HostsFileReader hfp = new HostsFileReader(includesFile, excludesFile);
hfp.finishRefresh();
}
}

View File

@ -83,10 +83,12 @@ public class NodesListManager extends CompositeService implements
private Resolver resolver;
private Timer removalTimer;
private int nodeRemovalCheckInterval;
private Set<RMNode> gracefulDecommissionableNodes;
public NodesListManager(RMContext rmContext) {
super(NodesListManager.class.getName());
this.rmContext = rmContext;
this.gracefulDecommissionableNodes = ConcurrentHashMap.newKeySet();
}
@Override
@ -114,7 +116,7 @@ public class NodesListManager extends CompositeService implements
this.hostsReader =
createHostsFileReader(this.includesFile, this.excludesFile);
setDecomissionedNMs();
printConfiguredHosts();
printConfiguredHosts(false);
} catch (YarnException ex) {
disableHostsFileReader(ex);
} catch (IOException ioe) {
@ -186,7 +188,7 @@ public class NodesListManager extends CompositeService implements
removalTimer.cancel();
}
private void printConfiguredHosts() {
private void printConfiguredHosts(boolean graceful) {
if (!LOG.isDebugEnabled()) {
return;
}
@ -197,7 +199,12 @@ public class NodesListManager extends CompositeService implements
conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH));
HostDetails hostDetails = hostsReader.getHostDetails();
HostDetails hostDetails;
if (graceful) {
hostDetails = hostsReader.getLazyLoadedHostDetails();
} else {
hostDetails = hostsReader.getHostDetails();
}
for (String include : hostDetails.getIncludedHosts()) {
LOG.debug("include: " + include);
}
@ -234,8 +241,15 @@ public class NodesListManager extends CompositeService implements
yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
LOG.info("refreshNodes excludesFile " + excludesFile);
if (graceful) {
// update hosts, but don't make it visible just yet
hostsReader.lazyRefresh(includesFile, excludesFile);
} else {
hostsReader.refresh(includesFile, excludesFile);
printConfiguredHosts();
}
printConfiguredHosts(graceful);
LOG.info("hostsReader include:{" +
StringUtils.join(",", hostsReader.getHosts()) +
@ -269,7 +283,14 @@ public class NodesListManager extends CompositeService implements
// Nodes need to be decommissioned (graceful or forceful);
List<RMNode> nodesToDecom = new ArrayList<RMNode>();
HostDetails hostDetails = hostsReader.getHostDetails();
HostDetails hostDetails;
gracefulDecommissionableNodes.clear();
if (graceful) {
hostDetails = hostsReader.getLazyLoadedHostDetails();
} else {
hostDetails = hostsReader.getHostDetails();
}
Set<String> includes = hostDetails.getIncludedHosts();
Map<String, Integer> excludes = hostDetails.getExcludedMap();
@ -297,11 +318,13 @@ public class NodesListManager extends CompositeService implements
s != NodeState.DECOMMISSIONING) {
LOG.info("Gracefully decommission " + nodeStr);
nodesToDecom.add(n);
gracefulDecommissionableNodes.add(n);
} else if (s == NodeState.DECOMMISSIONING &&
!Objects.equals(n.getDecommissioningTimeout(),
timeoutToUse)) {
LOG.info("Update " + nodeStr + " timeout to be " + timeoutToUse);
nodesToDecom.add(n);
gracefulDecommissionableNodes.add(n);
} else {
LOG.info("No action for " + nodeStr);
}
@ -314,6 +337,10 @@ public class NodesListManager extends CompositeService implements
}
}
if (graceful) {
hostsReader.finishRefresh();
}
for (RMNode n : nodesToRecom) {
RMNodeEvent e = new RMNodeEvent(
n.getNodeID(), RMNodeEventType.RECOMMISSION);
@ -465,6 +492,10 @@ public class NodesListManager extends CompositeService implements
hostDetails.getExcludedHosts());
}
boolean isGracefullyDecommissionableNode(RMNode node) {
return gracefulDecommissionableNodes.contains(node);
}
private boolean isValidNode(
String hostName, Set<String> hostsList, Set<String> excludeList) {
String ip = resolver.resolve(hostName);

View File

@ -731,10 +731,17 @@ public class ResourceTrackerService extends AbstractService implements
*/
private boolean isNodeInDecommissioning(NodeId nodeId) {
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
if (rmNode != null &&
rmNode.getState().equals(NodeState.DECOMMISSIONING)) {
if (rmNode != null) {
NodeState state = rmNode.getState();
if (state == NodeState.DECOMMISSIONING ||
(state == NodeState.RUNNING &&
this.nodesListManager.isGracefullyDecommissionableNode(rmNode))) {
return true;
}
}
return false;
}