YARN-9011. Race condition during decommissioning. Contributed by Peter Bacsko
This commit is contained in:
parent
7f811722b7
commit
27642367ef
|
@ -52,6 +52,8 @@ public class HostsFileReader {
|
|||
.class);
|
||||
|
||||
private final AtomicReference<HostDetails> current;
|
||||
private final AtomicReference<HostDetails> lazyLoaded =
|
||||
new AtomicReference<>();
|
||||
|
||||
public HostsFileReader(String inFile,
|
||||
String exFile) throws IOException {
|
||||
|
@ -187,7 +189,18 @@ public class HostsFileReader {
|
|||
|
||||
public void refresh(String includesFile, String excludesFile)
|
||||
throws IOException {
|
||||
LOG.info("Refreshing hosts (include/exclude) list");
|
||||
refreshInternal(includesFile, excludesFile, false);
|
||||
}
|
||||
|
||||
public void lazyRefresh(String includesFile, String excludesFile)
|
||||
throws IOException {
|
||||
refreshInternal(includesFile, excludesFile, true);
|
||||
}
|
||||
|
||||
private void refreshInternal(String includesFile, String excludesFile,
|
||||
boolean lazy) throws IOException {
|
||||
LOG.info("Refreshing hosts (include/exclude) list (lazy refresh = {})",
|
||||
lazy);
|
||||
HostDetails oldDetails = current.get();
|
||||
Set<String> newIncludes = oldDetails.includes;
|
||||
Map<String, Integer> newExcludes = oldDetails.excludes;
|
||||
|
@ -203,8 +216,22 @@ public class HostsFileReader {
|
|||
}
|
||||
HostDetails newDetails = new HostDetails(includesFile, newIncludes,
|
||||
excludesFile, newExcludes);
|
||||
|
||||
if (lazy) {
|
||||
lazyLoaded.set(newDetails);
|
||||
} else {
|
||||
current.set(newDetails);
|
||||
}
|
||||
}
|
||||
|
||||
public void finishRefresh() {
|
||||
if (lazyLoaded.get() == null) {
|
||||
throw new IllegalStateException(
|
||||
"Cannot finish refresh - call lazyRefresh() first");
|
||||
}
|
||||
current.set(lazyLoaded.get());
|
||||
lazyLoaded.set(null);
|
||||
}
|
||||
|
||||
@Private
|
||||
public void refresh(InputStream inFileInputStream,
|
||||
|
@ -279,6 +306,10 @@ public class HostsFileReader {
|
|||
return current.get();
|
||||
}
|
||||
|
||||
public HostDetails getLazyLoadedHostDetails() {
|
||||
return lazyLoaded.get();
|
||||
}
|
||||
|
||||
public void setIncludesFile(String includesFile) {
|
||||
LOG.info("Setting the includes file to " + includesFile);
|
||||
HostDetails oldDetails = current.get();
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.util;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -347,4 +348,62 @@ public class TestHostsFileReader {
|
|||
assertTrue(excludes.get("host5") == 1800);
|
||||
assertTrue(excludes.get("host6") == 1800);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLazyRefresh() throws IOException {
|
||||
FileWriter efw = new FileWriter(excludesFile);
|
||||
FileWriter ifw = new FileWriter(includesFile);
|
||||
|
||||
efw.write("host1\n");
|
||||
efw.write("host2\n");
|
||||
efw.close();
|
||||
ifw.write("host3\n");
|
||||
ifw.write("host4\n");
|
||||
ifw.close();
|
||||
|
||||
HostsFileReader hfp = new HostsFileReader(includesFile, excludesFile);
|
||||
|
||||
ifw = new FileWriter(includesFile);
|
||||
ifw.close();
|
||||
|
||||
efw = new FileWriter(excludesFile, true);
|
||||
efw.write("host3\n");
|
||||
efw.write("host4\n");
|
||||
efw.close();
|
||||
|
||||
hfp.lazyRefresh(includesFile, excludesFile);
|
||||
|
||||
HostDetails details = hfp.getHostDetails();
|
||||
HostDetails lazyDetails = hfp.getLazyLoadedHostDetails();
|
||||
|
||||
assertEquals("Details: no. of excluded hosts", 2,
|
||||
details.getExcludedHosts().size());
|
||||
assertEquals("Details: no. of included hosts", 2,
|
||||
details.getIncludedHosts().size());
|
||||
assertEquals("LazyDetails: no. of excluded hosts", 4,
|
||||
lazyDetails.getExcludedHosts().size());
|
||||
assertEquals("LayDetails: no. of included hosts", 0,
|
||||
lazyDetails.getIncludedHosts().size());
|
||||
|
||||
hfp.finishRefresh();
|
||||
|
||||
details = hfp.getHostDetails();
|
||||
assertEquals("Details: no. of excluded hosts", 4,
|
||||
details.getExcludedHosts().size());
|
||||
assertEquals("Details: no. of included hosts", 0,
|
||||
details.getIncludedHosts().size());
|
||||
assertNull("Lazy host details should be null",
|
||||
hfp.getLazyLoadedHostDetails());
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testFinishRefreshWithoutLazyRefresh() throws IOException {
|
||||
FileWriter efw = new FileWriter(excludesFile);
|
||||
FileWriter ifw = new FileWriter(includesFile);
|
||||
efw.close();
|
||||
ifw.close();
|
||||
|
||||
HostsFileReader hfp = new HostsFileReader(includesFile, excludesFile);
|
||||
hfp.finishRefresh();
|
||||
}
|
||||
}
|
|
@ -84,10 +84,12 @@ public class NodesListManager extends CompositeService implements
|
|||
private Resolver resolver;
|
||||
private Timer removalTimer;
|
||||
private int nodeRemovalCheckInterval;
|
||||
private Set<RMNode> gracefulDecommissionableNodes;
|
||||
|
||||
public NodesListManager(RMContext rmContext) {
|
||||
super(NodesListManager.class.getName());
|
||||
this.rmContext = rmContext;
|
||||
this.gracefulDecommissionableNodes = ConcurrentHashMap.newKeySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -115,7 +117,7 @@ public class NodesListManager extends CompositeService implements
|
|||
this.hostsReader =
|
||||
createHostsFileReader(this.includesFile, this.excludesFile);
|
||||
setDecommissionedNMs();
|
||||
printConfiguredHosts();
|
||||
printConfiguredHosts(false);
|
||||
} catch (YarnException ex) {
|
||||
disableHostsFileReader(ex);
|
||||
} catch (IOException ioe) {
|
||||
|
@ -187,7 +189,7 @@ public class NodesListManager extends CompositeService implements
|
|||
removalTimer.cancel();
|
||||
}
|
||||
|
||||
private void printConfiguredHosts() {
|
||||
private void printConfiguredHosts(boolean graceful) {
|
||||
if (!LOG.isDebugEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
@ -198,7 +200,12 @@ public class NodesListManager extends CompositeService implements
|
|||
conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH));
|
||||
|
||||
HostDetails hostDetails = hostsReader.getHostDetails();
|
||||
HostDetails hostDetails;
|
||||
if (graceful) {
|
||||
hostDetails = hostsReader.getLazyLoadedHostDetails();
|
||||
} else {
|
||||
hostDetails = hostsReader.getHostDetails();
|
||||
}
|
||||
for (String include : hostDetails.getIncludedHosts()) {
|
||||
LOG.debug("include: " + include);
|
||||
}
|
||||
|
@ -235,8 +242,15 @@ public class NodesListManager extends CompositeService implements
|
|||
yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
|
||||
LOG.info("refreshNodes excludesFile " + excludesFile);
|
||||
|
||||
if (graceful) {
|
||||
// update hosts, but don't make it visible just yet
|
||||
hostsReader.lazyRefresh(includesFile, excludesFile);
|
||||
} else {
|
||||
hostsReader.refresh(includesFile, excludesFile);
|
||||
printConfiguredHosts();
|
||||
}
|
||||
|
||||
printConfiguredHosts(graceful);
|
||||
|
||||
LOG.info("hostsReader include:{" +
|
||||
StringUtils.join(",", hostsReader.getHosts()) +
|
||||
|
@ -270,7 +284,14 @@ public class NodesListManager extends CompositeService implements
|
|||
// Nodes need to be decommissioned (graceful or forceful);
|
||||
List<RMNode> nodesToDecom = new ArrayList<RMNode>();
|
||||
|
||||
HostDetails hostDetails = hostsReader.getHostDetails();
|
||||
HostDetails hostDetails;
|
||||
gracefulDecommissionableNodes.clear();
|
||||
if (graceful) {
|
||||
hostDetails = hostsReader.getLazyLoadedHostDetails();
|
||||
} else {
|
||||
hostDetails = hostsReader.getHostDetails();
|
||||
}
|
||||
|
||||
Set<String> includes = hostDetails.getIncludedHosts();
|
||||
Map<String, Integer> excludes = hostDetails.getExcludedMap();
|
||||
|
||||
|
@ -298,11 +319,13 @@ public class NodesListManager extends CompositeService implements
|
|||
s != NodeState.DECOMMISSIONING) {
|
||||
LOG.info("Gracefully decommission " + nodeStr);
|
||||
nodesToDecom.add(n);
|
||||
gracefulDecommissionableNodes.add(n);
|
||||
} else if (s == NodeState.DECOMMISSIONING &&
|
||||
!Objects.equals(n.getDecommissioningTimeout(),
|
||||
timeoutToUse)) {
|
||||
LOG.info("Update " + nodeStr + " timeout to be " + timeoutToUse);
|
||||
nodesToDecom.add(n);
|
||||
gracefulDecommissionableNodes.add(n);
|
||||
} else {
|
||||
LOG.info("No action for " + nodeStr);
|
||||
}
|
||||
|
@ -315,6 +338,10 @@ public class NodesListManager extends CompositeService implements
|
|||
}
|
||||
}
|
||||
|
||||
if (graceful) {
|
||||
hostsReader.finishRefresh();
|
||||
}
|
||||
|
||||
for (RMNode n : nodesToRecom) {
|
||||
RMNodeEvent e = new RMNodeEvent(
|
||||
n.getNodeID(), RMNodeEventType.RECOMMISSION);
|
||||
|
@ -466,6 +493,10 @@ public class NodesListManager extends CompositeService implements
|
|||
hostDetails.getExcludedHosts());
|
||||
}
|
||||
|
||||
boolean isGracefullyDecommissionableNode(RMNode node) {
|
||||
return gracefulDecommissionableNodes.contains(node);
|
||||
}
|
||||
|
||||
private boolean isValidNode(
|
||||
String hostName, Set<String> hostsList, Set<String> excludeList) {
|
||||
String ip = resolver.resolve(hostName);
|
||||
|
|
|
@ -836,10 +836,17 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
*/
|
||||
private boolean isNodeInDecommissioning(NodeId nodeId) {
|
||||
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
|
||||
if (rmNode != null &&
|
||||
rmNode.getState().equals(NodeState.DECOMMISSIONING)) {
|
||||
|
||||
if (rmNode != null) {
|
||||
NodeState state = rmNode.getState();
|
||||
|
||||
if (state == NodeState.DECOMMISSIONING ||
|
||||
(state == NodeState.RUNNING &&
|
||||
this.nodesListManager.isGracefullyDecommissionableNode(rmNode))) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue