YARN-7162. Remove XML excludes file format (rkanter)
commit d3d32d3580
parent 9f6b08f840
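Summary of the change: HostsFileReader no longer accepts an excludes (decommission) file in XML format. Previously, an excludes file whose name ended in ".xml" was parsed for <host> entries with an optional per-host decommission timeout, as in the examples carried in the removed code below:

    <host><name>host1</name></host>
    <host><name>host2</name><timeout>123</timeout></host>
    <host><name>host3</name><timeout>-1</timeout></host>
    <host><name>host4, host5,host6</name><timeout>1800</timeout></host>

After this commit both the includes and excludes files are read only as plain-text host lists (readFileToSet), and HostDetails tracks excluded hosts as a Set<String> rather than a Map<String, Integer> of host to timeout.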
@@ -18,29 +18,23 @@
 
 package org.apache.hadoop.util;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Set;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.xml.sax.SAXException;
 
 // Keeps track of which datanodes/nodemanagers are allowed to connect to the
 // namenode/resourcemanager.
@@ -56,7 +50,7 @@ public class HostsFileReader {
       String exFile) throws IOException {
     HostDetails hostDetails = new HostDetails(
         inFile, Collections.emptySet(),
-        exFile, Collections.emptyMap());
+        exFile, Collections.emptySet());
     current = new AtomicReference<>(hostDetails);
     refresh(inFile, exFile);
   }
@@ -66,7 +60,7 @@ public class HostsFileReader {
       String excludesFile, InputStream exFileInputStream) throws IOException {
     HostDetails hostDetails = new HostDetails(
         includesFile, Collections.emptySet(),
-        excludesFile, Collections.emptyMap());
+        excludesFile, Collections.emptySet());
     current = new AtomicReference<>(hostDetails);
     refresh(inFileInputStream, exFileInputStream);
   }
@@ -117,88 +111,21 @@ public class HostsFileReader {
     refresh(hostDetails.includesFile, hostDetails.excludesFile);
   }
 
-  public static void readFileToMap(String type,
-      String filename, Map<String, Integer> map) throws IOException {
-    File file = new File(filename);
-    FileInputStream fis = new FileInputStream(file);
-    readFileToMapWithFileInputStream(type, filename, fis, map);
-  }
-
-  public static void readFileToMapWithFileInputStream(String type,
-      String filename, InputStream inputStream, Map<String, Integer> map)
-          throws IOException {
-    // The input file could be either simple text or XML.
-    boolean xmlInput = filename.toLowerCase().endsWith(".xml");
-    if (xmlInput) {
-      readXmlFileToMapWithFileInputStream(type, filename, inputStream, map);
-    } else {
-      HashSet<String> nodes = new HashSet<String>();
-      readFileToSetWithFileInputStream(type, filename, inputStream, nodes);
-      for (String node : nodes) {
-        map.put(node, null);
-      }
-    }
-  }
-
-  public static void readXmlFileToMapWithFileInputStream(String type,
-      String filename, InputStream fileInputStream, Map<String, Integer> map)
-          throws IOException {
-    Document dom;
-    DocumentBuilderFactory builder = DocumentBuilderFactory.newInstance();
-    try {
-      DocumentBuilder db = builder.newDocumentBuilder();
-      dom = db.parse(fileInputStream);
-      // Examples:
-      // <host><name>host1</name></host>
-      // <host><name>host2</name><timeout>123</timeout></host>
-      // <host><name>host3</name><timeout>-1</timeout></host>
-      // <host><name>host4, host5,host6</name><timeout>1800</timeout></host>
-      Element doc = dom.getDocumentElement();
-      NodeList nodes = doc.getElementsByTagName("host");
-      for (int i = 0; i < nodes.getLength(); i++) {
-        Node node = nodes.item(i);
-        if (node.getNodeType() == Node.ELEMENT_NODE) {
-          Element e= (Element) node;
-          // Support both single host and comma-separated list of hosts.
-          String v = readFirstTagValue(e, "name");
-          String[] hosts = StringUtils.getTrimmedStrings(v);
-          String str = readFirstTagValue(e, "timeout");
-          Integer timeout = (str == null)? null : Integer.parseInt(str);
-          for (String host : hosts) {
-            map.put(host, timeout);
-            LOG.info("Adding a node \"" + host + "\" to the list of "
-                + type + " hosts from " + filename);
-          }
-        }
-      }
-    } catch (IOException|SAXException|ParserConfigurationException e) {
-      LOG.error("error parsing " + filename, e);
-      throw new RuntimeException(e);
-    } finally {
-      fileInputStream.close();
-    }
-  }
-
-  static String readFirstTagValue(Element e, String tag) {
-    NodeList nodes = e.getElementsByTagName(tag);
-    return (nodes.getLength() == 0)? null : nodes.item(0).getTextContent();
-  }
-
   public void refresh(String includesFile, String excludesFile)
       throws IOException {
     LOG.info("Refreshing hosts (include/exclude) list");
     HostDetails oldDetails = current.get();
     Set<String> newIncludes = oldDetails.includes;
-    Map<String, Integer> newExcludes = oldDetails.excludes;
+    Set<String> newExcludes = oldDetails.excludes;
     if (includesFile != null && !includesFile.isEmpty()) {
       newIncludes = new HashSet<>();
       readFileToSet("included", includesFile, newIncludes);
       newIncludes = Collections.unmodifiableSet(newIncludes);
     }
     if (excludesFile != null && !excludesFile.isEmpty()) {
-      newExcludes = new HashMap<>();
-      readFileToMap("excluded", excludesFile, newExcludes);
-      newExcludes = Collections.unmodifiableMap(newExcludes);
+      newExcludes = new HashSet<>();
+      readFileToSet("excluded", excludesFile, newExcludes);
+      newExcludes = Collections.unmodifiableSet(newExcludes);
     }
     HostDetails newDetails = new HostDetails(includesFile, newIncludes,
         excludesFile, newExcludes);
@@ -211,7 +138,7 @@ public class HostsFileReader {
     LOG.info("Refreshing hosts (include/exclude) list");
     HostDetails oldDetails = current.get();
     Set<String> newIncludes = oldDetails.includes;
-    Map<String, Integer> newExcludes = oldDetails.excludes;
+    Set<String> newExcludes = oldDetails.excludes;
     if (inFileInputStream != null) {
       newIncludes = new HashSet<>();
       readFileToSetWithFileInputStream("included", oldDetails.includesFile,
@@ -219,10 +146,10 @@ public class HostsFileReader {
       newIncludes = Collections.unmodifiableSet(newIncludes);
     }
     if (exFileInputStream != null) {
-      newExcludes = new HashMap<>();
-      readFileToMapWithFileInputStream("excluded", oldDetails.excludesFile,
+      newExcludes = new HashSet<>();
+      readFileToSetWithFileInputStream("excluded", oldDetails.excludesFile,
          exFileInputStream, newExcludes);
-      newExcludes = Collections.unmodifiableMap(newExcludes);
+      newExcludes = Collections.unmodifiableSet(newExcludes);
     }
     HostDetails newDetails = new HostDetails(
         oldDetails.includesFile, newIncludes,
@@ -254,21 +181,6 @@ public class HostsFileReader {
     excludes.addAll(hostDetails.getExcludedHosts());
   }
 
-  /**
-   * Retrieve an atomic view of the included and excluded hosts.
-   *
-   * @param includeHosts set to populate with included hosts
-   * @param excludeHosts map to populate with excluded hosts
-   * @deprecated use {@link #getHostDetails() instead}
-   */
-  @Deprecated
-  public void getHostDetails(Set<String> includeHosts,
-      Map<String, Integer> excludeHosts) {
-    HostDetails hostDetails = current.get();
-    includeHosts.addAll(hostDetails.getIncludedHosts());
-    excludeHosts.putAll(hostDetails.getExcludedMap());
-  }
-
   /**
    * Retrieve an atomic view of the included and excluded hosts.
    *
@@ -311,12 +223,10 @@ public class HostsFileReader {
     private final String includesFile;
     private final Set<String> includes;
     private final String excludesFile;
-    // exclude host list with optional timeout.
-    // If the value is null, it indicates default timeout.
-    private final Map<String, Integer> excludes;
+    private final Set<String> excludes;
 
     HostDetails(String includesFile, Set<String> includes,
-        String excludesFile, Map<String, Integer> excludes) {
+        String excludesFile, Set<String> excludes) {
       this.includesFile = includesFile;
      this.includes = includes;
      this.excludesFile = excludesFile;
@@ -336,10 +246,6 @@ public class HostsFileReader {
     }
 
     public Set<String> getExcludedHosts() {
-      return excludes.keySet();
-    }
-
-    public Map<String, Integer> getExcludedMap() {
       return excludes;
     }
   }
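For orientation, here is a minimal usage sketch of HostsFileReader as it looks after the hunks above. It uses only the constructor and methods visible in this diff; the class name and file paths are placeholders, and both files must already exist as plain-text host lists:

import java.io.IOException;
import java.util.Set;

import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.HostsFileReader.HostDetails;

public class HostsFileReaderSketch {
  public static void main(String[] args) throws IOException {
    // Placeholder paths; each file lists hosts as plain text.
    String includesFile = "/tmp/dfs.include";
    String excludesFile = "/tmp/dfs.exclude";

    HostsFileReader reader = new HostsFileReader(includesFile, excludesFile);

    // Atomic snapshot of both lists. Excluded hosts are now a plain
    // Set<String>; the former Map<String, Integer> host-to-timeout view
    // (getExcludedMap) is gone.
    HostDetails details = reader.getHostDetails();
    Set<String> included = details.getIncludedHosts();
    Set<String> excluded = details.getExcludedHosts();
    System.out.println("included=" + included + ", excluded=" + excluded);

    // Re-read both files after they change on disk.
    reader.refresh(includesFile, excludesFile);
  }
}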
@@ -20,7 +20,6 @@ package org.apache.hadoop.util;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileWriter;
-import java.util.Map;
 
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.HostsFileReader.HostDetails;
@@ -290,61 +289,4 @@ public class TestHostsFileReader {
     assertFalse(hfp.getExcludedHosts().contains("somehost5"));
 
   }
-
-  /*
-   * Test if timeout values are provided in HostFile
-   */
-  @Test
-  public void testHostFileReaderWithTimeout() throws Exception {
-    FileWriter efw = new FileWriter(excludesXmlFile);
-    FileWriter ifw = new FileWriter(includesFile);
-
-    efw.write("<?xml version=\"1.0\"?>\n");
-    efw.write("<!-- yarn.nodes.exclude -->\n");
-    efw.write("<hosts>\n");
-    efw.write("<host><name>host1</name></host>\n");
-    efw.write("<host><name>host2</name><timeout>123</timeout></host>\n");
-    efw.write("<host><name>host3</name><timeout>-1</timeout></host>\n");
-    efw.write("<host><name>10000</name></host>\n");
-    efw.write("<host><name>10001</name><timeout>123</timeout></host>\n");
-    efw.write("<host><name>10002</name><timeout>-1</timeout></host>\n");
-    efw.write("<host><name>host4,host5, host6</name>" +
-        "<timeout>1800</timeout></host>\n");
-    efw.write("</hosts>\n");
-    efw.close();
-
-    ifw.write("#Hosts-in-DFS\n");
-    ifw.write(" \n");
-    ifw.write(" somehost \t somehost2 \n somehost4");
-    ifw.write(" somehost3 \t # somehost5");
-    ifw.close();
-
-    HostsFileReader hfp = new HostsFileReader(includesFile, excludesXmlFile);
-
-    int includesLen = hfp.getHosts().size();
-    int excludesLen = hfp.getExcludedHosts().size();
-    assertEquals(4, includesLen);
-    assertEquals(9, excludesLen);
-
-    HostDetails hostDetails = hfp.getHostDetails();
-    Map<String, Integer> excludes = hostDetails.getExcludedMap();
-    assertTrue(excludes.containsKey("host1"));
-    assertTrue(excludes.containsKey("host2"));
-    assertTrue(excludes.containsKey("host3"));
-    assertTrue(excludes.containsKey("10000"));
-    assertTrue(excludes.containsKey("10001"));
-    assertTrue(excludes.containsKey("10002"));
-    assertTrue(excludes.containsKey("host4"));
-    assertTrue(excludes.containsKey("host5"));
-    assertTrue(excludes.containsKey("host6"));
-    assertTrue(excludes.get("host1") == null);
-    assertTrue(excludes.get("host2") == 123);
-    assertTrue(excludes.get("host3") == -1);
-    assertTrue(excludes.get("10000") == null);
-    assertTrue(excludes.get("10001") == 123);
-    assertTrue(excludes.get("10002") == -1);
-    assertTrue(excludes.get("host4") == 1800);
-    assertTrue(excludes.get("host5") == 1800);
-    assertTrue(excludes.get("host6") == 1800);
-  }
 }
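The XML-with-timeout test above is the only coverage removed; the plain-text path keeps its existing tests. For illustration only, a minimal sketch of an equivalent plain-text exclude test against the Set-based API visible in this diff; the temp-file setup, test name, and expected counts are assumptions (it relies on the whitespace-splitting behavior shown by the existing includes-file tests), not part of this commit:

  @Test
  public void testHostsFileReaderPlainTextExcludes() throws Exception {
    // Hypothetical plain-text host lists written to temp files.
    File includes = File.createTempFile("dfs.include", ".txt");
    File excludes = File.createTempFile("dfs.exclude", ".txt");
    includes.deleteOnExit();
    excludes.deleteOnExit();
    try (FileWriter ifw = new FileWriter(includes);
         FileWriter efw = new FileWriter(excludes)) {
      ifw.write("somehost somehost2\n");
      efw.write("host1\n");
      efw.write("host2 host3\n");
    }

    HostsFileReader hfp =
        new HostsFileReader(includes.getPath(), excludes.getPath());

    // Excluded hosts are now a flat Set<String>; no per-host timeouts.
    assertEquals(2, hfp.getHosts().size());
    assertEquals(3, hfp.getExcludedHosts().size());
    assertTrue(hfp.getExcludedHosts().contains("host1"));
  }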
@@ -261,14 +261,14 @@ public class NodesListManager extends CompositeService implements
 
     HostDetails hostDetails = hostsReader.getHostDetails();
     Set<String> includes = hostDetails.getIncludedHosts();
-    Map<String, Integer> excludes = hostDetails.getExcludedMap();
+    Set<String> excludes = hostDetails.getExcludedHosts();
 
     for (RMNode n : this.rmContext.getRMNodes().values()) {
       NodeState s = n.getState();
       // An invalid node (either due to explicit exclude or not include)
       // should be excluded.
       boolean isExcluded = !isValidNode(
-          n.getHostName(), includes, excludes.keySet());
+          n.getHostName(), includes, excludes);
       String nodeStr = "node " + n.getNodeID() + " with state " + s;
       if (!isExcluded) {
         // Note that no action is needed for DECOMMISSIONED node.
@@ -280,17 +280,14 @@ public class NodesListManager extends CompositeService implements
       } else {
         // exclude is true.
         if (graceful) {
-          // Use per node timeout if exist otherwise the request timeout.
-          Integer timeoutToUse = (excludes.get(n.getHostName()) != null)?
-              excludes.get(n.getHostName()) : timeout;
           if (s != NodeState.DECOMMISSIONED &&
               s != NodeState.DECOMMISSIONING) {
             LOG.info("Gracefully decommission " + nodeStr);
             nodesToDecom.add(n);
           } else if (s == NodeState.DECOMMISSIONING &&
               !Objects.equals(n.getDecommissioningTimeout(),
-                  timeoutToUse)) {
-            LOG.info("Update " + nodeStr + " timeout to be " + timeoutToUse);
+                  timeout)) {
+            LOG.info("Update " + nodeStr + " timeout to be " + timeout);
             nodesToDecom.add(n);
           } else {
             LOG.info("No action for " + nodeStr);
@@ -313,9 +310,7 @@ public class NodesListManager extends CompositeService implements
     for (RMNode n : nodesToDecom) {
       RMNodeEvent e;
       if (graceful) {
-        Integer timeoutToUse = (excludes.get(n.getHostName()) != null)?
-            excludes.get(n.getHostName()) : timeout;
-        e = new RMNodeDecommissioningEvent(n.getNodeID(), timeoutToUse);
+        e = new RMNodeDecommissioningEvent(n.getNodeID(), timeout);
       } else {
         RMNodeEventType eventType = isUntrackedNode(n.getHostName())?
             RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
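Net effect in NodesListManager: per-node decommission timeouts could only be supplied through the XML excludes format, so with that format gone the excludes view is a plain Set<String> and, as the hunks above show, graceful decommission applies the single timeout passed in with the refresh request to every node instead of consulting excludes.get(hostname).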