HBASE-13732 TestHBaseFsck#testParallelWithRetriesHbck fails intermittently (Stephen Yuan Jiang)
This commit is contained in:
parent
d5f57027b8
commit
2ba61ee1ab
@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.util;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicy;
|
||||
import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
|
||||
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@ -29,12 +29,16 @@ public class RetryCounterFactory {
|
||||
private final RetryConfig retryConfig;
|
||||
|
||||
public RetryCounterFactory(int maxAttempts, int sleepIntervalMillis) {
|
||||
this(maxAttempts, sleepIntervalMillis, -1);
|
||||
}
|
||||
|
||||
public RetryCounterFactory(int maxAttempts, int sleepIntervalMillis, int maxSleepTime) {
|
||||
this(new RetryConfig(
|
||||
maxAttempts,
|
||||
sleepIntervalMillis,
|
||||
-1,
|
||||
maxSleepTime,
|
||||
TimeUnit.MILLISECONDS,
|
||||
new ExponentialBackoffPolicy()));
|
||||
new ExponentialBackoffPolicyWithLimit()));
|
||||
}
|
||||
|
||||
public RetryCounterFactory(RetryConfig retryConfig) {
|
||||
|
@ -203,7 +203,9 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||
private static final String TO_BE_LOADED = "to_be_loaded";
|
||||
private static final String HBCK_LOCK_FILE = "hbase-hbck.lock";
|
||||
private static final int DEFAULT_MAX_LOCK_FILE_ATTEMPTS = 5;
|
||||
private static final int DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL = 200;
|
||||
private static final int DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL = 200; // milliseconds
|
||||
private static final int DEFAULT_LOCK_FILE_ATTEMPT_MAX_SLEEP_TIME = 5000; // milliseconds
|
||||
private static final int DEFAULT_WAIT_FOR_LOCK_TIMEOUT = 30; // seconds
|
||||
|
||||
/**********************
|
||||
* Internal resources
|
||||
@ -327,9 +329,11 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||
int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
|
||||
executor = new ScheduledThreadPoolExecutor(numThreads, Threads.newDaemonThreadFactory("hbasefsck"));
|
||||
lockFileRetryCounterFactory = new RetryCounterFactory(
|
||||
getConf().getInt("hbase.hbck.lockfile.attempts", DEFAULT_MAX_LOCK_FILE_ATTEMPTS),
|
||||
getConf().getInt("hbase.hbck.lockfile.attempt.sleep.interval",
|
||||
DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL));
|
||||
getConf().getInt("hbase.hbck.lockfile.attempts", DEFAULT_MAX_LOCK_FILE_ATTEMPTS),
|
||||
getConf().getInt(
|
||||
"hbase.hbck.lockfile.attempt.sleep.interval", DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL),
|
||||
getConf().getInt(
|
||||
"hbase.hbck.lockfile.attempt.maxsleeptime", DEFAULT_LOCK_FILE_ATTEMPT_MAX_SLEEP_TIME));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -348,10 +352,13 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||
errors = getErrorReporter(getConf());
|
||||
this.executor = exec;
|
||||
lockFileRetryCounterFactory = new RetryCounterFactory(
|
||||
getConf().getInt("hbase.hbck.lockfile.attempts", DEFAULT_MAX_LOCK_FILE_ATTEMPTS),
|
||||
getConf().getInt("hbase.hbck.lockfile.attempt.sleep.interval", DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL));
|
||||
getConf().getInt("hbase.hbck.lockfile.attempts", DEFAULT_MAX_LOCK_FILE_ATTEMPTS),
|
||||
getConf().getInt(
|
||||
"hbase.hbck.lockfile.attempt.sleep.interval", DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL),
|
||||
getConf().getInt(
|
||||
"hbase.hbck.lockfile.attempt.maxsleeptime", DEFAULT_LOCK_FILE_ATTEMPT_MAX_SLEEP_TIME));
|
||||
}
|
||||
|
||||
|
||||
private class FileLockCallable implements Callable<FSDataOutputStream> {
|
||||
RetryCounter retryCounter;
|
||||
|
||||
@ -421,7 +428,8 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
FutureTask<FSDataOutputStream> futureTask = new FutureTask<FSDataOutputStream>(callable);
|
||||
executor.execute(futureTask);
|
||||
final int timeoutInSeconds = 30;
|
||||
final int timeoutInSeconds = getConf().getInt(
|
||||
"hbase.hbck.lockfile.maxwaittime", DEFAULT_WAIT_FOR_LOCK_TIMEOUT);
|
||||
FSDataOutputStream stream = null;
|
||||
try {
|
||||
stream = futureTask.get(30, TimeUnit.SECONDS);
|
||||
@ -448,6 +456,7 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||
IOUtils.closeQuietly(hbckOutFd);
|
||||
FSUtils.delete(FSUtils.getCurrentFileSystem(getConf()),
|
||||
HBCK_LOCK_PATH, true);
|
||||
LOG.info("Finishing hbck");
|
||||
return;
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("Failed to delete " + HBCK_LOCK_PATH + ", try="
|
||||
@ -464,7 +473,6 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||
}
|
||||
}
|
||||
} while (retryCounter.shouldRetry());
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -487,7 +495,7 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||
// Make sure to cleanup the lock
|
||||
hbckLockCleanup.set(true);
|
||||
|
||||
// Add a shutdown hook to this thread, incase user tries to
|
||||
// Add a shutdown hook to this thread, in case user tries to
|
||||
// kill the hbck with a ctrl-c, we want to cleanup the lock so that
|
||||
// it is available for further calls
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@ -497,7 +505,8 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||
unlockHbck();
|
||||
}
|
||||
});
|
||||
LOG.debug("Launching hbck");
|
||||
|
||||
LOG.info("Launching hbck");
|
||||
|
||||
connection = (ClusterConnection)ConnectionFactory.createConnection(getConf());
|
||||
admin = connection.getAdmin();
|
||||
|
@ -614,7 +614,7 @@ public class TestHBaseFsck {
|
||||
}
|
||||
|
||||
/**
|
||||
* This test makes sure that with 10 retries both parallel instances
|
||||
* This test makes sure that with enough retries both parallel instances
|
||||
* of hbck will be completed successfully.
|
||||
*
|
||||
* @throws Exception
|
||||
@ -624,22 +624,33 @@ public class TestHBaseFsck {
|
||||
final ExecutorService service;
|
||||
final Future<HBaseFsck> hbck1,hbck2;
|
||||
|
||||
// With the ExponentialBackoffPolicyWithLimit (starting with 200 milliseconds sleep time, and
|
||||
// max sleep time of 5 seconds), we can retry around 15 times within 60 seconds before bail out.
|
||||
final int timeoutInSeconds = 60;
|
||||
final int sleepIntervalInMilliseconds = 200;
|
||||
final int maxSleepTimeInMilliseconds = 6000;
|
||||
final int maxRetryAttempts = 15;
|
||||
|
||||
class RunHbck implements Callable<HBaseFsck>{
|
||||
|
||||
@Override
|
||||
public HBaseFsck call() throws Exception {
|
||||
// Increase retry attempts to make sure the non-active hbck doesn't get starved
|
||||
Configuration c = new Configuration(conf);
|
||||
c.setInt("hbase.hbck.lockfile.attempts", 10);
|
||||
c.setInt("hbase.hbck.lockfile.maxwaittime", timeoutInSeconds);
|
||||
c.setInt("hbase.hbck.lockfile.attempt.sleep.interval", sleepIntervalInMilliseconds);
|
||||
c.setInt("hbase.hbck.lockfile.attempt.maxsleeptime", maxSleepTimeInMilliseconds);
|
||||
c.setInt("hbase.hbck.lockfile.attempts", maxRetryAttempts);
|
||||
return doFsck(c, false);
|
||||
}
|
||||
}
|
||||
|
||||
service = Executors.newFixedThreadPool(2);
|
||||
hbck1 = service.submit(new RunHbck());
|
||||
hbck2 = service.submit(new RunHbck());
|
||||
service.shutdown();
|
||||
//wait for 15 seconds, for both hbck calls finish
|
||||
service.awaitTermination(25, TimeUnit.SECONDS);
|
||||
//wait for some time, for both hbck calls finish
|
||||
service.awaitTermination(timeoutInSeconds * 2, TimeUnit.SECONDS);
|
||||
HBaseFsck h1 = hbck1.get();
|
||||
HBaseFsck h2 = hbck2.get();
|
||||
// Both should be successful
|
||||
|
Loading…
x
Reference in New Issue
Block a user