diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 8bd5903873b..c93d7a6bbaa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -19,7 +19,6 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -49,6 +48,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
 
 /**
  * View to an on-disk Region.
@@ -74,6 +74,15 @@ public class HRegionFileSystem {
   private final Configuration conf;
   private final Path tableDir;
   private final FileSystem fs;
+
+  /**
+   * In order to handle NN connectivity hiccups, one needs to retry non-idempotent operations at
+   * the client level.
+   */
+  private final int hdfsClientRetriesNumber;
+  private final int baseSleepBeforeRetries;
+  private static final int DEFAULT_HDFS_CLIENT_RETRIES_NUMBER = 10;
+  private static final int DEFAULT_BASE_SLEEP_BEFORE_RETRIES = 1000;
 
   /**
    * Create a view to the on-disk region
@@ -82,13 +91,17 @@ public class HRegionFileSystem {
    * @param tableDir {@link Path} to where the table is being stored
    * @param regionInfo {@link HRegionInfo} for region
    */
-  HRegionFileSystem(final Configuration conf, final FileSystem fs,
-      final Path tableDir, final HRegionInfo regionInfo) {
+  HRegionFileSystem(final Configuration conf, final FileSystem fs, final Path tableDir,
+      final HRegionInfo regionInfo) {
     this.fs = fs;
     this.conf = conf;
     this.tableDir = tableDir;
     this.regionInfo = regionInfo;
-  }
+    this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
+      DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
+    this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
+      DEFAULT_BASE_SLEEP_BEFORE_RETRIES);
+  }
 
   /** @return the underlying {@link FileSystem} */
   public FileSystem getFileSystem() {
@@ -122,7 +135,7 @@ public class HRegionFileSystem {
    * Clean up any temp detritus that may have been left around from previous operation attempts.
    */
   void cleanupTempDir() throws IOException {
-    FSUtils.deleteDirectory(fs, getTempDir());
+    deleteDir(getTempDir());
   }
 
   // ===========================================================================
@@ -145,9 +158,8 @@ public class HRegionFileSystem {
    */
   Path createStoreDir(final String familyName) throws IOException {
     Path storeDir = getStoreDir(familyName);
-    if (!fs.exists(storeDir) && !fs.mkdirs(storeDir)) {
-      throw new IOException("Failed create of: " + storeDir);
-    }
+    if(!fs.exists(storeDir) && !createDir(storeDir))
+      throw new IOException("Failed creating "+storeDir);
     return storeDir;
   }
 
@@ -240,11 +252,10 @@ public class HRegionFileSystem {
 
     // delete the family folder
     Path familyDir = getStoreDir(familyName);
-    if (!fs.delete(familyDir, true)) {
-      throw new IOException("Could not delete family " + familyName +
-        " from FileSystem for region " + regionInfo.getRegionNameAsString() +
-        "(" + regionInfo.getEncodedName() + ")");
-    }
+    if(fs.exists(familyDir) && !deleteDir(familyDir))
+      throw new IOException("Could not delete family " + familyName +
+        " from FileSystem for region " + regionInfo.getRegionNameAsString() + "(" +
+        regionInfo.getEncodedName() + ")");
   }
 
   /**
@@ -312,7 +323,9 @@ public class HRegionFileSystem {
   private Path commitStoreFile(final String familyName, final Path buildPath,
       final long seqNum, final boolean generateNewName) throws IOException {
     Path storeDir = getStoreDir(familyName);
-    fs.mkdirs(storeDir);
+    if(!fs.exists(storeDir) && !createDir(storeDir))
+      throw new IOException("Failed creating " + storeDir);
+
     String name = buildPath.getName();
     if (generateNewName) {
       name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
     }
@@ -322,12 +335,14 @@ public class HRegionFileSystem {
       throw new FileNotFoundException(buildPath.toString());
     }
     LOG.debug("Committing store file " + buildPath + " as " + dstPath);
-    if (!fs.rename(buildPath, dstPath)) {
+    // buildPath exists, therefore not doing an exists() check.
+    if (!rename(buildPath, dstPath)) {
       throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
     }
     return dstPath;
   }
 
+
   /**
    * Moves multiple store files to the relative region's family store directory.
    * @param storeFiles list of store files divided by family
@@ -414,7 +429,7 @@ public class HRegionFileSystem {
    * Clean up any split detritus that may have been left around from previous split attempts.
    */
   void cleanupSplitsDir() throws IOException {
-    FSUtils.deleteDirectory(fs, getSplitsDir());
+    deleteDir(getSplitsDir());
   }
 
   /**
@@ -437,7 +452,7 @@ public class HRegionFileSystem {
     if (daughters != null) {
       for (FileStatus daughter: daughters) {
         Path daughterDir = new Path(getTableDir(), daughter.getPath().getName());
-        if (fs.exists(daughterDir) && !fs.delete(daughterDir, true)) {
+        if (fs.exists(daughterDir) && !deleteDir(daughterDir)) {
           throw new IOException("Failed delete of " + daughterDir);
         }
       }
     }
@@ -453,7 +468,7 @@ public class HRegionFileSystem {
    */
   void cleanupDaughterRegion(final HRegionInfo regionInfo) throws IOException {
     Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
-    if (this.fs.exists(regionDir) && !this.fs.delete(regionDir, true)) {
+    if (this.fs.exists(regionDir) && !deleteDir(regionDir)) {
       throw new IOException("Failed delete of " + regionDir);
     }
   }
@@ -467,7 +482,7 @@ public class HRegionFileSystem {
   Path commitDaughterRegion(final HRegionInfo regionInfo) throws IOException {
     Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
     Path daughterTmpDir = this.getSplitsDir(regionInfo);
-    if (fs.exists(daughterTmpDir) && !fs.rename(daughterTmpDir, regionDir)) {
+    if (fs.exists(daughterTmpDir) && !rename(daughterTmpDir, regionDir)) {
       throw new IOException("Unable to rename " + daughterTmpDir + " to " + regionDir);
     }
     return regionDir;
   }
@@ -480,12 +495,13 @@
     Path splitdir = getSplitsDir();
     if (fs.exists(splitdir)) {
       LOG.info("The " + splitdir + " directory exists. Hence deleting it to recreate it");
-      if (!fs.delete(splitdir, true)) {
+      if (!deleteDir(splitdir)) {
         throw new IOException("Failed deletion of " + splitdir + " before creating them again.");
       }
     }
-    if (!fs.mkdirs(splitdir)) {
+    // splitDir doesn't exist now. No need to do an exists() call for it.
+    if (!createDir(splitdir)) {
       throw new IOException("Failed create of " + splitdir);
     }
   }
 
@@ -534,7 +550,7 @@ public class HRegionFileSystem {
    * Clean up any merge detritus that may have been left around from previous merge attempts.
    */
   void cleanupMergesDir() throws IOException {
-    FSUtils.deleteDirectory(fs, getMergesDir());
+    deleteDir(getMergesDir());
   }
 
   /**
@@ -740,7 +756,7 @@ public class HRegionFileSystem {
       writeRegionInfoFileContent(conf, fs, tmpPath, regionInfoContent);
 
       // Move the created file to the original path
-      if (!fs.rename(tmpPath, regionInfoFile)) {
+      if (fs.exists(tmpPath) && !rename(tmpPath, regionInfoFile)) {
         throw new IOException("Unable to rename " + tmpPath + " to " + regionInfoFile);
       }
     } else {
@@ -768,7 +784,7 @@ public class HRegionFileSystem {
     }
 
     // Create the region directory
-    if (!fs.mkdirs(regionFs.getRegionDir())) {
+    if (!createDirOnFileSystem(fs, conf, regionDir)) {
       LOG.warn("Unable to create the region directory: " + regionDir);
       throw new IOException("Unable to create region directory: " + regionDir);
     }
@@ -842,4 +858,122 @@
       LOG.warn("Failed delete of " + regionDir);
     }
   }
+
+  /**
+   * Creates a directory. Assumes the user has already checked for this directory's existence.
+   * @param dir
+   * @return the result of fs.mkdirs(). In case the underlying fs throws an IOException, it checks
+   *         whether the directory exists or not, and returns true if it exists.
+   * @throws IOException
+   */
+  boolean createDir(Path dir) throws IOException {
+    int i = 0;
+    IOException lastIOE = null;
+    do {
+      try {
+        return fs.mkdirs(dir);
+      } catch (IOException ioe) {
+        lastIOE = ioe;
+        if (fs.exists(dir)) return true; // directory is present
+        sleepBeforeRetry("Create Directory", i+1);
+      }
+    } while (++i <= hdfsClientRetriesNumber);
+    throw new IOException("Exception in createDir", lastIOE);
+  }
+
+  /**
+   * Renames a directory. Assumes the user has already checked for this directory's existence.
+   * @param srcpath
+   * @param dstPath
+   * @return true if rename is successful.
+   * @throws IOException
+   */
+  boolean rename(Path srcpath, Path dstPath) throws IOException {
+    IOException lastIOE = null;
+    int i = 0;
+    do {
+      try {
+        return fs.rename(srcpath, dstPath);
+      } catch (IOException ioe) {
+        lastIOE = ioe;
+        if (!fs.exists(srcpath) && fs.exists(dstPath)) return true; // successful move
+        // dir is not there, retry after some time.
+        sleepBeforeRetry("Rename Directory", i+1);
+      }
+    } while (++i <= hdfsClientRetriesNumber);
+    throw new IOException("Exception in rename", lastIOE);
+  }
+
+  /**
+   * Deletes a directory. Assumes the user has already checked for this directory's existence.
+   * @param dir
+   * @return true if the directory is deleted.
+   * @throws IOException
+   */
+  boolean deleteDir(Path dir) throws IOException {
+    IOException lastIOE = null;
+    int i = 0;
+    do {
+      try {
+        return fs.delete(dir, true);
+      } catch (IOException ioe) {
+        lastIOE = ioe;
+        if (!fs.exists(dir)) return true;
+        // dir is there, retry deleting after some time.
+        sleepBeforeRetry("Delete Directory", i+1);
+      }
+    } while (++i <= hdfsClientRetriesNumber);
+    throw new IOException("Exception in DeleteDir", lastIOE);
+  }
+
+  /**
+   * Sleeping logic; handles the interrupt exception.
+   */
+  private void sleepBeforeRetry(String msg, int sleepMultiplier) {
+    sleepBeforeRetry(msg, sleepMultiplier, baseSleepBeforeRetries, hdfsClientRetriesNumber);
+  }
+
+  /**
+   * Creates a directory for a filesystem and configuration object. Assumes the user has already
+   * checked for this directory's existence.
+   * @param fs
+   * @param conf
+   * @param dir
+   * @return the result of fs.mkdirs(). In case the underlying fs throws an IOException, it checks
+   *         whether the directory exists or not, and returns true if it exists.
+   * @throws IOException
+   */
+  private static boolean createDirOnFileSystem(FileSystem fs, Configuration conf, Path dir)
+      throws IOException {
+    int i = 0;
+    IOException lastIOE = null;
+    int hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
+      DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
+    int baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
+      DEFAULT_BASE_SLEEP_BEFORE_RETRIES);
+    do {
+      try {
+        return fs.mkdirs(dir);
+      } catch (IOException ioe) {
+        lastIOE = ioe;
+        if (fs.exists(dir)) return true; // directory is present
+        sleepBeforeRetry("Create Directory", i+1, baseSleepBeforeRetries, hdfsClientRetriesNumber);
+      }
+    } while (++i <= hdfsClientRetriesNumber);
+    throw new IOException("Exception in createDir", lastIOE);
+  }
+
+  /**
+   * Sleeping logic for static methods; handles the interrupt exception. A static version is kept
+   * so the integer values do not have to be looked up again.
+   */
+  private static void sleepBeforeRetry(String msg, int sleepMultiplier, int baseSleepBeforeRetries,
+      int hdfsClientRetriesNumber) {
+    if (sleepMultiplier > hdfsClientRetriesNumber) {
+      LOG.debug(msg + ", retries exhausted");
+      return;
+    }
+    LOG.debug(msg + ", sleeping " + baseSleepBeforeRetries + " times " + sleepMultiplier);
+    Threads.sleep(baseSleepBeforeRetries * sleepMultiplier);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index dfc941e6da7..e0027ab33da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -166,19 +166,6 @@ public abstract class FSUtils {
     return fs.exists(dir) && fs.delete(dir, true);
   }
 
-  /**
-   * Check if directory exists. If it does not, create it.
-   * @param fs filesystem object
-   * @param dir path to check
-   * @return Path
-   * @throws IOException e
-   */
-  public Path checkdir(final FileSystem fs, final Path dir) throws IOException {
-    if (!fs.exists(dir)) {
-      fs.mkdirs(dir);
-    }
-    return dir;
-  }
 
   /**
    * Create the specified file on the filesystem. By default, this will:
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
index 8692cbfebd2..044aa022992 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
@@ -24,15 +24,25 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.Collection;
 
+import javax.management.RuntimeErrorException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Progressable;
 
 import org.junit.Test;
 import org.junit.AfterClass;
@@ -43,6 +53,7 @@ import junit.framework.TestCase;
 @Category(SmallTests.class)
 public class TestHRegionFileSystem {
   private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final Log LOG = LogFactory.getLog(TestHRegionFileSystem.class);
 
   @Test
   public void testOnDiskRegionCreation() throws IOException {
@@ -73,6 +84,130 @@ public class TestHRegionFileSystem {
     fs.delete(rootDir, true);
   }
 
+  @Test
+  public void testNonIdempotentOpsWithRetries() throws IOException {
+    Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation");
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Configuration conf = TEST_UTIL.getConfiguration();
+
+    // Create a Region
+    HRegionInfo hri = new HRegionInfo(Bytes.toBytes("TestTable"));
+    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, rootDir, hri);
+    assertTrue(fs.exists(regionFs.getRegionDir()));
+
+    regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(),
+      null, null);
+    // HRegionFileSystem.createRegionOnFileSystem(conf, new MockFileSystemForCreate(), rootDir,
+    // hri);
+    boolean result = regionFs.createDir(new Path("/foo/bar"));
+    assertTrue("Couldn't create the directory", result);
+
+
+    regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
+    result = regionFs.rename(new Path("/foo/bar"), new Path("/foo/bar2"));
+    assertTrue("Couldn't rename the directory", result);
+
+    regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
+    result = regionFs.deleteDir(new Path("/foo/bar"));
+    assertTrue("Couldn't delete the directory", result);
+    fs.delete(rootDir, true);
+  }
+
+  static class MockFileSystemForCreate extends MockFileSystem {
+    @Override
+    public boolean exists(Path path) {
+      return false;
+    }
+  }
+
+  /**
+   * A mock fs which throws an exception for the first 3 times, and then processes the call
+   * (returns the expected result).
+   */
+  static class MockFileSystem extends FileSystem {
+    int retryCount;
+    final static int successRetryCount = 3;
+
+    public MockFileSystem() {
+      retryCount = 0;
+    }
+
+    @Override
+    public FSDataOutputStream append(Path arg0, int arg1, Progressable arg2) throws IOException {
+      throw new IOException("");
+    }
+
+    @Override
+    public FSDataOutputStream create(Path arg0, FsPermission arg1, boolean arg2, int arg3,
+        short arg4, long arg5, Progressable arg6) throws IOException {
+      LOG.debug("Create, " + retryCount);
+      if (retryCount++ < successRetryCount) throw new IOException("Something bad happen");
+      return null;
+    }
+
+    @Override
+    public boolean delete(Path arg0) throws IOException {
+      if (retryCount++ < successRetryCount) throw new IOException("Something bad happen");
+      return true;
+    }
+
+    @Override
+    public boolean delete(Path arg0, boolean arg1) throws IOException {
+      if (retryCount++ < successRetryCount) throw new IOException("Something bad happen");
+      return true;
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path arg0) throws IOException {
+      FileStatus fs = new FileStatus();
+      return fs;
+    }
+
+    @Override
+    public boolean exists(Path path) {
+      return true;
+    }
+
+    @Override
+    public URI getUri() {
+      throw new RuntimeException("Something bad happen");
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      throw new RuntimeException("Something bad happen");
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path arg0) throws IOException {
+      throw new IOException("Something bad happen");
+    }
+
+    @Override
+    public boolean mkdirs(Path arg0, FsPermission arg1) throws IOException {
+      LOG.debug("mkdirs, " + retryCount);
+      if (retryCount++ < successRetryCount) throw new IOException("Something bad happen");
+      return true;
+    }
+
+    @Override
+    public FSDataInputStream open(Path arg0, int arg1) throws IOException {
+      throw new IOException("Something bad happen");
+    }
+
+    @Override
+    public boolean rename(Path arg0, Path arg1) throws IOException {
+      LOG.debug("rename, " + retryCount);
+      if (retryCount++ < successRetryCount) throw new IOException("Something bad happen");
+      return true;
+    }
+
+    @Override
+    public void setWorkingDirectory(Path arg0) {
+      throw new RuntimeException("Something bad happen");
+    }
+  }
+
   @Test
   public void testTempAndCommit() throws IOException {
     Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testTempAndCommit");
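For readers who want the retry idea in isolation: the sketch below restates the pattern the patch applies in HRegionFileSystem.createDir(), rename() and deleteDir(). When a non-idempotent FileSystem call fails with an IOException during a NameNode hiccup, it first checks whether the operation actually took effect, and only then sleeps (base sleep times the attempt number) and retries, with both knobs read from "hdfs.client.retries.number" and "hdfs.client.sleep.before.retries". This is a simplified, hypothetical illustration rather than the code added by the patch: the class name RetryingFsOps, the for-loop shape, and plain Thread.sleep standing in for HBase's Threads.sleep are assumptions made here to keep the example self-contained.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Simplified, hypothetical sketch of the retry idea; not the code added by the patch. */
public class RetryingFsOps {
  private final FileSystem fs;
  private final int retries;     // "hdfs.client.retries.number"
  private final int baseSleepMs; // "hdfs.client.sleep.before.retries"

  public RetryingFsOps(FileSystem fs, Configuration conf) {
    this.fs = fs;
    this.retries = conf.getInt("hdfs.client.retries.number", 10);
    this.baseSleepMs = conf.getInt("hdfs.client.sleep.before.retries", 1000);
  }

  /**
   * mkdirs() with retries. If an attempt throws but the directory is visible afterwards,
   * an earlier attempt already did the work, so report success instead of failing.
   */
  public boolean createDir(Path dir) throws IOException {
    IOException lastIOE = null;
    for (int attempt = 0; attempt <= retries; attempt++) {
      try {
        return fs.mkdirs(dir);
      } catch (IOException ioe) {
        lastIOE = ioe;
        if (fs.exists(dir)) {
          return true;            // the directory is there after all
        }
        if (attempt < retries) {
          sleep(attempt + 1);     // linear backoff before the next attempt
        }
      }
    }
    throw new IOException("createDir failed after " + (retries + 1) + " attempts", lastIOE);
  }

  private void sleep(int multiplier) {
    try {
      Thread.sleep((long) baseSleepMs * multiplier);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt(); // restore the interrupt flag and stop waiting
    }
  }
}

With the patch's defaults (10 retries, 1000 ms base sleep) a persistently failing call gives up only after roughly eleven attempts with sleeps of 1 s, 2 s, ... 10 s between them; both values come from the Configuration handed to HRegionFileSystem, so they can be lowered where that worst case is too slow.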