diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java index a2bfd206c80..417b2073650 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java @@ -28,12 +28,17 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -89,6 +94,10 @@ public class DirectoryCollection { private List errorDirs; private List fullDirs; + // read/write lock for accessing above directories. 
+ private final ReadLock readLock; + private final WriteLock writeLock; + private int numFailures; private float diskUtilizationPercentageCutoffHigh; @@ -163,9 +172,13 @@ public class DirectoryCollection { float utilizationPercentageCutOffHigh, float utilizationPercentageCutOffLow, long utilizationSpaceCutOff) { - localDirs = new CopyOnWriteArrayList(dirs); - errorDirs = new CopyOnWriteArrayList(); - fullDirs = new CopyOnWriteArrayList(); + localDirs = new CopyOnWriteArrayList<>(dirs); + errorDirs = new CopyOnWriteArrayList<>(); + fullDirs = new CopyOnWriteArrayList<>(); + + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.readLock = lock.readLock(); + this.writeLock = lock.writeLock(); diskUtilizationPercentageCutoffHigh = Math.max(0.0F, Math.min(100.0F, utilizationPercentageCutOffHigh)); @@ -174,17 +187,18 @@ public class DirectoryCollection { diskUtilizationSpaceCutoff = utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff; - dirsChangeListeners = new HashSet(); + dirsChangeListeners = Collections.newSetFromMap( + new ConcurrentHashMap()); } - synchronized void registerDirsChangeListener( + void registerDirsChangeListener( DirsChangeListener listener) { if (dirsChangeListeners.add(listener)) { listener.onDirsChanged(); } } - synchronized void deregisterDirsChangeListener( + void deregisterDirsChangeListener( DirsChangeListener listener) { dirsChangeListeners.remove(listener); } @@ -192,31 +206,51 @@ public class DirectoryCollection { /** * @return the current valid directories */ - synchronized List getGoodDirs() { - return Collections.unmodifiableList(localDirs); + List getGoodDirs() { + this.readLock.lock(); + try { + return Collections.unmodifiableList(localDirs); + } finally { + this.readLock.unlock(); + } } /** * @return the failed directories */ - synchronized List getFailedDirs() { - return Collections.unmodifiableList( - DirectoryCollection.concat(errorDirs, fullDirs)); + List getFailedDirs() { + this.readLock.lock(); + try { + 
return Collections.unmodifiableList( + DirectoryCollection.concat(errorDirs, fullDirs)); + } finally { + this.readLock.unlock(); + } } /** * @return the directories that have used all disk space */ - synchronized List getFullDirs() { - return fullDirs; + List getFullDirs() { + this.readLock.lock(); + try { + return fullDirs; + } finally { + this.readLock.unlock(); + } } /** * @return total the number of directory failures seen till now */ - synchronized int getNumFailures() { - return numFailures; + int getNumFailures() { + this.readLock.lock(); + try { + return numFailures; + } finally { + this.readLock.unlock(); + } } /** @@ -226,18 +260,30 @@ public class DirectoryCollection { * @param perm absolute permissions to use for any directories created * @return true if there were no errors, false if at least one error occurred */ - synchronized boolean createNonExistentDirs(FileContext localFs, + boolean createNonExistentDirs(FileContext localFs, FsPermission perm) { boolean failed = false; - for (final String dir : localDirs) { + List localDirectories = null; + this.readLock.lock(); + try { + localDirectories = new ArrayList<>(localDirs); + } finally { + this.readLock.unlock(); + } + for (final String dir : localDirectories) { try { createDir(localFs, new Path(dir), perm); } catch (IOException e) { LOG.warn("Unable to create directory " + dir + " error " + e.getMessage() + ", removing from the list of valid directories."); - localDirs.remove(dir); - errorDirs.add(dir); - numFailures++; + this.writeLock.lock(); + try { + localDirs.remove(dir); + errorDirs.add(dir); + numFailures++; + } finally { + this.writeLock.unlock(); + } failed = true; } } @@ -252,74 +298,93 @@ public class DirectoryCollection { * checking or a failed directory passes the disk check false * otherwise. 
*/ - synchronized boolean checkDirs() { + boolean checkDirs() { boolean setChanged = false; - Set preCheckGoodDirs = new HashSet(localDirs); - Set preCheckFullDirs = new HashSet(fullDirs); - Set preCheckOtherErrorDirs = new HashSet(errorDirs); - List failedDirs = DirectoryCollection.concat(errorDirs, fullDirs); - List allLocalDirs = - DirectoryCollection.concat(localDirs, failedDirs); + Set preCheckGoodDirs = null; + Set preCheckFullDirs = null; + Set preCheckOtherErrorDirs = null; + List failedDirs = null; + List allLocalDirs = null; + this.readLock.lock(); + try { + preCheckGoodDirs = new HashSet(localDirs); + preCheckFullDirs = new HashSet(fullDirs); + preCheckOtherErrorDirs = new HashSet(errorDirs); + failedDirs = DirectoryCollection.concat(errorDirs, fullDirs); + allLocalDirs = DirectoryCollection.concat(localDirs, failedDirs); + } finally { + this.readLock.unlock(); + } + // move testDirs out of any lock as it could wait for very long time in + // case of busy IO Map dirsFailedCheck = testDirs(allLocalDirs, preCheckGoodDirs); - localDirs.clear(); - errorDirs.clear(); - fullDirs.clear(); + this.writeLock.lock(); + try { + localDirs.clear(); + errorDirs.clear(); + fullDirs.clear(); - for (Map.Entry entry : dirsFailedCheck - .entrySet()) { - String dir = entry.getKey(); - DiskErrorInformation errorInformation = entry.getValue(); - switch (entry.getValue().cause) { - case DISK_FULL: - fullDirs.add(entry.getKey()); - break; - case OTHER: - errorDirs.add(entry.getKey()); - break; - } - if (preCheckGoodDirs.contains(dir)) { - LOG.warn("Directory " + dir + " error, " + errorInformation.message - + ", removing from list of valid directories"); - setChanged = true; - numFailures++; - } - } - for (String dir : allLocalDirs) { - if (!dirsFailedCheck.containsKey(dir)) { - localDirs.add(dir); - if (preCheckFullDirs.contains(dir) - || preCheckOtherErrorDirs.contains(dir)) { + for (Map.Entry entry : dirsFailedCheck + .entrySet()) { + String dir = entry.getKey(); + 
DiskErrorInformation errorInformation = entry.getValue(); + switch (entry.getValue().cause) { + case DISK_FULL: + fullDirs.add(entry.getKey()); + break; + case OTHER: + errorDirs.add(entry.getKey()); + break; + default: + LOG.warn(entry.getValue().cause + " is unknown for disk error."); + break; + } + if (preCheckGoodDirs.contains(dir)) { + LOG.warn("Directory " + dir + " error, " + errorInformation.message + + ", removing from list of valid directories"); setChanged = true; - LOG.info("Directory " + dir - + " passed disk check, adding to list of valid directories."); + numFailures++; } } - } - Set postCheckFullDirs = new HashSet(fullDirs); - Set postCheckOtherDirs = new HashSet(errorDirs); - for (String dir : preCheckFullDirs) { - if (postCheckOtherDirs.contains(dir)) { - LOG.warn("Directory " + dir + " error " - + dirsFailedCheck.get(dir).message); + for (String dir : allLocalDirs) { + if (!dirsFailedCheck.containsKey(dir)) { + localDirs.add(dir); + if (preCheckFullDirs.contains(dir) + || preCheckOtherErrorDirs.contains(dir)) { + setChanged = true; + LOG.info("Directory " + dir + + " passed disk check, adding to list of valid directories."); + } + } + } + Set postCheckFullDirs = new HashSet(fullDirs); + Set postCheckOtherDirs = new HashSet(errorDirs); + for (String dir : preCheckFullDirs) { + if (postCheckOtherDirs.contains(dir)) { + LOG.warn("Directory " + dir + " error " + + dirsFailedCheck.get(dir).message); + } } - } - for (String dir : preCheckOtherErrorDirs) { - if (postCheckFullDirs.contains(dir)) { - LOG.warn("Directory " + dir + " error " - + dirsFailedCheck.get(dir).message); + for (String dir : preCheckOtherErrorDirs) { + if (postCheckFullDirs.contains(dir)) { + LOG.warn("Directory " + dir + " error " + + dirsFailedCheck.get(dir).message); + } } - } - setGoodDirsDiskUtilizationPercentage(); - if (setChanged) { - for (DirsChangeListener listener : dirsChangeListeners) { - listener.onDirsChanged(); + setGoodDirsDiskUtilizationPercentage(); + if 
(setChanged) { + for (DirsChangeListener listener : dirsChangeListeners) { + listener.onDirsChanged(); + } } + return setChanged; + } finally { + this.writeLock.unlock(); } - return setChanged; } Map testDirs(List dirs, @@ -409,7 +474,11 @@ public class DirectoryCollection { localFs.getFileStatus(dir); } catch (FileNotFoundException e) { createDir(localFs, dir.getParent(), perm); - localFs.mkdir(dir, perm, false); + try { + localFs.mkdir(dir, perm, false); + } catch (FileAlreadyExistsException ex) { + // do nothing as other threads could be creating the same directory. + } if (!perm.equals(perm.applyUMask(localFs.getUMask()))) { localFs.setPermission(dir, perm); }