diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index c0b1c0ce5dc..253953a5d2e 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -358,6 +358,9 @@ Release 0.23.0 - Unreleased HADOOP-7552. FileUtil#fullyDelete doesn't throw IOE but lists it in the throws clause. (eli) + HADOOP-7580. Add a version of getLocalPathForWrite to LocalDirAllocator + which doesn't create dirs. (Chris Douglas & Siddharth Seth via acmurthy) + OPTIMIZATIONS HADOOP-7333. Performance improvement in PureJavaCrc32. (Eric Caspole diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java index 3753b2b9a3f..71c82357577 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java @@ -128,8 +128,26 @@ public class LocalDirAllocator { */ public Path getLocalPathForWrite(String pathStr, long size, Configuration conf) throws IOException { + return getLocalPathForWrite(pathStr, size, conf, true); + } + + /** Get a path from the local FS. Pass size as + * SIZE_UNKNOWN if not known apriori. 
We + * round-robin over the set of disks (via the configured dirs) and return + * the first complete path which has enough space + * @param pathStr the requested path (this will be created on the first + * available disk) + * @param size the size of the file that is going to be written + * @param conf the Configuration object + * @param checkWrite ensure that the path is writable + * @return the complete path to the file on a local disk + * @throws IOException + */ + public Path getLocalPathForWrite(String pathStr, long size, + Configuration conf, + boolean checkWrite) throws IOException { AllocatorPerContext context = obtainContext(contextCfgItemName); - return context.getLocalPathForWrite(pathStr, size, conf); + return context.getLocalPathForWrite(pathStr, size, conf, checkWrite); } /** Get a path from the local FS for reading. We search through all the @@ -145,6 +163,23 @@ public class LocalDirAllocator { AllocatorPerContext context = obtainContext(contextCfgItemName); return context.getLocalPathToRead(pathStr, conf); } + + /** + * Get all of the paths that currently exist in the working directories. + * @param pathStr the path underneath the roots + * @param conf the configuration to look up the roots in + * @return all of the paths that exist under any of the roots + * @throws IOException + */ + public Iterable getAllLocalPathsToRead(String pathStr, + Configuration conf + ) throws IOException { + AllocatorPerContext context; + synchronized (this) { + context = obtainContext(contextCfgItemName); + } + return context.getAllLocalPathsToRead(pathStr, conf); + } /** Creates a temporary file in the local FS. Pass size as -1 if not known * apriori. We round-robin over the set of disks (via the configured dirs) @@ -214,7 +249,8 @@ public class LocalDirAllocator { /** This method gets called everytime before any read/write to make sure * that any change to localDirs is reflected immediately. 
*/ - private void confChanged(Configuration conf) throws IOException { + private synchronized void confChanged(Configuration conf) + throws IOException { String newLocalDirs = conf.get(contextCfgItemName); if (!newLocalDirs.equals(savedLocalDirs)) { localDirs = conf.getTrimmedStrings(contextCfgItemName); @@ -251,18 +287,22 @@ public class LocalDirAllocator { } } - private Path createPath(String path) throws IOException { + private Path createPath(String path, + boolean checkWrite) throws IOException { Path file = new Path(new Path(localDirs[dirNumLastAccessed]), path); - //check whether we are able to create a directory here. If the disk - //happens to be RDONLY we will fail - try { - DiskChecker.checkDir(new File(file.getParent().toUri().getPath())); - return file; - } catch (DiskErrorException d) { - LOG.warn("Disk Error Exception: ", d); - return null; + if (checkWrite) { + //check whether we are able to create a directory here. If the disk + //happens to be RDONLY we will fail + try { + DiskChecker.checkDir(new File(file.getParent().toUri().getPath())); + return file; + } catch (DiskErrorException d) { + LOG.warn("Disk Error Exception: ", d); + return null; + } } + return file; } /** @@ -272,17 +312,6 @@ public class LocalDirAllocator { int getCurrentDirectoryIndex() { return dirNumLastAccessed; } - - /** Get a path from the local FS. This method should be used if the size of - * the file is not known a priori. - * - * It will use roulette selection, picking directories - * with probability proportional to their available space. - */ - public synchronized Path getLocalPathForWrite(String path, - Configuration conf) throws IOException { - return getLocalPathForWrite(path, SIZE_UNKNOWN, conf); - } /** Get a path from the local FS. If size is known, we go * round-robin over the set of disks (via the configured dirs) and return @@ -292,7 +321,7 @@ public class LocalDirAllocator { * with probability proportional to their available space. 
*/ public synchronized Path getLocalPathForWrite(String pathStr, long size, - Configuration conf) throws IOException { + Configuration conf, boolean checkWrite) throws IOException { confChanged(conf); int numDirs = localDirs.length; int numDirsSearched = 0; @@ -324,7 +353,7 @@ public class LocalDirAllocator { dir++; } dirNumLastAccessed = dir; - returnPath = createPath(pathStr); + returnPath = createPath(pathStr, checkWrite); if (returnPath == null) { totalAvailable -= availableOnDisk[dir]; availableOnDisk[dir] = 0; // skip this disk @@ -335,7 +364,7 @@ public class LocalDirAllocator { while (numDirsSearched < numDirs && returnPath == null) { long capacity = dirDF[dirNumLastAccessed].getAvailable(); if (capacity > size) { - returnPath = createPath(pathStr); + returnPath = createPath(pathStr, checkWrite); } dirNumLastAccessed++; dirNumLastAccessed = dirNumLastAccessed % numDirs; @@ -361,7 +390,7 @@ public class LocalDirAllocator { Configuration conf) throws IOException { // find an appropriate directory - Path path = getLocalPathForWrite(pathStr, size, conf); + Path path = getLocalPathForWrite(pathStr, size, conf, true); File dir = new File(path.getParent().toUri().getPath()); String prefix = path.getName(); @@ -398,6 +427,74 @@ public class LocalDirAllocator { " the configured local directories"); } + private static class PathIterator implements Iterator, Iterable { + private final FileSystem fs; + private final String pathStr; + private int i = 0; + private final String[] rootDirs; + private Path next = null; + + private PathIterator(FileSystem fs, String pathStr, String[] rootDirs) + throws IOException { + this.fs = fs; + this.pathStr = pathStr; + this.rootDirs = rootDirs; + advance(); + } + + @Override + public boolean hasNext() { + return next != null; + } + + private void advance() throws IOException { + while (i < rootDirs.length) { + next = new Path(rootDirs[i++], pathStr); + if (fs.exists(next)) { + return; + } + } + next = null; + } + + @Override + public 
Path next() { + Path result = next; + try { + advance(); + } catch (IOException ie) { + throw new RuntimeException("Can't check existance of " + next, ie); + } + return result; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("read only iterator"); + } + + @Override + public Iterator iterator() { + return this; + } + } + + /** + * Get all of the paths that currently exist in the working directories. + * @param pathStr the path underneath the roots + * @param conf the configuration to look up the roots in + * @return all of the paths that exist under any of the roots + * @throws IOException + */ + synchronized Iterable getAllLocalPathsToRead(String pathStr, + Configuration conf) throws IOException { + confChanged(conf); + if (pathStr.startsWith("/")) { + pathStr = pathStr.substring(1); + } + return new PathIterator(localFS, pathStr, localDirs); + } + /** We search through all the configured dirs for the file's existence * and return true when we find one */ diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java index eef90308aa9..1e22a73bbac 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import org.apache.hadoop.conf.Configuration; @@ -208,4 +209,33 @@ public class TestLocalDirAllocator extends TestCase { } } + /** Two buffer dirs. The first dir does not exist & is on a read-only disk; + * The second dir exists & is RW. + * getLocalPathForWrite with checkWrite set to true should create the parent + * directory. With checkWrite false, the directory should not be created. 
+ * @throws IOException + */ + public void testLocalPathForWriteDirCreation() throws IOException { + try { + conf.set(CONTEXT, BUFFER_DIR[0] + "," + BUFFER_DIR[1]); + assertTrue(localFs.mkdirs(BUFFER_PATH[1])); + BUFFER_ROOT.setReadOnly(); + Path p1 = + dirAllocator.getLocalPathForWrite("p1/x", SMALL_FILE_SIZE, conf); + assertTrue(localFs.getFileStatus(p1.getParent()).isDirectory()); + + Path p2 = + dirAllocator.getLocalPathForWrite("p2/x", SMALL_FILE_SIZE, conf, + false); + try { + localFs.getFileStatus(p2.getParent()); + } catch (Exception e) { + assertEquals(e.getClass(), FileNotFoundException.class); + } + } finally { + Shell.execCommand(new String[] { "chmod", "u+w", BUFFER_DIR_ROOT }); + rmBufferDirs(); + } + } + }