HBASE-13128 Make HBCK's lock file retry creation and deletion
Signed-off-by: Elliott Clark <eclark@apache.org>
This commit is contained in:
parent
efb6e7005f
commit
4e2edb93df
|
@ -54,6 +54,13 @@ import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Multimap;
|
||||||
|
import com.google.common.collect.TreeMultimap;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -137,14 +144,6 @@ import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Joiner;
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import com.google.common.collect.Multimap;
|
|
||||||
import com.google.common.collect.TreeMultimap;
|
|
||||||
import com.google.protobuf.ServiceException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* HBaseFsck (hbck) is a tool for checking and repairing region consistency and
|
* HBaseFsck (hbck) is a tool for checking and repairing region consistency and
|
||||||
* table integrity problems in a corrupted HBase.
|
* table integrity problems in a corrupted HBase.
|
||||||
|
@ -201,7 +200,8 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
private static final int DEFAULT_MAX_MERGE = 5;
|
private static final int DEFAULT_MAX_MERGE = 5;
|
||||||
private static final String TO_BE_LOADED = "to_be_loaded";
|
private static final String TO_BE_LOADED = "to_be_loaded";
|
||||||
private static final String HBCK_LOCK_FILE = "hbase-hbck.lock";
|
private static final String HBCK_LOCK_FILE = "hbase-hbck.lock";
|
||||||
|
private static final int DEFAULT_MAX_LOCK_FILE_ATTEMPTS = 5;
|
||||||
|
private static final int DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL = 200;
|
||||||
|
|
||||||
/**********************
|
/**********************
|
||||||
* Internal resources
|
* Internal resources
|
||||||
|
@ -295,10 +295,13 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
private Map<TableName, Set<String>> orphanTableDirs =
|
private Map<TableName, Set<String>> orphanTableDirs =
|
||||||
new HashMap<TableName, Set<String>>();
|
new HashMap<TableName, Set<String>>();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* List of orphaned table ZNodes
|
* List of orphaned table ZNodes
|
||||||
*/
|
*/
|
||||||
private Set<TableName> orphanedTableZNodes = new HashSet<TableName>();
|
private Set<TableName> orphanedTableZNodes = new HashSet<TableName>();
|
||||||
|
private final RetryCounterFactory lockFileRetryCounterFactory;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
|
@ -320,6 +323,10 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
|
|
||||||
int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
|
int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
|
||||||
executor = new ScheduledThreadPoolExecutor(numThreads, Threads.newDaemonThreadFactory("hbasefsck"));
|
executor = new ScheduledThreadPoolExecutor(numThreads, Threads.newDaemonThreadFactory("hbasefsck"));
|
||||||
|
lockFileRetryCounterFactory = new RetryCounterFactory(
|
||||||
|
getConf().getInt("hbase.hbck.lockfile.attempts", DEFAULT_MAX_LOCK_FILE_ATTEMPTS),
|
||||||
|
getConf().getInt("hbase.hbck.lockfile.attempt.sleep.interval",
|
||||||
|
DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -337,9 +344,17 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
super(conf);
|
super(conf);
|
||||||
errors = getErrorReporter(getConf());
|
errors = getErrorReporter(getConf());
|
||||||
this.executor = exec;
|
this.executor = exec;
|
||||||
|
lockFileRetryCounterFactory = new RetryCounterFactory(
|
||||||
|
getConf().getInt("hbase.hbck.lockfile.attempts", DEFAULT_MAX_LOCK_FILE_ATTEMPTS),
|
||||||
|
getConf().getInt("hbase.hbck.lockfile.attempt.sleep.interval", DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL));
|
||||||
}
|
}
|
||||||
|
|
||||||
private class FileLockCallable implements Callable<FSDataOutputStream> {
|
private class FileLockCallable implements Callable<FSDataOutputStream> {
|
||||||
|
RetryCounter retryCounter;
|
||||||
|
|
||||||
|
public FileLockCallable(RetryCounter retryCounter) {
|
||||||
|
this.retryCounter = retryCounter;
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
public FSDataOutputStream call() throws IOException {
|
public FSDataOutputStream call() throws IOException {
|
||||||
try {
|
try {
|
||||||
|
@ -349,7 +364,7 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
Path tmpDir = new Path(FSUtils.getRootDir(getConf()), HConstants.HBASE_TEMP_DIRECTORY);
|
Path tmpDir = new Path(FSUtils.getRootDir(getConf()), HConstants.HBASE_TEMP_DIRECTORY);
|
||||||
fs.mkdirs(tmpDir);
|
fs.mkdirs(tmpDir);
|
||||||
HBCK_LOCK_PATH = new Path(tmpDir, HBCK_LOCK_FILE);
|
HBCK_LOCK_PATH = new Path(tmpDir, HBCK_LOCK_FILE);
|
||||||
final FSDataOutputStream out = FSUtils.create(fs, HBCK_LOCK_PATH, defaultPerms, false);
|
final FSDataOutputStream out = createFileWithRetries(fs, HBCK_LOCK_PATH, defaultPerms);
|
||||||
out.writeBytes(InetAddress.getLocalHost().toString());
|
out.writeBytes(InetAddress.getLocalHost().toString());
|
||||||
out.flush();
|
out.flush();
|
||||||
return out;
|
return out;
|
||||||
|
@ -361,6 +376,34 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private FSDataOutputStream createFileWithRetries(final FileSystem fs,
|
||||||
|
final Path hbckLockFilePath, final FsPermission defaultPerms)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
IOException exception = null;
|
||||||
|
do {
|
||||||
|
try {
|
||||||
|
return FSUtils.create(fs, hbckLockFilePath, defaultPerms, false);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.info("Failed to create lock file " + hbckLockFilePath.getName()
|
||||||
|
+ ", try=" + (retryCounter.getAttemptTimes() + 1) + " of "
|
||||||
|
+ retryCounter.getMaxAttempts());
|
||||||
|
LOG.debug("Failed to create lock file " + hbckLockFilePath.getName(),
|
||||||
|
ioe);
|
||||||
|
try {
|
||||||
|
exception = ioe;
|
||||||
|
retryCounter.sleepUntilNextRetry();
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
throw (InterruptedIOException) new InterruptedIOException(
|
||||||
|
"Can't create lock file " + hbckLockFilePath.getName())
|
||||||
|
.initCause(ie);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (retryCounter.shouldRetry());
|
||||||
|
|
||||||
|
throw exception;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -370,7 +413,8 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private FSDataOutputStream checkAndMarkRunningHbck() throws IOException {
|
private FSDataOutputStream checkAndMarkRunningHbck() throws IOException {
|
||||||
FileLockCallable callable = new FileLockCallable();
|
RetryCounter retryCounter = lockFileRetryCounterFactory.create();
|
||||||
|
FileLockCallable callable = new FileLockCallable(retryCounter);
|
||||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||||
FutureTask<FSDataOutputStream> futureTask = new FutureTask<FSDataOutputStream>(callable);
|
FutureTask<FSDataOutputStream> futureTask = new FutureTask<FSDataOutputStream>(callable);
|
||||||
executor.execute(futureTask);
|
executor.execute(futureTask);
|
||||||
|
@ -394,15 +438,31 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void unlockHbck() {
|
private void unlockHbck() {
|
||||||
if(hbckLockCleanup.compareAndSet(true, false)){
|
if (hbckLockCleanup.compareAndSet(true, false)) {
|
||||||
|
RetryCounter retryCounter = lockFileRetryCounterFactory.create();
|
||||||
|
do {
|
||||||
|
try {
|
||||||
IOUtils.closeStream(hbckOutFd);
|
IOUtils.closeStream(hbckOutFd);
|
||||||
try{
|
FSUtils.delete(FSUtils.getCurrentFileSystem(getConf()),
|
||||||
FSUtils.delete(FSUtils.getCurrentFileSystem(getConf()), HBCK_LOCK_PATH, true);
|
HBCK_LOCK_PATH, true);
|
||||||
} catch(IOException ioe) {
|
return;
|
||||||
LOG.warn("Failed to delete " + HBCK_LOCK_PATH);
|
} catch (IOException ioe) {
|
||||||
LOG.debug(ioe);
|
LOG.info("Failed to delete " + HBCK_LOCK_PATH + ", try="
|
||||||
|
+ (retryCounter.getAttemptTimes() + 1) + " of "
|
||||||
|
+ retryCounter.getMaxAttempts());
|
||||||
|
LOG.debug("Failed to delete " + HBCK_LOCK_PATH, ioe);
|
||||||
|
try {
|
||||||
|
retryCounter.sleepUntilNextRetry();
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
LOG.warn("Interrupted while deleting lock file" +
|
||||||
|
HBCK_LOCK_PATH);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} while (retryCounter.shouldRetry());
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -561,8 +561,10 @@ public class TestHBaseFsck {
|
||||||
boolean fail = true;
|
boolean fail = true;
|
||||||
@Override
|
@Override
|
||||||
public HBaseFsck call(){
|
public HBaseFsck call(){
|
||||||
|
Configuration c = new Configuration(conf);
|
||||||
|
c.setInt("hbase.hbck.lockfile.attempts", 1);
|
||||||
try{
|
try{
|
||||||
return doFsck(conf, false);
|
return doFsck(c, false);
|
||||||
} catch(Exception e){
|
} catch(Exception e){
|
||||||
if (e.getMessage().contains("Duplicate hbck")) {
|
if (e.getMessage().contains("Duplicate hbck")) {
|
||||||
fail = false;
|
fail = false;
|
||||||
|
@ -591,6 +593,40 @@ public class TestHBaseFsck {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test makes sure that with 5 retries both parallel instances
|
||||||
|
* of hbck will be completed successfully.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test (timeout=180000)
|
||||||
|
public void testParallelWithRetriesHbck() throws Exception {
|
||||||
|
final ExecutorService service;
|
||||||
|
final Future<HBaseFsck> hbck1,hbck2;
|
||||||
|
|
||||||
|
class RunHbck implements Callable<HBaseFsck>{
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HBaseFsck call() throws Exception {
|
||||||
|
return doFsck(conf, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
service = Executors.newFixedThreadPool(2);
|
||||||
|
hbck1 = service.submit(new RunHbck());
|
||||||
|
hbck2 = service.submit(new RunHbck());
|
||||||
|
service.shutdown();
|
||||||
|
//wait for 15 seconds, for both hbck calls finish
|
||||||
|
service.awaitTermination(15, TimeUnit.SECONDS);
|
||||||
|
HBaseFsck h1 = hbck1.get();
|
||||||
|
HBaseFsck h2 = hbck2.get();
|
||||||
|
// Both should be successful
|
||||||
|
assertNotNull(h1);
|
||||||
|
assertNotNull(h2);
|
||||||
|
assert(h1.getRetCode() >= 0);
|
||||||
|
assert(h2.getRetCode() >= 0);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This create and fixes a bad table with regions that have a duplicate
|
* This create and fixes a bad table with regions that have a duplicate
|
||||||
* start key
|
* start key
|
||||||
|
|
Loading…
Reference in New Issue