HBASE-11405 Multiple invocations of hbck in parallel disables balancer permanently (bharath v)
This commit is contained in:
parent
e25ca03497
commit
5a0b2cfbf5
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.io.StringWriter;
|
import java.io.StringWriter;
|
||||||
|
import java.net.InetAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -45,18 +46,22 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.ClusterStatus;
|
import org.apache.hadoop.hbase.ClusterStatus;
|
||||||
|
@ -109,7 +114,10 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKTableStateClientSideReader;
|
import org.apache.hadoop.hbase.zookeeper.ZKTableStateClientSideReader;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
import org.apache.hadoop.hbase.security.AccessDeniedException;
|
import org.apache.hadoop.hbase.security.AccessDeniedException;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
|
@ -178,6 +186,8 @@ public class HBaseFsck extends Configured {
|
||||||
private static final int DEFAULT_OVERLAPS_TO_SIDELINE = 2;
|
private static final int DEFAULT_OVERLAPS_TO_SIDELINE = 2;
|
||||||
private static final int DEFAULT_MAX_MERGE = 5;
|
private static final int DEFAULT_MAX_MERGE = 5;
|
||||||
private static final String TO_BE_LOADED = "to_be_loaded";
|
private static final String TO_BE_LOADED = "to_be_loaded";
|
||||||
|
private static final String HBCK_LOCK_FILE = "hbase-hbck.lock";
|
||||||
|
|
||||||
|
|
||||||
/**********************
|
/**********************
|
||||||
* Internal resources
|
* Internal resources
|
||||||
|
@ -192,6 +202,12 @@ public class HBaseFsck extends Configured {
|
||||||
private long startMillis = System.currentTimeMillis();
|
private long startMillis = System.currentTimeMillis();
|
||||||
private HFileCorruptionChecker hfcc;
|
private HFileCorruptionChecker hfcc;
|
||||||
private int retcode = 0;
|
private int retcode = 0;
|
||||||
|
private Path HBCK_LOCK_PATH;
|
||||||
|
private FSDataOutputStream hbckOutFd;
|
||||||
|
// This lock is to prevent cleanup of balancer resources twice between
|
||||||
|
// ShutdownHook and the main code. We cleanup only if the connect() is
|
||||||
|
// successful
|
||||||
|
private final AtomicBoolean hbckLockCleanup = new AtomicBoolean(false);
|
||||||
|
|
||||||
/***********
|
/***********
|
||||||
* Options
|
* Options
|
||||||
|
@ -300,11 +316,74 @@ public class HBaseFsck extends Configured {
|
||||||
this.executor = exec;
|
this.executor = exec;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method maintains a lock using a file. If the creation fails we return null
|
||||||
|
*
|
||||||
|
* @return FSDataOutputStream object corresponding to the newly opened lock file
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private FSDataOutputStream checkAndMarkRunningHbck() throws IOException {
|
||||||
|
try {
|
||||||
|
FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
|
||||||
|
FsPermission defaultPerms = FSUtils.getFilePermissions(fs, getConf(),
|
||||||
|
HConstants.DATA_FILE_UMASK_KEY);
|
||||||
|
Path tmpDir = new Path(FSUtils.getRootDir(getConf()), HConstants.HBASE_TEMP_DIRECTORY);
|
||||||
|
fs.mkdirs(tmpDir);
|
||||||
|
HBCK_LOCK_PATH = new Path(tmpDir, HBCK_LOCK_FILE);
|
||||||
|
final FSDataOutputStream out = FSUtils.create(fs, HBCK_LOCK_PATH, defaultPerms, false);
|
||||||
|
out.writeBytes(InetAddress.getLocalHost().toString());
|
||||||
|
out.flush();
|
||||||
|
return out;
|
||||||
|
} catch(RemoteException e) {
|
||||||
|
if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void unlockHbck() {
|
||||||
|
if(hbckLockCleanup.compareAndSet(true, false)){
|
||||||
|
IOUtils.closeStream(hbckOutFd);
|
||||||
|
try{
|
||||||
|
FSUtils.delete(FSUtils.getCurrentFileSystem(getConf()), HBCK_LOCK_PATH, true);
|
||||||
|
} catch(IOException ioe) {
|
||||||
|
LOG.warn("Failed to delete " + HBCK_LOCK_PATH);
|
||||||
|
LOG.debug(ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* To repair region consistency, one must call connect() in order to repair
|
* To repair region consistency, one must call connect() in order to repair
|
||||||
* online state.
|
* online state.
|
||||||
*/
|
*/
|
||||||
public void connect() throws IOException {
|
public void connect() throws IOException {
|
||||||
|
|
||||||
|
// Check if another instance of balancer is running
|
||||||
|
hbckOutFd = checkAndMarkRunningHbck();
|
||||||
|
if (hbckOutFd == null) {
|
||||||
|
setRetCode(-1);
|
||||||
|
LOG.error("Another instance of hbck is running, exiting this instance.[If you are sure" +
|
||||||
|
" no other instance is running, delete the lock file " +
|
||||||
|
HBCK_LOCK_PATH + " and rerun the tool]");
|
||||||
|
throw new IOException("Duplicate hbck - Abort");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure to cleanup the lock
|
||||||
|
hbckLockCleanup.set(true);
|
||||||
|
|
||||||
|
// Add a shutdown hook to this thread, incase user tries to
|
||||||
|
// kill the hbck with a ctrl-c, we want to cleanup the lock so that
|
||||||
|
// it is available for further calls
|
||||||
|
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||||
|
public void run() {
|
||||||
|
unlockHbck();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
LOG.debug("Launching hbck");
|
||||||
|
|
||||||
connection = HConnectionManager.createConnection(getConf());
|
connection = HConnectionManager.createConnection(getConf());
|
||||||
admin = new HBaseAdmin(connection);
|
admin = new HBaseAdmin(connection);
|
||||||
meta = new HTable(TableName.META_TABLE_NAME, connection);
|
meta = new HTable(TableName.META_TABLE_NAME, connection);
|
||||||
|
@ -499,6 +578,9 @@ public class HBaseFsck extends Configured {
|
||||||
|
|
||||||
checkAndFixTableLocks();
|
checkAndFixTableLocks();
|
||||||
|
|
||||||
|
// Remove the hbck lock
|
||||||
|
unlockHbck();
|
||||||
|
|
||||||
// Print table summary
|
// Print table summary
|
||||||
printTableSummary(tablesInfo);
|
printTableSummary(tablesInfo);
|
||||||
return errors.summarize();
|
return errors.summarize();
|
||||||
|
@ -3842,7 +3924,6 @@ public class HBaseFsck extends Configured {
|
||||||
Path hbasedir = FSUtils.getRootDir(conf);
|
Path hbasedir = FSUtils.getRootDir(conf);
|
||||||
URI defaultFs = hbasedir.getFileSystem(conf).getUri();
|
URI defaultFs = hbasedir.getFileSystem(conf).getUri();
|
||||||
FSUtils.setFsDefault(conf, new Path(defaultFs));
|
FSUtils.setFsDefault(conf, new Path(defaultFs));
|
||||||
|
|
||||||
int ret = ToolRunner.run(new HBaseFsckTool(conf), args);
|
int ret = ToolRunner.run(new HBaseFsckTool(conf), args);
|
||||||
System.exit(ret);
|
System.exit(ret);
|
||||||
}
|
}
|
||||||
|
|
|
@ -526,6 +526,49 @@ public class TestHBaseFsck {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test makes sure that parallel instances of Hbck is disabled.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testParallelHbck() throws Exception {
|
||||||
|
final ExecutorService service;
|
||||||
|
final Future<HBaseFsck> hbck1,hbck2;
|
||||||
|
|
||||||
|
class RunHbck implements Callable<HBaseFsck>{
|
||||||
|
boolean fail = true;
|
||||||
|
public HBaseFsck call(){
|
||||||
|
try{
|
||||||
|
return doFsck(conf, false);
|
||||||
|
} catch(Exception e){
|
||||||
|
if (e.getMessage().contains("Duplicate hbck")) {
|
||||||
|
fail = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// If we reach here, then an exception was caught
|
||||||
|
if (fail) fail();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
service = Executors.newFixedThreadPool(2);
|
||||||
|
hbck1 = service.submit(new RunHbck());
|
||||||
|
hbck2 = service.submit(new RunHbck());
|
||||||
|
service.shutdown();
|
||||||
|
//wait for 15 seconds, for both hbck calls finish
|
||||||
|
service.awaitTermination(15, TimeUnit.SECONDS);
|
||||||
|
HBaseFsck h1 = hbck1.get();
|
||||||
|
HBaseFsck h2 = hbck2.get();
|
||||||
|
// Make sure only one of the calls was successful
|
||||||
|
assert(h1 == null || h2 == null);
|
||||||
|
if (h1 != null) {
|
||||||
|
assert(h1.getRetCode() >= 0);
|
||||||
|
}
|
||||||
|
if (h2 != null) {
|
||||||
|
assert(h2.getRetCode() >= 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This create and fixes a bad table with regions that have a duplicate
|
* This create and fixes a bad table with regions that have a duplicate
|
||||||
* start key
|
* start key
|
||||||
|
|
Loading…
Reference in New Issue