HBASE-8156 Support for Namenode HA for non-idempotent operations
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1460973 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3d6dee2c3c
commit
56fd88fff6
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -49,6 +48,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
|
|||
import org.apache.hadoop.hbase.io.Reference;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
||||
/**
|
||||
* View to an on-disk Region.
|
||||
|
@ -75,6 +75,15 @@ public class HRegionFileSystem {
|
|||
private final Path tableDir;
|
||||
private final FileSystem fs;
|
||||
|
||||
/**
|
||||
* In order to handle NN connectivity hiccups, one need to retry non-idempotent operation at the
|
||||
* client level.
|
||||
*/
|
||||
private final int hdfsClientRetriesNumber;
|
||||
private final int baseSleepBeforeRetries;
|
||||
private static final int DEFAULT_HDFS_CLIENT_RETRIES_NUMBER = 10;
|
||||
private static final int DEFAULT_BASE_SLEEP_BEFORE_RETRIES = 1000;
|
||||
|
||||
/**
|
||||
* Create a view to the on-disk region
|
||||
* @param conf the {@link Configuration} to use
|
||||
|
@ -82,13 +91,17 @@ public class HRegionFileSystem {
|
|||
* @param tableDir {@link Path} to where the table is being stored
|
||||
* @param regionInfo {@link HRegionInfo} for region
|
||||
*/
|
||||
HRegionFileSystem(final Configuration conf, final FileSystem fs,
|
||||
final Path tableDir, final HRegionInfo regionInfo) {
|
||||
HRegionFileSystem(final Configuration conf, final FileSystem fs, final Path tableDir,
|
||||
final HRegionInfo regionInfo) {
|
||||
this.fs = fs;
|
||||
this.conf = conf;
|
||||
this.tableDir = tableDir;
|
||||
this.regionInfo = regionInfo;
|
||||
}
|
||||
this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
|
||||
DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
|
||||
this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
|
||||
DEFAULT_BASE_SLEEP_BEFORE_RETRIES);
|
||||
}
|
||||
|
||||
/** @return the underlying {@link FileSystem} */
|
||||
public FileSystem getFileSystem() {
|
||||
|
@ -122,7 +135,7 @@ public class HRegionFileSystem {
|
|||
* Clean up any temp detritus that may have been left around from previous operation attempts.
|
||||
*/
|
||||
void cleanupTempDir() throws IOException {
|
||||
FSUtils.deleteDirectory(fs, getTempDir());
|
||||
deleteDir(getTempDir());
|
||||
}
|
||||
|
||||
// ===========================================================================
|
||||
|
@ -145,9 +158,8 @@ public class HRegionFileSystem {
|
|||
*/
|
||||
Path createStoreDir(final String familyName) throws IOException {
|
||||
Path storeDir = getStoreDir(familyName);
|
||||
if (!fs.exists(storeDir) && !fs.mkdirs(storeDir)) {
|
||||
throw new IOException("Failed create of: " + storeDir);
|
||||
}
|
||||
if(!fs.exists(storeDir) && !createDir(storeDir))
|
||||
throw new IOException("Failed creating "+storeDir);
|
||||
return storeDir;
|
||||
}
|
||||
|
||||
|
@ -240,11 +252,10 @@ public class HRegionFileSystem {
|
|||
|
||||
// delete the family folder
|
||||
Path familyDir = getStoreDir(familyName);
|
||||
if (!fs.delete(familyDir, true)) {
|
||||
throw new IOException("Could not delete family " + familyName +
|
||||
" from FileSystem for region " + regionInfo.getRegionNameAsString() +
|
||||
"(" + regionInfo.getEncodedName() + ")");
|
||||
}
|
||||
if(fs.exists(familyDir) && !deleteDir(familyDir))
|
||||
throw new IOException("Could not delete family " + familyName
|
||||
+ " from FileSystem for region " + regionInfo.getRegionNameAsString() + "("
|
||||
+ regionInfo.getEncodedName() + ")");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -312,7 +323,9 @@ public class HRegionFileSystem {
|
|||
private Path commitStoreFile(final String familyName, final Path buildPath,
|
||||
final long seqNum, final boolean generateNewName) throws IOException {
|
||||
Path storeDir = getStoreDir(familyName);
|
||||
fs.mkdirs(storeDir);
|
||||
if(!fs.exists(storeDir) && !createDir(storeDir))
|
||||
throw new IOException("Failed creating " + storeDir);
|
||||
|
||||
String name = buildPath.getName();
|
||||
if (generateNewName) {
|
||||
name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
|
||||
|
@ -322,12 +335,14 @@ public class HRegionFileSystem {
|
|||
throw new FileNotFoundException(buildPath.toString());
|
||||
}
|
||||
LOG.debug("Committing store file " + buildPath + " as " + dstPath);
|
||||
if (!fs.rename(buildPath, dstPath)) {
|
||||
// buildPath exists, therefore not doing an exists() check.
|
||||
if (!rename(buildPath, dstPath)) {
|
||||
throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
|
||||
}
|
||||
return dstPath;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Moves multiple store files to the relative region's family store directory.
|
||||
* @param storeFiles list of store files divided by family
|
||||
|
@ -414,7 +429,7 @@ public class HRegionFileSystem {
|
|||
* Clean up any split detritus that may have been left around from previous split attempts.
|
||||
*/
|
||||
void cleanupSplitsDir() throws IOException {
|
||||
FSUtils.deleteDirectory(fs, getSplitsDir());
|
||||
deleteDir(getSplitsDir());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -437,7 +452,7 @@ public class HRegionFileSystem {
|
|||
if (daughters != null) {
|
||||
for (FileStatus daughter: daughters) {
|
||||
Path daughterDir = new Path(getTableDir(), daughter.getPath().getName());
|
||||
if (fs.exists(daughterDir) && !fs.delete(daughterDir, true)) {
|
||||
if (fs.exists(daughterDir) && !deleteDir(daughterDir)) {
|
||||
throw new IOException("Failed delete of " + daughterDir);
|
||||
}
|
||||
}
|
||||
|
@ -453,7 +468,7 @@ public class HRegionFileSystem {
|
|||
*/
|
||||
void cleanupDaughterRegion(final HRegionInfo regionInfo) throws IOException {
|
||||
Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
|
||||
if (this.fs.exists(regionDir) && !this.fs.delete(regionDir, true)) {
|
||||
if (this.fs.exists(regionDir) && !deleteDir(regionDir)) {
|
||||
throw new IOException("Failed delete of " + regionDir);
|
||||
}
|
||||
}
|
||||
|
@ -467,7 +482,7 @@ public class HRegionFileSystem {
|
|||
Path commitDaughterRegion(final HRegionInfo regionInfo) throws IOException {
|
||||
Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
|
||||
Path daughterTmpDir = this.getSplitsDir(regionInfo);
|
||||
if (fs.exists(daughterTmpDir) && !fs.rename(daughterTmpDir, regionDir)) {
|
||||
if (fs.exists(daughterTmpDir) && !rename(daughterTmpDir, regionDir)) {
|
||||
throw new IOException("Unable to rename " + daughterTmpDir + " to " + regionDir);
|
||||
}
|
||||
return regionDir;
|
||||
|
@ -480,12 +495,13 @@ public class HRegionFileSystem {
|
|||
Path splitdir = getSplitsDir();
|
||||
if (fs.exists(splitdir)) {
|
||||
LOG.info("The " + splitdir + " directory exists. Hence deleting it to recreate it");
|
||||
if (!fs.delete(splitdir, true)) {
|
||||
if (!deleteDir(splitdir)) {
|
||||
throw new IOException("Failed deletion of " + splitdir
|
||||
+ " before creating them again.");
|
||||
}
|
||||
}
|
||||
if (!fs.mkdirs(splitdir)) {
|
||||
// splitDir doesn't exists now. No need to do an exists() call for it.
|
||||
if (!createDir(splitdir)) {
|
||||
throw new IOException("Failed create of " + splitdir);
|
||||
}
|
||||
}
|
||||
|
@ -534,7 +550,7 @@ public class HRegionFileSystem {
|
|||
* Clean up any merge detritus that may have been left around from previous merge attempts.
|
||||
*/
|
||||
void cleanupMergesDir() throws IOException {
|
||||
FSUtils.deleteDirectory(fs, getMergesDir());
|
||||
deleteDir(getMergesDir());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -740,7 +756,7 @@ public class HRegionFileSystem {
|
|||
writeRegionInfoFileContent(conf, fs, tmpPath, regionInfoContent);
|
||||
|
||||
// Move the created file to the original path
|
||||
if (!fs.rename(tmpPath, regionInfoFile)) {
|
||||
if (fs.exists(tmpPath) && !rename(tmpPath, regionInfoFile)) {
|
||||
throw new IOException("Unable to rename " + tmpPath + " to " + regionInfoFile);
|
||||
}
|
||||
} else {
|
||||
|
@ -768,7 +784,7 @@ public class HRegionFileSystem {
|
|||
}
|
||||
|
||||
// Create the region directory
|
||||
if (!fs.mkdirs(regionFs.getRegionDir())) {
|
||||
if (!createDirOnFileSystem(fs, conf, regionDir)) {
|
||||
LOG.warn("Unable to create the region directory: " + regionDir);
|
||||
throw new IOException("Unable to create region directory: " + regionDir);
|
||||
}
|
||||
|
@ -842,4 +858,122 @@ public class HRegionFileSystem {
|
|||
LOG.warn("Failed delete of " + regionDir);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a directory. Assumes the user has already checked for this directory existence.
|
||||
* @param dir
|
||||
* @return the result of fs.mkdirs(). In case underlying fs throws an IOException, it checks
|
||||
* whether the directory exists or not, and returns true if it exists.
|
||||
* @throws IOException
|
||||
*/
|
||||
boolean createDir(Path dir) throws IOException {
|
||||
int i = 0;
|
||||
IOException lastIOE = null;
|
||||
do {
|
||||
try {
|
||||
return fs.mkdirs(dir);
|
||||
} catch (IOException ioe) {
|
||||
lastIOE = ioe;
|
||||
if (fs.exists(dir)) return true; // directory is present
|
||||
sleepBeforeRetry("Create Directory", i+1);
|
||||
}
|
||||
} while (++i <= hdfsClientRetriesNumber);
|
||||
throw new IOException("Exception in createDir", lastIOE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Renames a directory. Assumes the user has already checked for this directory existence.
|
||||
* @param srcpath
|
||||
* @param dstPath
|
||||
* @return true if rename is successful.
|
||||
* @throws IOException
|
||||
*/
|
||||
boolean rename(Path srcpath, Path dstPath) throws IOException {
|
||||
IOException lastIOE = null;
|
||||
int i = 0;
|
||||
do {
|
||||
try {
|
||||
return fs.rename(srcpath, dstPath);
|
||||
} catch (IOException ioe) {
|
||||
lastIOE = ioe;
|
||||
if (!fs.exists(srcpath) && fs.exists(dstPath)) return true; // successful move
|
||||
// dir is not there, retry after some time.
|
||||
sleepBeforeRetry("Rename Directory", i+1);
|
||||
}
|
||||
} while (++i <= hdfsClientRetriesNumber);
|
||||
throw new IOException("Exception in rename", lastIOE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes a directory. Assumes the user has already checked for this directory existence.
|
||||
* @param dir
|
||||
* @return true if the directory is deleted.
|
||||
* @throws IOException
|
||||
*/
|
||||
boolean deleteDir(Path dir) throws IOException {
|
||||
IOException lastIOE = null;
|
||||
int i = 0;
|
||||
do {
|
||||
try {
|
||||
return fs.delete(dir, true);
|
||||
} catch (IOException ioe) {
|
||||
lastIOE = ioe;
|
||||
if (!fs.exists(dir)) return true;
|
||||
// dir is there, retry deleting after some time.
|
||||
sleepBeforeRetry("Delete Directory", i+1);
|
||||
}
|
||||
} while (++i <= hdfsClientRetriesNumber);
|
||||
throw new IOException("Exception in DeleteDir", lastIOE);
|
||||
}
|
||||
|
||||
/**
|
||||
* sleeping logic; handles the interrupt exception.
|
||||
*/
|
||||
private void sleepBeforeRetry(String msg, int sleepMultiplier) {
|
||||
sleepBeforeRetry(msg, sleepMultiplier, baseSleepBeforeRetries, hdfsClientRetriesNumber);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a directory for a filesystem and configuration object. Assumes the user has already
|
||||
* checked for this directory existence.
|
||||
* @param fs
|
||||
* @param conf
|
||||
* @param dir
|
||||
* @return the result of fs.mkdirs(). In case underlying fs throws an IOException, it checks
|
||||
* whether the directory exists or not, and returns true if it exists.
|
||||
* @throws IOException
|
||||
*/
|
||||
private static boolean createDirOnFileSystem(FileSystem fs, Configuration conf, Path dir)
|
||||
throws IOException {
|
||||
int i = 0;
|
||||
IOException lastIOE = null;
|
||||
int hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
|
||||
DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
|
||||
int baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
|
||||
DEFAULT_BASE_SLEEP_BEFORE_RETRIES);
|
||||
do {
|
||||
try {
|
||||
return fs.mkdirs(dir);
|
||||
} catch (IOException ioe) {
|
||||
lastIOE = ioe;
|
||||
if (fs.exists(dir)) return true; // directory is present
|
||||
sleepBeforeRetry("Create Directory", i+1, baseSleepBeforeRetries, hdfsClientRetriesNumber);
|
||||
}
|
||||
} while (++i <= hdfsClientRetriesNumber);
|
||||
throw new IOException("Exception in createDir", lastIOE);
|
||||
}
|
||||
|
||||
/**
|
||||
* sleeping logic for static methods; handles the interrupt exception. Keeping a static version
|
||||
* for this to avoid re-looking for the integer values.
|
||||
*/
|
||||
private static void sleepBeforeRetry(String msg, int sleepMultiplier, int baseSleepBeforeRetries,
|
||||
int hdfsClientRetriesNumber) {
|
||||
if (sleepMultiplier > hdfsClientRetriesNumber) {
|
||||
LOG.debug(msg + ", retries exhausted");
|
||||
return;
|
||||
}
|
||||
LOG.debug(msg + ", sleeping " + baseSleepBeforeRetries + " times " + sleepMultiplier);
|
||||
Threads.sleep(baseSleepBeforeRetries * sleepMultiplier);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -166,19 +166,6 @@ public abstract class FSUtils {
|
|||
return fs.exists(dir) && fs.delete(dir, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if directory exists. If it does not, create it.
|
||||
* @param fs filesystem object
|
||||
* @param dir path to check
|
||||
* @return Path
|
||||
* @throws IOException e
|
||||
*/
|
||||
public Path checkdir(final FileSystem fs, final Path dir) throws IOException {
|
||||
if (!fs.exists(dir)) {
|
||||
fs.mkdirs(dir);
|
||||
}
|
||||
return dir;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the specified file on the filesystem. By default, this will:
|
||||
|
|
|
@ -24,15 +24,25 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Collection;
|
||||
|
||||
import javax.management.RuntimeErrorException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -43,6 +53,7 @@ import junit.framework.TestCase;
|
|||
@Category(SmallTests.class)
|
||||
public class TestHRegionFileSystem {
|
||||
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final Log LOG = LogFactory.getLog(TestHRegionFileSystem.class);
|
||||
|
||||
@Test
|
||||
public void testOnDiskRegionCreation() throws IOException {
|
||||
|
@ -73,6 +84,130 @@ public class TestHRegionFileSystem {
|
|||
fs.delete(rootDir, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonIdempotentOpsWithRetries() throws IOException {
|
||||
Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation");
|
||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
|
||||
// Create a Region
|
||||
HRegionInfo hri = new HRegionInfo(Bytes.toBytes("TestTable"));
|
||||
HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, rootDir, hri);
|
||||
assertTrue(fs.exists(regionFs.getRegionDir()));
|
||||
|
||||
regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(),
|
||||
null, null);
|
||||
// HRegionFileSystem.createRegionOnFileSystem(conf, new MockFileSystemForCreate(), rootDir,
|
||||
// hri);
|
||||
boolean result = regionFs.createDir(new Path("/foo/bar"));
|
||||
assertTrue("Couldn't create the directory", result);
|
||||
|
||||
|
||||
regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
|
||||
result = regionFs.rename(new Path("/foo/bar"), new Path("/foo/bar2"));
|
||||
assertTrue("Couldn't rename the directory", result);
|
||||
|
||||
regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
|
||||
result = regionFs.deleteDir(new Path("/foo/bar"));
|
||||
assertTrue("Couldn't delete the directory", result);
|
||||
fs.delete(rootDir, true);
|
||||
}
|
||||
|
||||
static class MockFileSystemForCreate extends MockFileSystem {
|
||||
@Override
|
||||
public boolean exists(Path path) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* a mock fs which throws exception for first 3 times, and then process the call (returns the
|
||||
* excepted result).
|
||||
*/
|
||||
static class MockFileSystem extends FileSystem {
|
||||
int retryCount;
|
||||
final static int successRetryCount = 3;
|
||||
|
||||
public MockFileSystem() {
|
||||
retryCount = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream append(Path arg0, int arg1, Progressable arg2) throws IOException {
|
||||
throw new IOException("");
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream create(Path arg0, FsPermission arg1, boolean arg2, int arg3,
|
||||
short arg4, long arg5, Progressable arg6) throws IOException {
|
||||
LOG.debug("Create, " + retryCount);
|
||||
if (retryCount++ < successRetryCount) throw new IOException("Something bad happen");
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(Path arg0) throws IOException {
|
||||
if (retryCount++ < successRetryCount) throw new IOException("Something bad happen");
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(Path arg0, boolean arg1) throws IOException {
|
||||
if (retryCount++ < successRetryCount) throw new IOException("Something bad happen");
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus getFileStatus(Path arg0) throws IOException {
|
||||
FileStatus fs = new FileStatus();
|
||||
return fs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean exists(Path path) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getUri() {
|
||||
throw new RuntimeException("Something bad happen");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getWorkingDirectory() {
|
||||
throw new RuntimeException("Something bad happen");
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus[] listStatus(Path arg0) throws IOException {
|
||||
throw new IOException("Something bad happen");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean mkdirs(Path arg0, FsPermission arg1) throws IOException {
|
||||
LOG.debug("mkdirs, " + retryCount);
|
||||
if (retryCount++ < successRetryCount) throw new IOException("Something bad happen");
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataInputStream open(Path arg0, int arg1) throws IOException {
|
||||
throw new IOException("Something bad happen");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rename(Path arg0, Path arg1) throws IOException {
|
||||
LOG.debug("rename, " + retryCount);
|
||||
if (retryCount++ < successRetryCount) throw new IOException("Something bad happen");
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWorkingDirectory(Path arg0) {
|
||||
throw new RuntimeException("Something bad happen");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTempAndCommit() throws IOException {
|
||||
Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testTempAndCommit");
|
||||
|
|
Loading…
Reference in New Issue