From 19eb997f675a843a2bfe21094ef524f8c5870919 Mon Sep 17 00:00:00 2001 From: Junping Du Date: Tue, 7 Jun 2016 09:18:58 -0700 Subject: [PATCH] HADOOP-10048. LocalDirAllocator should avoid holding locks while accessing the filesystem. Contributed by Jason Lowe. (cherry picked from commit c14c1b298e29e799f7c8f15ff24d7eba6e0cd39b) --- .../apache/hadoop/fs/LocalDirAllocator.java | 153 +++++++++++------- 1 file changed, 94 insertions(+), 59 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java index 70cf87dc7a4..b14e1f06319 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java @@ -20,9 +20,10 @@ import java.io.*; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.*; - import org.apache.hadoop.util.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -247,74 +248,101 @@ private static class AllocatorPerContext { private final Log LOG = LogFactory.getLog(AllocatorPerContext.class); - private int dirNumLastAccessed; private Random dirIndexRandomizer = new Random(); - private FileSystem localFS; - private DF[] dirDF = new DF[0]; private String contextCfgItemName; - private String[] localDirs = new String[0]; - private String savedLocalDirs = ""; + + // NOTE: the context must be accessed via a local reference as it + // may be updated at any time to reference a different context + private AtomicReference currentContext; + + private static class Context { + private AtomicInteger dirNumLastAccessed = new AtomicInteger(0); + private FileSystem localFS; + private DF[] dirDF; + private Path[] localDirs; + private String savedLocalDirs; + + public int getAndIncrDirNumLastAccessed() { + return getAndIncrDirNumLastAccessed(1); + } + + public int getAndIncrDirNumLastAccessed(int delta) { + if (localDirs.length < 2 || delta == 0) { + return dirNumLastAccessed.get(); + } + int oldval, newval; + do { + oldval = dirNumLastAccessed.get(); + newval = (oldval + delta) % localDirs.length; + } while (!dirNumLastAccessed.compareAndSet(oldval, newval)); + return oldval; + } + } public AllocatorPerContext(String contextCfgItemName) { this.contextCfgItemName = contextCfgItemName; + this.currentContext = new AtomicReference(new Context()); } /** This method gets called everytime before any read/write to make sure * that any change to localDirs is reflected immediately. */ - private synchronized void confChanged(Configuration conf) + private Context confChanged(Configuration conf) throws IOException { + Context ctx = currentContext.get(); String newLocalDirs = conf.get(contextCfgItemName); if (null == newLocalDirs) { throw new IOException(contextCfgItemName + " not configured"); } - if (!newLocalDirs.equals(savedLocalDirs)) { - localDirs = StringUtils.getTrimmedStrings(newLocalDirs); - localFS = FileSystem.getLocal(conf); - int numDirs = localDirs.length; - ArrayList dirs = new ArrayList(numDirs); + if (!newLocalDirs.equals(ctx.savedLocalDirs)) { + ctx = new Context(); + String[] dirStrings = StringUtils.getTrimmedStrings(newLocalDirs); + ctx.localFS = FileSystem.getLocal(conf); + int numDirs = dirStrings.length; + ArrayList dirs = new ArrayList(numDirs); ArrayList dfList = new ArrayList(numDirs); for (int i = 0; i < numDirs; i++) { try { // filter problematic directories - Path tmpDir = new Path(localDirs[i]); - if(localFS.mkdirs(tmpDir)|| localFS.exists(tmpDir)) { + Path tmpDir = new Path(dirStrings[i]); + if(ctx.localFS.mkdirs(tmpDir)|| ctx.localFS.exists(tmpDir)) { try { - File tmpFile = tmpDir.isAbsolute() - ? new File(localFS.makeQualified(tmpDir).toUri()) - : new File(localDirs[i]); + ? new File(ctx.localFS.makeQualified(tmpDir).toUri()) + : new File(dirStrings[i]); DiskChecker.checkDir(tmpFile); - dirs.add(tmpFile.getPath()); + dirs.add(new Path(tmpFile.getPath())); dfList.add(new DF(tmpFile, 30000)); - } catch (DiskErrorException de) { - LOG.warn( localDirs[i] + " is not writable\n", de); + LOG.warn(dirStrings[i] + " is not writable\n", de); } } else { - LOG.warn( "Failed to create " + localDirs[i]); + LOG.warn("Failed to create " + dirStrings[i]); } } catch (IOException ie) { - LOG.warn( "Failed to create " + localDirs[i] + ": " + + LOG.warn("Failed to create " + dirStrings[i] + ": " + ie.getMessage() + "\n", ie); } //ignore } - localDirs = dirs.toArray(new String[dirs.size()]); - dirDF = dfList.toArray(new DF[dirs.size()]); - savedLocalDirs = newLocalDirs; - + ctx.localDirs = dirs.toArray(new Path[dirs.size()]); + ctx.dirDF = dfList.toArray(new DF[dirs.size()]); + ctx.savedLocalDirs = newLocalDirs; + if (dirs.size() > 0) { // randomize the first disk picked in the round-robin selection - dirNumLastAccessed = dirIndexRandomizer.nextInt(dirs.size()); + ctx.dirNumLastAccessed.set(dirIndexRandomizer.nextInt(dirs.size())); } + + currentContext.set(ctx); } + + return ctx; } - private Path createPath(String path, + private Path createPath(Path dir, String path, boolean checkWrite) throws IOException { - Path file = new Path(new Path(localDirs[dirNumLastAccessed]), - path); + Path file = new Path(dir, path); if (checkWrite) { //check whether we are able to create a directory here. If the disk //happens to be RDONLY we will fail @@ -334,7 +362,7 @@ private Path createPath(String path, * @return the current directory index. */ int getCurrentDirectoryIndex() { - return dirNumLastAccessed; + return currentContext.get().dirNumLastAccessed.get(); } /** Get a path from the local FS. If size is known, we go @@ -344,10 +372,10 @@ int getCurrentDirectoryIndex() { * If size is not known, use roulette selection -- pick directories * with probability proportional to their available space. */ - public synchronized Path getLocalPathForWrite(String pathStr, long size, + public Path getLocalPathForWrite(String pathStr, long size, Configuration conf, boolean checkWrite) throws IOException { - confChanged(conf); - int numDirs = localDirs.length; + Context ctx = confChanged(conf); + int numDirs = ctx.localDirs.length; int numDirsSearched = 0; //remove the leading slash from the path (to make sure that the uri //resolution results in a valid path on the dir being checked) @@ -358,12 +386,12 @@ public synchronized Path getLocalPathForWrite(String pathStr, long size, if(size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability //proportional to available size - long[] availableOnDisk = new long[dirDF.length]; + long[] availableOnDisk = new long[ctx.dirDF.length]; long totalAvailable = 0; //build the "roulette wheel" - for(int i =0; i < dirDF.length; ++i) { - availableOnDisk[i] = dirDF[i].getAvailable(); + for(int i =0; i < ctx.dirDF.length; ++i) { + availableOnDisk[i] = ctx.dirDF[i].getAvailable(); totalAvailable += availableOnDisk[i]; } @@ -380,8 +408,8 @@ public synchronized Path getLocalPathForWrite(String pathStr, long size, randomPosition -= availableOnDisk[dir]; dir++; } - dirNumLastAccessed = dir; - returnPath = createPath(pathStr, checkWrite); + ctx.dirNumLastAccessed.set(dir); + returnPath = createPath(ctx.localDirs[dir], pathStr, checkWrite); if (returnPath == null) { totalAvailable -= availableOnDisk[dir]; availableOnDisk[dir] = 0; // skip this disk @@ -389,15 +417,21 @@ public synchronized Path getLocalPathForWrite(String pathStr, long size, } } } else { - while (numDirsSearched < numDirs && returnPath == null) { - long capacity = dirDF[dirNumLastAccessed].getAvailable(); + int dirNum = ctx.getAndIncrDirNumLastAccessed(); + while (numDirsSearched < numDirs) { + long capacity = ctx.dirDF[dirNum].getAvailable(); if (capacity > size) { - returnPath = createPath(pathStr, checkWrite); + returnPath = + createPath(ctx.localDirs[dirNum], pathStr, checkWrite); + if (returnPath != null) { + ctx.getAndIncrDirNumLastAccessed(numDirsSearched); + break; + } } - dirNumLastAccessed++; - dirNumLastAccessed = dirNumLastAccessed % numDirs; + dirNum++; + dirNum = dirNum % numDirs; numDirsSearched++; - } + } } if (returnPath != null) { return returnPath; @@ -432,10 +466,10 @@ public File createTmpFileForWrite(String pathStr, long size, * configured dirs for the file's existence and return the complete * path to the file when we find one */ - public synchronized Path getLocalPathToRead(String pathStr, + public Path getLocalPathToRead(String pathStr, Configuration conf) throws IOException { - confChanged(conf); - int numDirs = localDirs.length; + Context ctx = confChanged(conf); + int numDirs = ctx.localDirs.length; int numDirsSearched = 0; //remove the leading slash from the path (to make sure that the uri //resolution results in a valid path on the dir being checked) @@ -443,8 +477,8 @@ public synchronized Path getLocalPathToRead(String pathStr, pathStr = pathStr.substring(1); } while (numDirsSearched < numDirs) { - Path file = new Path(localDirs[numDirsSearched], pathStr); - if (localFS.exists(file)) { + Path file = new Path(ctx.localDirs[numDirsSearched], pathStr); + if (ctx.localFS.exists(file)) { return file; } numDirsSearched++; @@ -459,10 +493,10 @@ private static class PathIterator implements Iterator, Iterable { private final FileSystem fs; private final String pathStr; private int i = 0; - private final String[] rootDirs; + private final Path[] rootDirs; private Path next = null; - private PathIterator(FileSystem fs, String pathStr, String[] rootDirs) + private PathIterator(FileSystem fs, String pathStr, Path[] rootDirs) throws IOException { this.fs = fs; this.pathStr = pathStr; @@ -517,21 +551,22 @@ public Iterator iterator() { * @return all of the paths that exist under any of the roots * @throws IOException */ - synchronized Iterable getAllLocalPathsToRead(String pathStr, + Iterable getAllLocalPathsToRead(String pathStr, Configuration conf) throws IOException { - confChanged(conf); + Context ctx = confChanged(conf); if (pathStr.startsWith("/")) { pathStr = pathStr.substring(1); } - return new PathIterator(localFS, pathStr, localDirs); + return new PathIterator(ctx.localFS, pathStr, ctx.localDirs); } /** We search through all the configured dirs for the file's existence * and return true when we find one */ - public synchronized boolean ifExists(String pathStr,Configuration conf) { + public boolean ifExists(String pathStr, Configuration conf) { + Context ctx = currentContext.get(); try { - int numDirs = localDirs.length; + int numDirs = ctx.localDirs.length; int numDirsSearched = 0; //remove the leading slash from the path (to make sure that the uri //resolution results in a valid path on the dir being checked) @@ -539,8 +574,8 @@ public synchronized boolean ifExists(String pathStr,Configuration conf) { pathStr = pathStr.substring(1); } while (numDirsSearched < numDirs) { - Path file = new Path(localDirs[numDirsSearched], pathStr); - if (localFS.exists(file)) { + Path file = new Path(ctx.localDirs[numDirsSearched], pathStr); + if (ctx.localFS.exists(file)) { return true; } numDirsSearched++;