HADOOP-14412. HostsFileReader#getHostDetails is very expensive on large clusters. Contributed by Jason Lowe.

This commit is contained in:
Rohith Sharma K S 2017-05-17 08:26:17 +05:30
parent 95e8f4c80d
commit c61956dd1e
4 changed files with 139 additions and 136 deletions

View File

@ -20,11 +20,10 @@ package org.apache.hadoop.util;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
@ -37,37 +36,26 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Unstable
public class HostsFileReader {
private Set<String> includes;
private Set<String> excludes;
private String includesFile;
private String excludesFile;
private WriteLock writeLock;
private ReadLock readLock;
private static final Log LOG = LogFactory.getLog(HostsFileReader.class);
public HostsFileReader(String inFile,
private final AtomicReference<HostDetails> current;
public HostsFileReader(String inFile,
String exFile) throws IOException {
includes = new HashSet<String>();
excludes = new HashSet<String>();
includesFile = inFile;
excludesFile = exFile;
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.writeLock = rwLock.writeLock();
this.readLock = rwLock.readLock();
refresh();
HostDetails hostDetails = new HostDetails(
inFile, Collections.<String>emptySet(),
exFile, Collections.<String>emptySet());
current = new AtomicReference<>(hostDetails);
refresh(inFile, exFile);
}
@Private
public HostsFileReader(String includesFile, InputStream inFileInputStream,
String excludesFile, InputStream exFileInputStream) throws IOException {
includes = new HashSet<String>();
excludes = new HashSet<String>();
this.includesFile = includesFile;
this.excludesFile = excludesFile;
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.writeLock = rwLock.writeLock();
this.readLock = rwLock.readLock();
HostDetails hostDetails = new HostDetails(
includesFile, Collections.<String>emptySet(),
excludesFile, Collections.<String>emptySet());
current = new AtomicReference<>(hostDetails);
refresh(inFileInputStream, exFileInputStream);
}
@ -113,125 +101,146 @@ public class HostsFileReader {
}
public void refresh() throws IOException {
this.writeLock.lock();
try {
refresh(includesFile, excludesFile);
} finally {
this.writeLock.unlock();
}
HostDetails hostDetails = current.get();
refresh(hostDetails.includesFile, hostDetails.excludesFile);
}
public void refresh(String includeFiles, String excludeFiles)
public void refresh(String includesFile, String excludesFile)
throws IOException {
LOG.info("Refreshing hosts (include/exclude) list");
this.writeLock.lock();
try {
// update instance variables
updateFileNames(includeFiles, excludeFiles);
Set<String> newIncludes = new HashSet<String>();
Set<String> newExcludes = new HashSet<String>();
boolean switchIncludes = false;
boolean switchExcludes = false;
if (includeFiles != null && !includeFiles.isEmpty()) {
readFileToSet("included", includeFiles, newIncludes);
switchIncludes = true;
}
if (excludeFiles != null && !excludeFiles.isEmpty()) {
readFileToSet("excluded", excludeFiles, newExcludes);
switchExcludes = true;
}
if (switchIncludes) {
// switch the new hosts that are to be included
includes = newIncludes;
}
if (switchExcludes) {
// switch the excluded hosts
excludes = newExcludes;
}
} finally {
this.writeLock.unlock();
HostDetails oldDetails = current.get();
Set<String> newIncludes = oldDetails.includes;
Set<String> newExcludes = oldDetails.excludes;
if (includesFile != null && !includesFile.isEmpty()) {
newIncludes = new HashSet<>();
readFileToSet("included", includesFile, newIncludes);
newIncludes = Collections.unmodifiableSet(newIncludes);
}
if (excludesFile != null && !excludesFile.isEmpty()) {
newExcludes = new HashSet<>();
readFileToSet("excluded", excludesFile, newExcludes);
newExcludes = Collections.unmodifiableSet(newExcludes);
}
HostDetails newDetails = new HostDetails(includesFile, newIncludes,
excludesFile, newExcludes);
current.set(newDetails);
}
@Private
public void refresh(InputStream inFileInputStream,
InputStream exFileInputStream) throws IOException {
LOG.info("Refreshing hosts (include/exclude) list");
this.writeLock.lock();
try {
Set<String> newIncludes = new HashSet<String>();
Set<String> newExcludes = new HashSet<String>();
boolean switchIncludes = false;
boolean switchExcludes = false;
if (inFileInputStream != null) {
readFileToSetWithFileInputStream("included", includesFile,
inFileInputStream, newIncludes);
switchIncludes = true;
}
if (exFileInputStream != null) {
readFileToSetWithFileInputStream("excluded", excludesFile,
exFileInputStream, newExcludes);
switchExcludes = true;
}
if (switchIncludes) {
// switch the new hosts that are to be included
includes = newIncludes;
}
if (switchExcludes) {
// switch the excluded hosts
excludes = newExcludes;
}
} finally {
this.writeLock.unlock();
HostDetails oldDetails = current.get();
Set<String> newIncludes = oldDetails.includes;
Set<String> newExcludes = oldDetails.excludes;
if (inFileInputStream != null) {
newIncludes = new HashSet<>();
readFileToSetWithFileInputStream("included", oldDetails.includesFile,
inFileInputStream, newIncludes);
newIncludes = Collections.unmodifiableSet(newIncludes);
}
if (exFileInputStream != null) {
newExcludes = new HashSet<>();
readFileToSetWithFileInputStream("excluded", oldDetails.excludesFile,
exFileInputStream, newExcludes);
newExcludes = Collections.unmodifiableSet(newExcludes);
}
HostDetails newDetails = new HostDetails(
oldDetails.includesFile, newIncludes,
oldDetails.excludesFile, newExcludes);
current.set(newDetails);
}
public Set<String> getHosts() {
this.readLock.lock();
try {
return includes;
} finally {
this.readLock.unlock();
}
HostDetails hostDetails = current.get();
return hostDetails.getIncludedHosts();
}
public Set<String> getExcludedHosts() {
this.readLock.lock();
try {
return excludes;
} finally {
this.readLock.unlock();
}
HostDetails hostDetails = current.get();
return hostDetails.getExcludedHosts();
}
/**
* Retrieve an atomic view of the included and excluded hosts.
*
* @param includes set to populate with included hosts
* @param excludes set to populate with excluded hosts
* @deprecated use {@link #getHostDetails()} instead
*/
@Deprecated
public void getHostDetails(Set<String> includes, Set<String> excludes) {
this.readLock.lock();
try {
includes.addAll(this.includes);
excludes.addAll(this.excludes);
} finally {
this.readLock.unlock();
}
HostDetails hostDetails = current.get();
includes.addAll(hostDetails.getIncludedHosts());
excludes.addAll(hostDetails.getExcludedHosts());
}
/**
 * Returns the {@link HostDetails} object currently held by this reader.
 *
 * <p>The included and excluded hosts are obtained with a single read of
 * the current reference, so both sets come from the same object and
 * neither set is copied.
 *
 * @return the included and excluded hosts
 */
public HostDetails getHostDetails() {
  final HostDetails snapshot = current.get();
  return snapshot;
}
public void setIncludesFile(String includesFile) {
LOG.info("Setting the includes file to " + includesFile);
this.includesFile = includesFile;
HostDetails oldDetails = current.get();
HostDetails newDetails = new HostDetails(includesFile, oldDetails.includes,
oldDetails.excludesFile, oldDetails.excludes);
current.set(newDetails);
}
public void setExcludesFile(String excludesFile) {
LOG.info("Setting the excludes file to " + excludesFile);
this.excludesFile = excludesFile;
HostDetails oldDetails = current.get();
HostDetails newDetails = new HostDetails(
oldDetails.includesFile, oldDetails.includes,
excludesFile, oldDetails.excludes);
current.set(newDetails);
}
public void updateFileNames(String includeFiles, String excludeFiles) {
this.writeLock.lock();
try {
setIncludesFile(includeFiles);
setExcludesFile(excludeFiles);
} finally {
this.writeLock.unlock();
public void updateFileNames(String includesFile, String excludesFile) {
LOG.info("Setting the includes file to " + includesFile);
LOG.info("Setting the excludes file to " + excludesFile);
HostDetails oldDetails = current.get();
HostDetails newDetails = new HostDetails(includesFile, oldDetails.includes,
excludesFile, oldDetails.excludes);
current.set(newDetails);
}
/**
 * An atomic view of the included and excluded hosts.
 *
 * <p>Bundles the include/exclude file names with their host sets in one
 * object. Every field is final; the sets are stored by reference and are
 * not copied by the constructor or the getters.
 */
public static class HostDetails {
  private final String includesFile;
  private final String excludesFile;
  private final Set<String> includes;
  private final Set<String> excludes;

  /**
   * @param includesFile name of the file listing included hosts
   * @param includes hosts read from the includes file
   * @param excludesFile name of the file listing excluded hosts
   * @param excludes hosts read from the excludes file
   */
  HostDetails(String includesFile, Set<String> includes,
      String excludesFile, Set<String> excludes) {
    this.includesFile = includesFile;
    this.excludesFile = excludesFile;
    this.includes = includes;
    this.excludes = excludes;
  }

  /** @return the includes file name given at construction */
  public String getIncludesFile() {
    return this.includesFile;
  }

  /** @return the set of included hosts given at construction */
  public Set<String> getIncludedHosts() {
    return this.includes;
  }

  /** @return the excludes file name given at construction */
  public String getExcludesFile() {
    return this.excludesFile;
  }

  /** @return the set of excluded hosts given at construction */
  public Set<String> getExcludedHosts() {
    return this.excludes;
  }
}
}

View File

@ -20,10 +20,9 @@ package org.apache.hadoop.util;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.HostsFileReader.HostDetails;
import org.junit.*;
import static org.junit.Assert.*;
@ -117,11 +116,11 @@ public class TestHostsFileReader {
assertTrue(hfp.getExcludedHosts().contains("node1"));
assertTrue(hfp.getHosts().contains("node2"));
Set<String> hostsList = new HashSet<String>();
Set<String> excludeList = new HashSet<String>();
hfp.getHostDetails(hostsList, excludeList);
assertTrue(excludeList.contains("node1"));
assertTrue(hostsList.contains("node2"));
HostDetails hostDetails = hfp.getHostDetails();
assertTrue(hostDetails.getExcludedHosts().contains("node1"));
assertTrue(hostDetails.getIncludedHosts().contains("node2"));
assertEquals(newIncludesFile, hostDetails.getIncludesFile());
assertEquals(newExcludesFile, hostDetails.getExcludesFile());
}
/*

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.HostsFileReader.HostDetails;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@ -184,14 +185,11 @@ public class NodesListManager extends CompositeService implements
conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH));
Set<String> hostsList = new HashSet<String>();
Set<String> excludeList = new HashSet<String>();
hostsReader.getHostDetails(hostsList, excludeList);
for (String include : hostsList) {
HostDetails hostDetails = hostsReader.getHostDetails();
for (String include : hostDetails.getIncludedHosts()) {
LOG.debug("include: " + include);
}
for (String exclude : excludeList) {
for (String exclude : hostDetails.getExcludedHosts()) {
LOG.debug("exclude: " + exclude);
}
}
@ -362,9 +360,9 @@ public class NodesListManager extends CompositeService implements
public boolean isValidNode(String hostName) {
String ip = resolver.resolve(hostName);
Set<String> hostsList = new HashSet<String>();
Set<String> excludeList = new HashSet<String>();
hostsReader.getHostDetails(hostsList, excludeList);
HostDetails hostDetails = hostsReader.getHostDetails();
Set<String> hostsList = hostDetails.getIncludedHosts();
Set<String> excludeList = hostDetails.getExcludedHosts();
return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList
.contains(ip))
@ -466,10 +464,9 @@ public class NodesListManager extends CompositeService implements
public boolean isUntrackedNode(String hostName) {
String ip = resolver.resolve(hostName);
Set<String> hostsList = new HashSet<String>();
Set<String> excludeList = new HashSet<String>();
hostsReader.getHostDetails(hostsList, excludeList);
HostDetails hostDetails = hostsReader.getHostDetails();
Set<String> hostsList = hostDetails.getIncludedHosts();
Set<String> excludeList = hostDetails.getExcludedHosts();
return !hostsList.isEmpty() && !hostsList.contains(hostName)
&& !hostsList.contains(ip) && !excludeList.contains(hostName)

View File

@ -205,8 +205,6 @@ public class TestNMReconnect {
nm1.registerNode();
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
rm.getRMContext().getNodesListManager().getHostsReader().
getExcludedHosts().add("127.0.0.1");
rm.getRMContext().getDispatcher().getEventHandler().handle(
new RMNodeEvent(nm1.getNodeId(),
RMNodeEventType.GRACEFUL_DECOMMISSION));