HBASE-12785 Use FutureTask to timeout the attempt to get the lock for hbck

This commit is contained in:
tedyu 2015-01-05 14:09:58 -08:00
parent 24bcebdeeb
commit 37e1bb61f4
1 changed files with 46 additions and 23 deletions

View File

@ -44,8 +44,12 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -324,6 +328,30 @@ public class HBaseFsck extends Configured implements Closeable {
this.executor = exec;
}
private class FileLockCallable implements Callable<FSDataOutputStream> {
@Override
public FSDataOutputStream call() throws IOException {
try {
FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
FsPermission defaultPerms = FSUtils.getFilePermissions(fs, getConf(),
HConstants.DATA_FILE_UMASK_KEY);
Path tmpDir = new Path(FSUtils.getRootDir(getConf()), HConstants.HBASE_TEMP_DIRECTORY);
fs.mkdirs(tmpDir);
HBCK_LOCK_PATH = new Path(tmpDir, HBCK_LOCK_FILE);
final FSDataOutputStream out = FSUtils.create(fs, HBCK_LOCK_PATH, defaultPerms, false);
out.writeBytes(InetAddress.getLocalHost().toString());
out.flush();
return out;
} catch(RemoteException e) {
if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
return null;
} else {
throw e;
}
}
}
}
/**
* This method maintains a lock using a file. If the creation fails we return null
*
@ -331,32 +359,27 @@ public class HBaseFsck extends Configured implements Closeable {
* @throws IOException
*/
private FSDataOutputStream checkAndMarkRunningHbck() throws IOException {
long start = EnvironmentEdgeManager.currentTime();
FileLockCallable callable = new FileLockCallable();
ExecutorService executor = Executors.newFixedThreadPool(1);
FutureTask<FSDataOutputStream> futureTask = new FutureTask<FSDataOutputStream>(callable);
executor.execute(futureTask);
final int timeoutInSeconds = 30;
FSDataOutputStream stream = null;
try {
FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
FsPermission defaultPerms = FSUtils.getFilePermissions(fs, getConf(),
HConstants.DATA_FILE_UMASK_KEY);
Path tmpDir = new Path(FSUtils.getRootDir(getConf()), HConstants.HBASE_TEMP_DIRECTORY);
fs.mkdirs(tmpDir);
HBCK_LOCK_PATH = new Path(tmpDir, HBCK_LOCK_FILE);
final FSDataOutputStream out = FSUtils.create(fs, HBCK_LOCK_PATH, defaultPerms, false);
out.writeBytes(InetAddress.getLocalHost().toString());
out.flush();
return out;
} catch(RemoteException e) {
if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
return null;
} else {
throw e;
}
stream = futureTask.get(timeoutInSeconds, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
LOG.warn("Encountered exception when opening lock file", ee);
} catch (InterruptedException ie) {
LOG.warn("Interrupted when opening lock file", ie);
Thread.currentThread().interrupt();
} catch (TimeoutException exception) {
// took too long to obtain lock
LOG.warn("Took more than " + timeoutInSeconds + " seconds in obtaining lock");
futureTask.cancel(true);
} finally {
long duration = EnvironmentEdgeManager.currentTime() - start;
if (duration > 30000) {
LOG.warn("Took " + duration + " milliseconds to obtain lock");
// took too long to obtain lock
return null;
}
executor.shutdownNow();
}
return stream;
}
private void unlockHbck() {