YARN-5214. Fixed locking in DirectoryCollection to avoid hanging NMs when various code-paths hit slow disks. Contributed by Junping Du.

This commit is contained in:
Vinod Kumar Vavilapalli 2016-07-05 16:07:28 -07:00
parent 9560f252cf
commit ce9c006430
1 changed files with 144 additions and 75 deletions

View File

@ -28,12 +28,17 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
@ -89,6 +94,10 @@ static List<String> concat(List<String> l1, List<String> l2) {
private List<String> errorDirs; private List<String> errorDirs;
private List<String> fullDirs; private List<String> fullDirs;
// read/write lock for accessing above directories.
private final ReadLock readLock;
private final WriteLock writeLock;
private int numFailures; private int numFailures;
private float diskUtilizationPercentageCutoffHigh; private float diskUtilizationPercentageCutoffHigh;
@ -163,9 +172,13 @@ public DirectoryCollection(String[] dirs,
float utilizationPercentageCutOffHigh, float utilizationPercentageCutOffHigh,
float utilizationPercentageCutOffLow, float utilizationPercentageCutOffLow,
long utilizationSpaceCutOff) { long utilizationSpaceCutOff) {
localDirs = new CopyOnWriteArrayList<String>(dirs); localDirs = new CopyOnWriteArrayList<>(dirs);
errorDirs = new CopyOnWriteArrayList<String>(); errorDirs = new CopyOnWriteArrayList<>();
fullDirs = new CopyOnWriteArrayList<String>(); fullDirs = new CopyOnWriteArrayList<>();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
diskUtilizationPercentageCutoffHigh = Math.max(0.0F, Math.min(100.0F, diskUtilizationPercentageCutoffHigh = Math.max(0.0F, Math.min(100.0F,
utilizationPercentageCutOffHigh)); utilizationPercentageCutOffHigh));
@ -174,17 +187,18 @@ public DirectoryCollection(String[] dirs,
diskUtilizationSpaceCutoff = diskUtilizationSpaceCutoff =
utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff; utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff;
dirsChangeListeners = new HashSet<DirsChangeListener>(); dirsChangeListeners = Collections.newSetFromMap(
new ConcurrentHashMap<DirsChangeListener, Boolean>());
} }
synchronized void registerDirsChangeListener( void registerDirsChangeListener(
DirsChangeListener listener) { DirsChangeListener listener) {
if (dirsChangeListeners.add(listener)) { if (dirsChangeListeners.add(listener)) {
listener.onDirsChanged(); listener.onDirsChanged();
} }
} }
synchronized void deregisterDirsChangeListener( void deregisterDirsChangeListener(
DirsChangeListener listener) { DirsChangeListener listener) {
dirsChangeListeners.remove(listener); dirsChangeListeners.remove(listener);
} }
@ -192,31 +206,51 @@ synchronized void deregisterDirsChangeListener(
/** /**
* @return the current valid directories * @return the current valid directories
*/ */
synchronized List<String> getGoodDirs() { List<String> getGoodDirs() {
return Collections.unmodifiableList(localDirs); this.readLock.lock();
try {
return Collections.unmodifiableList(localDirs);
} finally {
this.readLock.unlock();
}
} }
/** /**
* @return the failed directories * @return the failed directories
*/ */
synchronized List<String> getFailedDirs() { List<String> getFailedDirs() {
return Collections.unmodifiableList( this.readLock.lock();
DirectoryCollection.concat(errorDirs, fullDirs)); try {
return Collections.unmodifiableList(
DirectoryCollection.concat(errorDirs, fullDirs));
} finally {
this.readLock.unlock();
}
} }
/** /**
* @return the directories that have used all disk space * @return the directories that have used all disk space
*/ */
synchronized List<String> getFullDirs() { List<String> getFullDirs() {
return fullDirs; this.readLock.lock();
try {
return fullDirs;
} finally {
this.readLock.unlock();
}
} }
/** /**
* @return total the number of directory failures seen till now * @return total the number of directory failures seen till now
*/ */
synchronized int getNumFailures() { int getNumFailures() {
return numFailures; this.readLock.lock();
try {
return numFailures;
}finally {
this.readLock.unlock();
}
} }
/** /**
@ -226,18 +260,30 @@ synchronized int getNumFailures() {
* @param perm absolute permissions to use for any directories created * @param perm absolute permissions to use for any directories created
* @return true if there were no errors, false if at least one error occurred * @return true if there were no errors, false if at least one error occurred
*/ */
synchronized boolean createNonExistentDirs(FileContext localFs, boolean createNonExistentDirs(FileContext localFs,
FsPermission perm) { FsPermission perm) {
boolean failed = false; boolean failed = false;
for (final String dir : localDirs) { List<String> localDirectories = null;
this.readLock.lock();
try {
localDirectories = new ArrayList<>(localDirs);
} finally {
this.readLock.unlock();
}
for (final String dir : localDirectories) {
try { try {
createDir(localFs, new Path(dir), perm); createDir(localFs, new Path(dir), perm);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Unable to create directory " + dir + " error " + LOG.warn("Unable to create directory " + dir + " error " +
e.getMessage() + ", removing from the list of valid directories."); e.getMessage() + ", removing from the list of valid directories.");
localDirs.remove(dir); this.writeLock.lock();
errorDirs.add(dir); try {
numFailures++; localDirs.remove(dir);
errorDirs.add(dir);
numFailures++;
} finally {
this.writeLock.unlock();
}
failed = true; failed = true;
} }
} }
@ -252,74 +298,93 @@ synchronized boolean createNonExistentDirs(FileContext localFs,
* checking or a failed directory passes the disk check <em>false</em> * checking or a failed directory passes the disk check <em>false</em>
* otherwise. * otherwise.
*/ */
synchronized boolean checkDirs() { boolean checkDirs() {
boolean setChanged = false; boolean setChanged = false;
Set<String> preCheckGoodDirs = new HashSet<String>(localDirs); Set<String> preCheckGoodDirs = null;
Set<String> preCheckFullDirs = new HashSet<String>(fullDirs); Set<String> preCheckFullDirs = null;
Set<String> preCheckOtherErrorDirs = new HashSet<String>(errorDirs); Set<String> preCheckOtherErrorDirs = null;
List<String> failedDirs = DirectoryCollection.concat(errorDirs, fullDirs); List<String> failedDirs = null;
List<String> allLocalDirs = List<String> allLocalDirs = null;
DirectoryCollection.concat(localDirs, failedDirs); this.readLock.lock();
try {
preCheckGoodDirs = new HashSet<String>(localDirs);
preCheckFullDirs = new HashSet<String>(fullDirs);
preCheckOtherErrorDirs = new HashSet<String>(errorDirs);
failedDirs = DirectoryCollection.concat(errorDirs, fullDirs);
allLocalDirs = DirectoryCollection.concat(localDirs, failedDirs);
} finally {
this.readLock.unlock();
}
// move testDirs out of any lock as it could wait for very long time in
// case of busy IO
Map<String, DiskErrorInformation> dirsFailedCheck = testDirs(allLocalDirs, Map<String, DiskErrorInformation> dirsFailedCheck = testDirs(allLocalDirs,
preCheckGoodDirs); preCheckGoodDirs);
localDirs.clear(); this.writeLock.lock();
errorDirs.clear(); try {
fullDirs.clear(); localDirs.clear();
errorDirs.clear();
fullDirs.clear();
for (Map.Entry<String, DiskErrorInformation> entry : dirsFailedCheck for (Map.Entry<String, DiskErrorInformation> entry : dirsFailedCheck
.entrySet()) { .entrySet()) {
String dir = entry.getKey(); String dir = entry.getKey();
DiskErrorInformation errorInformation = entry.getValue(); DiskErrorInformation errorInformation = entry.getValue();
switch (entry.getValue().cause) { switch (entry.getValue().cause) {
case DISK_FULL: case DISK_FULL:
fullDirs.add(entry.getKey()); fullDirs.add(entry.getKey());
break; break;
case OTHER: case OTHER:
errorDirs.add(entry.getKey()); errorDirs.add(entry.getKey());
break; break;
} default:
if (preCheckGoodDirs.contains(dir)) { LOG.warn(entry.getValue().cause + " is unknown for disk error.");
LOG.warn("Directory " + dir + " error, " + errorInformation.message break;
+ ", removing from list of valid directories"); }
setChanged = true; if (preCheckGoodDirs.contains(dir)) {
numFailures++; LOG.warn("Directory " + dir + " error, " + errorInformation.message
} + ", removing from list of valid directories");
}
for (String dir : allLocalDirs) {
if (!dirsFailedCheck.containsKey(dir)) {
localDirs.add(dir);
if (preCheckFullDirs.contains(dir)
|| preCheckOtherErrorDirs.contains(dir)) {
setChanged = true; setChanged = true;
LOG.info("Directory " + dir numFailures++;
+ " passed disk check, adding to list of valid directories.");
} }
} }
} for (String dir : allLocalDirs) {
Set<String> postCheckFullDirs = new HashSet<String>(fullDirs); if (!dirsFailedCheck.containsKey(dir)) {
Set<String> postCheckOtherDirs = new HashSet<String>(errorDirs); localDirs.add(dir);
for (String dir : preCheckFullDirs) { if (preCheckFullDirs.contains(dir)
if (postCheckOtherDirs.contains(dir)) { || preCheckOtherErrorDirs.contains(dir)) {
LOG.warn("Directory " + dir + " error " setChanged = true;
+ dirsFailedCheck.get(dir).message); LOG.info("Directory " + dir
+ " passed disk check, adding to list of valid directories.");
}
}
}
Set<String> postCheckFullDirs = new HashSet<String>(fullDirs);
Set<String> postCheckOtherDirs = new HashSet<String>(errorDirs);
for (String dir : preCheckFullDirs) {
if (postCheckOtherDirs.contains(dir)) {
LOG.warn("Directory " + dir + " error "
+ dirsFailedCheck.get(dir).message);
}
} }
}
for (String dir : preCheckOtherErrorDirs) { for (String dir : preCheckOtherErrorDirs) {
if (postCheckFullDirs.contains(dir)) { if (postCheckFullDirs.contains(dir)) {
LOG.warn("Directory " + dir + " error " LOG.warn("Directory " + dir + " error "
+ dirsFailedCheck.get(dir).message); + dirsFailedCheck.get(dir).message);
}
} }
} setGoodDirsDiskUtilizationPercentage();
setGoodDirsDiskUtilizationPercentage(); if (setChanged) {
if (setChanged) { for (DirsChangeListener listener : dirsChangeListeners) {
for (DirsChangeListener listener : dirsChangeListeners) { listener.onDirsChanged();
listener.onDirsChanged(); }
} }
return setChanged;
} finally {
this.writeLock.unlock();
} }
return setChanged;
} }
Map<String, DiskErrorInformation> testDirs(List<String> dirs, Map<String, DiskErrorInformation> testDirs(List<String> dirs,
@ -409,7 +474,11 @@ private void createDir(FileContext localFs, Path dir, FsPermission perm)
localFs.getFileStatus(dir); localFs.getFileStatus(dir);
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
createDir(localFs, dir.getParent(), perm); createDir(localFs, dir.getParent(), perm);
localFs.mkdir(dir, perm, false); try {
localFs.mkdir(dir, perm, false);
} catch (FileAlreadyExistsException ex) {
// do nothing as other threads could in creating the same directory.
}
if (!perm.equals(perm.applyUMask(localFs.getUMask()))) { if (!perm.equals(perm.applyUMask(localFs.getUMask()))) {
localFs.setPermission(dir, perm); localFs.setPermission(dir, perm);
} }