HBASE-13732 TestHBaseFsck#testParallelWithRetriesHbck fails intermittently (Stephen Yuan Jiang)
commit ef18d75d00 (parent 6529d8833d)
RetryCounterFactory.java

@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.util;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicy;
+import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
 import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
 
 @InterfaceAudience.Private
@@ -29,12 +29,16 @@ public class RetryCounterFactory {
   private final RetryConfig retryConfig;
 
   public RetryCounterFactory(int maxAttempts, int sleepIntervalMillis) {
+    this(maxAttempts, sleepIntervalMillis, -1);
+  }
+
+  public RetryCounterFactory(int maxAttempts, int sleepIntervalMillis, int maxSleepTime) {
     this(new RetryConfig(
       maxAttempts,
       sleepIntervalMillis,
-      -1,
+      maxSleepTime,
       TimeUnit.MILLISECONDS,
-      new ExponentialBackoffPolicy()));
+      new ExponentialBackoffPolicyWithLimit()));
   }
 
   public RetryCounterFactory(RetryConfig retryConfig) {
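The switch from ExponentialBackoffPolicy to ExponentialBackoffPolicyWithLimit means the per-attempt sleep stops doubling once it reaches maxSleepTime, so the total wait grows roughly linearly after the cap instead of exploding. A minimal standalone sketch of that behavior, assuming plain base-2 growth and no jitter (an illustration, not the HBase implementation):

    public final class CappedBackoffSketch {
      // Sleep before retry number 'attempt' (0-based): base * 2^attempt,
      // capped at maxSleepMs when a positive cap is configured.
      static long backoffMillis(long baseMs, long maxSleepMs, int attempt) {
        long uncapped = baseMs * (1L << Math.min(attempt, 30)); // clamp the shift to avoid overflow
        return maxSleepMs > 0 ? Math.min(uncapped, maxSleepMs) : uncapped;
      }

      public static void main(String[] args) {
        // With the hbck defaults introduced by this patch: 200 ms base, 5000 ms cap.
        for (int attempt = 0; attempt < 8; attempt++) {
          System.out.println("attempt " + attempt + " -> sleep "
              + backoffMillis(200, 5000, attempt) + " ms");
        }
        // Prints 200, 400, 800, 1600, 3200, 5000, 5000, 5000.
      }
    }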
HBaseFsck.java

@@ -199,7 +199,9 @@ public class HBaseFsck extends Configured implements Closeable {
   private static final String TO_BE_LOADED = "to_be_loaded";
   private static final String HBCK_LOCK_FILE = "hbase-hbck.lock";
   private static final int DEFAULT_MAX_LOCK_FILE_ATTEMPTS = 5;
-  private static final int DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL = 200;
+  private static final int DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL = 200; // milliseconds
+  private static final int DEFAULT_LOCK_FILE_ATTEMPT_MAX_SLEEP_TIME = 5000; // milliseconds
+  private static final int DEFAULT_WAIT_FOR_LOCK_TIMEOUT = 30; // seconds
 
   /**********************
    * Internal resources
@@ -317,9 +319,11 @@ public class HBaseFsck extends Configured implements Closeable {
     int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
     executor = new ScheduledThreadPoolExecutor(numThreads, Threads.newDaemonThreadFactory("hbasefsck"));
     lockFileRetryCounterFactory = new RetryCounterFactory(
-      getConf().getInt("hbase.hbck.lockfile.attempts", DEFAULT_MAX_LOCK_FILE_ATTEMPTS),
-      getConf().getInt("hbase.hbck.lockfile.attempt.sleep.interval",
-        DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL));
+      getConf().getInt("hbase.hbck.lockfile.attempts", DEFAULT_MAX_LOCK_FILE_ATTEMPTS),
+      getConf().getInt(
+        "hbase.hbck.lockfile.attempt.sleep.interval", DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL),
+      getConf().getInt(
+        "hbase.hbck.lockfile.attempt.maxsleeptime", DEFAULT_LOCK_FILE_ATTEMPT_MAX_SLEEP_TIME));
   }
 
   /**
@@ -338,10 +342,13 @@ public class HBaseFsck extends Configured implements Closeable {
     errors = getErrorReporter(getConf());
     this.executor = exec;
     lockFileRetryCounterFactory = new RetryCounterFactory(
-      getConf().getInt("hbase.hbck.lockfile.attempts", DEFAULT_MAX_LOCK_FILE_ATTEMPTS),
-      getConf().getInt("hbase.hbck.lockfile.attempt.sleep.interval", DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL));
+      getConf().getInt("hbase.hbck.lockfile.attempts", DEFAULT_MAX_LOCK_FILE_ATTEMPTS),
+      getConf().getInt(
+        "hbase.hbck.lockfile.attempt.sleep.interval", DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL),
+      getConf().getInt(
+        "hbase.hbck.lockfile.attempt.maxsleeptime", DEFAULT_LOCK_FILE_ATTEMPT_MAX_SLEEP_TIME));
   }
 
 
   private class FileLockCallable implements Callable<FSDataOutputStream> {
     RetryCounter retryCounter;
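Both HBaseFsck constructors now feed three settings into the RetryCounterFactory: attempt count, initial sleep, and the new maximum sleep. A rough sketch of how such a counter typically drives a retry loop, in the spirit of the do/while used by FileLockCallable and the unlock path; acquireLock() below is a hypothetical stand-in for the real lock-file I/O, and the RetryCounter calls (create, shouldRetry, sleepUntilNextRetry) are the ones HBase uses elsewhere:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.util.RetryCounter;
    import org.apache.hadoop.hbase.util.RetryCounterFactory;

    public final class LockRetrySketch {
      // Hypothetical stand-in for creating hbase-hbck.lock on the filesystem.
      static void acquireLock() throws IOException {
      }

      static void acquireWithRetries(Configuration conf)
          throws IOException, InterruptedException {
        RetryCounterFactory factory = new RetryCounterFactory(
            conf.getInt("hbase.hbck.lockfile.attempts", 5),
            conf.getInt("hbase.hbck.lockfile.attempt.sleep.interval", 200),
            conf.getInt("hbase.hbck.lockfile.attempt.maxsleeptime", 5000));
        RetryCounter retryCounter = factory.create();
        while (true) {
          try {
            acquireLock();                       // success: stop retrying
            return;
          } catch (IOException ioe) {
            if (!retryCounter.shouldRetry()) {
              throw ioe;                         // attempts exhausted, give up
            }
            retryCounter.sleepUntilNextRetry();  // capped exponential backoff between attempts
          }
        }
      }
    }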
@@ -411,7 +418,8 @@ public class HBaseFsck extends Configured implements Closeable {
     ExecutorService executor = Executors.newFixedThreadPool(1);
     FutureTask<FSDataOutputStream> futureTask = new FutureTask<FSDataOutputStream>(callable);
     executor.execute(futureTask);
-    final int timeoutInSeconds = 30;
+    final int timeoutInSeconds = getConf().getInt(
+      "hbase.hbck.lockfile.maxwaittime", DEFAULT_WAIT_FOR_LOCK_TIMEOUT);
     FSDataOutputStream stream = null;
     try {
       stream = futureTask.get(timeoutInSeconds, TimeUnit.SECONDS);
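The hunk above replaces the hard-coded 30-second wait with the new hbase.hbck.lockfile.maxwaittime setting (defaulting to DEFAULT_WAIT_FOR_LOCK_TIMEOUT). The surrounding pattern, bounding a blocking lock attempt with FutureTask.get(timeout), can be tried in isolation with this sketch; the Callable body here is a placeholder, not the real FileLockCallable:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public final class BoundedWaitSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        // Placeholder task standing in for FileLockCallable's lock-file creation.
        FutureTask<String> futureTask = new FutureTask<>(() -> {
          Thread.sleep(100);  // pretend to do filesystem work
          return "lock acquired";
        });
        executor.execute(futureTask);

        final int timeoutInSeconds = 30;  // in HBaseFsck this now comes from the config
        try {
          System.out.println(futureTask.get(timeoutInSeconds, TimeUnit.SECONDS));
        } catch (TimeoutException te) {
          futureTask.cancel(true);        // give up on this lock attempt
          System.out.println("timed out waiting for the lock");
        } finally {
          executor.shutdownNow();
        }
      }
    }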
@@ -438,6 +446,7 @@ public class HBaseFsck extends Configured implements Closeable {
         IOUtils.closeQuietly(hbckOutFd);
         FSUtils.delete(FSUtils.getCurrentFileSystem(getConf()),
           HBCK_LOCK_PATH, true);
+        LOG.info("Finishing hbck");
         return;
       } catch (IOException ioe) {
         LOG.info("Failed to delete " + HBCK_LOCK_PATH + ", try="
@@ -454,7 +463,6 @@ public class HBaseFsck extends Configured implements Closeable {
           }
         }
       } while (retryCounter.shouldRetry());
-
     }
   }
 
@@ -477,7 +485,7 @@ public class HBaseFsck extends Configured implements Closeable {
       // Make sure to cleanup the lock
       hbckLockCleanup.set(true);
 
-      // Add a shutdown hook to this thread, incase user tries to
+      // Add a shutdown hook to this thread, in case user tries to
       // kill the hbck with a ctrl-c, we want to cleanup the lock so that
       // it is available for further calls
       Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -487,7 +495,8 @@ public class HBaseFsck extends Configured implements Closeable {
           unlockHbck();
         }
       });
-      LOG.debug("Launching hbck");
+
+      LOG.info("Launching hbck");
 
       connection = (ClusterConnection)ConnectionFactory.createConnection(getConf());
       admin = connection.getAdmin();
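The comment fix above ("incase" -> "in case") belongs to the shutdown hook that releases the lock file when the user kills hbck with ctrl-c. The pattern itself is plain JDK; a generic sketch follows (the real hook calls unlockHbck() and is guarded by hbckLockCleanup):

    public final class ShutdownHookSketch {
      public static void main(String[] args) throws InterruptedException {
        // Runs on normal exit or SIGINT (ctrl-c), mirroring how HBaseFsck
        // makes sure hbase-hbck.lock is removed for later invocations.
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            System.out.println("cleaning up lock file before exit");
          }
        });

        System.out.println("working; press ctrl-c to see the hook fire");
        Thread.sleep(2000);
      }
    }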
TestHBaseFsck.java

@@ -606,7 +606,7 @@ public class TestHBaseFsck {
   }
 
   /**
-   * This test makes sure that with 10 retries both parallel instances
+   * This test makes sure that with enough retries both parallel instances
    * of hbck will be completed successfully.
    *
    * @throws Exception
@@ -616,22 +616,33 @@ public class TestHBaseFsck {
     final ExecutorService service;
     final Future<HBaseFsck> hbck1,hbck2;
 
+    // With the ExponentialBackoffPolicyWithLimit (starting with 200 milliseconds sleep time, and
+    // max sleep time of 5 seconds), we can retry around 15 times within 60 seconds before bail out.
+    final int timeoutInSeconds = 60;
+    final int sleepIntervalInMilliseconds = 200;
+    final int maxSleepTimeInMilliseconds = 6000;
+    final int maxRetryAttempts = 15;
+
     class RunHbck implements Callable<HBaseFsck>{
 
       @Override
       public HBaseFsck call() throws Exception {
+        // Increase retry attempts to make sure the non-active hbck doesn't get starved
         Configuration c = new Configuration(conf);
-        c.setInt("hbase.hbck.lockfile.attempts", 10);
+        c.setInt("hbase.hbck.lockfile.maxwaittime", timeoutInSeconds);
+        c.setInt("hbase.hbck.lockfile.attempt.sleep.interval", sleepIntervalInMilliseconds);
+        c.setInt("hbase.hbck.lockfile.attempt.maxsleeptime", maxSleepTimeInMilliseconds);
+        c.setInt("hbase.hbck.lockfile.attempts", maxRetryAttempts);
         return doFsck(c, false);
       }
     }
 
     service = Executors.newFixedThreadPool(2);
     hbck1 = service.submit(new RunHbck());
     hbck2 = service.submit(new RunHbck());
     service.shutdown();
-    //wait for 15 seconds, for both hbck calls finish
-    service.awaitTermination(25, TimeUnit.SECONDS);
+    //wait for some time, for both hbck calls finish
+    service.awaitTermination(timeoutInSeconds * 2, TimeUnit.SECONDS);
     HBaseFsck h1 = hbck1.get();
     HBaseFsck h2 = hbck2.get();
     // Both should be successful
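The new test comment claims that with a 200 ms starting sleep and a cap of about 5 seconds, roughly 15 attempts fit inside the 60-second hbase.hbck.lockfile.maxwaittime budget. A quick back-of-the-envelope check of that arithmetic, assuming plain doubling with no jitter (the test itself configures 6000 ms as its cap; the 5-second figure is the new hbck default):

    public final class BackoffBudgetSketch {
      public static void main(String[] args) {
        long baseMs = 200, capMs = 5000, totalMs = 0;
        int attempts = 15;
        // Sum the 14 sleeps between 15 attempts: 200, 400, 800, 1600, 3200, then 5000 each.
        for (int i = 0; i < attempts - 1; i++) {
          totalMs += Math.min(baseMs * (1L << i), capMs);
        }
        // Prints about 51 seconds, inside the 60-second window before hbck bails out.
        System.out.println("total sleep for " + attempts + " attempts: "
            + totalMs / 1000.0 + " s");
      }
    }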